TsbWebSocketService.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package com.ruoyi.device.websocket;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.alibaba.fastjson2.JSONObject;
  4. import com.ruoyi.common.core.domain.model.LoginUser;
  5. import com.ruoyi.common.utils.StringUtils;
  6. import com.ruoyi.device.domain.entity.TsbDevice;
  7. import com.ruoyi.device.domain.model.TsbUserDeviceBind;
  8. import com.ruoyi.device.mqtt.domain.BaseJsonBody;
  9. import com.ruoyi.device.mqtt.enums.CmdTypeEnum;
  10. import com.ruoyi.device.mqtt.enums.MsgTypeEnum;
  11. import com.ruoyi.device.mqtt.handler.DeviceOnlineManager;
  12. import com.ruoyi.device.mqtt.handler.HandlerManager;
  13. import com.ruoyi.device.mqtt.handler.JsonCmdHandlerManager;
  14. import com.ruoyi.device.mqtt.handler.encoder.IEncoder;
  15. import com.ruoyi.device.mqtt.handler.encoder.json.IJsonCmdDownHandler;
  16. import com.ruoyi.device.mqtt.util.MsgHandlerUtil;
  17. import com.ruoyi.device.service.ITsbDeviceService;
  18. import com.ruoyi.device.websocket.model.TsbWebSocketMessage;
  19. import com.ruoyi.framework.web.service.TokenService;
  20. import jakarta.annotation.Resource;
  21. import jakarta.websocket.Session;
  22. import org.slf4j.Logger;
  23. import org.slf4j.LoggerFactory;
  24. import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
  25. import org.springframework.security.core.context.SecurityContextHolder;
  26. import org.springframework.stereotype.Service;
  27. import java.util.Map;
  28. import java.util.Set;
  29. /**
  30. * 调试宝 WebSocket 业务:鉴权、会话注册、页面数据转发
  31. *
  32. * @author lwm
  33. */
  34. @Service
  35. public class TsbWebSocketService
  36. {
  37. private static final Logger log = LoggerFactory.getLogger(TsbWebSocketService.class);
  38. @Resource
  39. private TokenService tokenService;
  40. @Resource
  41. private ITsbDeviceService tsbDeviceService;
  42. @Resource
  43. private HandlerManager handlerManager;
  44. @Resource
  45. private JsonCmdHandlerManager jsonCmdHandlerManager;
  46. @Resource
  47. private DeviceOnlineManager deviceOnlineManager;
  48. /**
  49. * WebSocket 连接预检:设备在线、操作权限、设备未被占用
  50. *
  51. * @return 失败原因;通过返回 null
  52. */
  53. public String precheckConnect(Long userId, Long deviceSn, LoginUser loginUser)
  54. {
  55. if (deviceSn == null)
  56. {
  57. return "请指定设备编号";
  58. }
  59. if (userId == null || loginUser == null || loginUser.getUser() == null)
  60. {
  61. return "登录状态无效,请重新登录";
  62. }
  63. TsbDevice device = tsbDeviceService.selectAccessibleDeviceBySn(deviceSn);
  64. if (device == null)
  65. {
  66. return "您没有该设备的操作权限";
  67. }
  68. if (StringUtils.isEmpty(device.getDeviceType()))
  69. {
  70. return "设备类型未配置,无法建立连接";
  71. }
  72. if (!deviceOnlineManager.isOnline(deviceSn))
  73. {
  74. return "设备未登录在线,无法建立连接";
  75. }
  76. if (TsbWebSocketUsers.isDeviceConnected(deviceSn))
  77. {
  78. return "该设备已被占用连接,请稍后再试";
  79. }
  80. return null;
  81. }
  82. /**
  83. * 构建 Web 端操作设备所需的绑定信息
  84. */
  85. public TsbUserDeviceBind buildOperateBind(Long deviceSn, LoginUser loginUser)
  86. {
  87. TsbDevice device = tsbDeviceService.selectAccessibleDeviceBySn(deviceSn);
  88. TsbUserDeviceBind bind = new TsbUserDeviceBind();
  89. bind.setDeviceId(device.getDeviceId());
  90. bind.setDeviceType(device.getDeviceType());
  91. bind.setDeviceSn(device.getDeviceSn());
  92. bind.setImei(device.getImei());
  93. bind.setUserId(loginUser.getUserId());
  94. bind.setUserName(loginUser.getUsername());
  95. bind.setNickName(loginUser.getUser().getNickName());
  96. return bind;
  97. }
  98. /**
  99. * 握手鉴权:解析 token、deviceSn,预检后写入 session 属性
  100. *
  101. * @return 绑定信息;失败返回 null(failReason 写入原因)
  102. */
  103. public JSONObject authenticate(String token, Long deviceSn, Session session, StringBuilder failReason)
  104. {
  105. LoginUser loginUser = tokenService.getLoginUser(token);
  106. if (loginUser == null || loginUser.getUser() == null)
  107. {
  108. log.warn("WebSocket 鉴权失败:无效 token");
  109. if (failReason != null)
  110. {
  111. failReason.append("登录状态无效,请重新登录");
  112. }
  113. return null;
  114. }
  115. // 手动设置 Spring Security 上下文,使 SecurityUtils.getLoginUser() 可用
  116. UsernamePasswordAuthenticationToken authentication = new UsernamePasswordAuthenticationToken(
  117. loginUser, null, loginUser.getAuthorities());
  118. SecurityContextHolder.getContext().setAuthentication(authentication);
  119. try
  120. {
  121. Long userId = loginUser.getUserId();
  122. String precheckMsg = precheckConnect(userId, deviceSn, loginUser);
  123. if (precheckMsg != null)
  124. {
  125. log.warn("WebSocket 预检失败:{}, userId={}, deviceSn={}", precheckMsg, userId, deviceSn);
  126. if (failReason != null)
  127. {
  128. failReason.append(precheckMsg);
  129. }
  130. return null;
  131. }
  132. TsbUserDeviceBind bind = buildOperateBind(deviceSn, loginUser);
  133. // session中写入用户设备信息 便于后续获取
  134. session.getUserProperties().put(TsbWebSocketConstants.DEVICE_SN_KEY, bind.getDeviceSn());
  135. session.getUserProperties().put(TsbWebSocketConstants.USER_DEVICE_BIND_KEY, bind);
  136. session.getUserProperties().put(TsbWebSocketConstants.USER_PERMISSIONS_KEY, loginUser.getPermissions());
  137. // 创建信息map
  138. if (!TsbWebSocketUsers.tryPut(bind.getDeviceSn(), session))
  139. {
  140. log.warn("WebSocket 注册失败:设备已被占用, deviceSn={}", bind.getDeviceSn());
  141. if (failReason != null)
  142. {
  143. failReason.append("该设备已被占用连接,请稍后再试");
  144. }
  145. return null;
  146. }
  147. log.info("WebSocket 鉴权成功, userId={}, deviceSn={}", userId, bind.getDeviceSn());
  148. return JSONObject.from(bind);
  149. }
  150. finally
  151. {
  152. // 清理 SecurityContext,避免线程复用导致的安全问题
  153. SecurityContextHolder.clearContext();
  154. }
  155. }
  156. /**
  157. * 处理 Web端 -> 后端消息 -> 设备 MQTT
  158. */
  159. public void pushDeviceSyncFromPage(Session session, String message)
  160. {
  161. if (StringUtils.isEmpty(message))
  162. {
  163. return;
  164. }
  165. // 1、解析消息
  166. TsbWebSocketMessage tsbWebSocketMessage = JSON.parseObject(message, TsbWebSocketMessage.class);
  167. if (tsbWebSocketMessage == null || StringUtils.isEmpty(tsbWebSocketMessage.getCmdType()))
  168. {
  169. log.warn("WebSocket 消息格式无效, sessionId={}, message={}", session.getId(), message);
  170. return;
  171. }
  172. // 2、组装下发所需的消息
  173. TsbUserDeviceBind bind = getBindFromSession(session);
  174. if (bind == null || bind.getDeviceSn() == null || StringUtils.isEmpty(bind.getDeviceType()))
  175. {
  176. log.warn("WebSocket 获取用户设备信息失败, sessionId={}, bind={}", session.getId(), bind);
  177. TsbWebSocketUsers.sendMessageToUserByText(session,
  178. TsbWebSocketMessage.fail(tsbWebSocketMessage.getCmdType(), "会话缺少设备绑定信息,请重新连接"));
  179. return;
  180. }
  181. Set<String> userPermissions = getUserPermissionsFromSession(session);
  182. if (StringUtils.isEmpty(userPermissions) || !userPermissions.contains(tsbWebSocketMessage.getCmdType()))
  183. {
  184. log.warn("WebSocket 用户权限不足, sessionId={}, cmdType={}", session.getId(), tsbWebSocketMessage.getCmdType());
  185. TsbWebSocketUsers.sendMessageToUserByText(session,
  186. TsbWebSocketMessage.fail(tsbWebSocketMessage.getCmdType(), "用户权限不足"));
  187. return;
  188. }
  189. CmdTypeEnum cmdType = CmdTypeEnum.resolveDownlink(tsbWebSocketMessage.getCmdType());
  190. if (cmdType == null)
  191. {
  192. log.warn("WebSocket 命令类型无效, sessionId={}, cmdType={}", session.getId(), tsbWebSocketMessage.getCmdType());
  193. TsbWebSocketUsers.sendMessageToUserByText(session,
  194. TsbWebSocketMessage.fail(tsbWebSocketMessage.getCmdType(), "命令类型无效"));
  195. return;
  196. }
  197. if (!deviceOnlineManager.isOnline(bind.getDeviceSn()))
  198. {
  199. log.warn("WebSocket 设备未在线, sessionId={}, deviceSn={}", session.getId(), bind.getDeviceSn());
  200. TsbWebSocketUsers.sendMessageToUserByText(session,
  201. TsbWebSocketMessage.fail(tsbWebSocketMessage.getCmdType(), "设备未在线"));
  202. return;
  203. }
  204. // 3、Web data → MQTT 下行体(保留 JSONObject 全部字段)
  205. IJsonCmdDownHandler handler = jsonCmdHandlerManager.getDownHandlerByDownKey(cmdType.getCmdDownType());
  206. if (handler == null)
  207. {
  208. handler = jsonCmdHandlerManager.getDownHandlerByBaseKey(cmdType.getCmdType());
  209. }
  210. if (handler == null)
  211. {
  212. log.warn("未注册 JSON 命令处理器 cmd={}", cmdType);
  213. TsbWebSocketUsers.sendMessageToUserByText(session,
  214. TsbWebSocketMessage.fail(tsbWebSocketMessage.getCmdType(), "未实现的服务处理器"));
  215. return;
  216. }
  217. BaseJsonBody baseJsonBody = handler.handle(bind, tsbWebSocketMessage.getData(), cmdType);
  218. log.info("Web端 -> 设备 MQTT 同步消息, userId={}, deviceSn={}", bind.getUserId(), bind.getDeviceSn());
  219. String key = MsgHandlerUtil.getEncoderKey(MsgTypeEnum.JSON_BODY);
  220. IEncoder<BaseJsonBody> encoder = (IEncoder<BaseJsonBody>) handlerManager.getEncoder(key);
  221. if (encoder == null)
  222. {
  223. log.warn("未找到 JSON Body 编码器, deviceSn={}", bind.getDeviceSn());
  224. return;
  225. }
  226. log.info("MQTT 下行发送, cmdType={}, deviceType={}, deviceSn={}",
  227. cmdType.getCmdType(), bind.getDeviceType(), bind.getDeviceSn());
  228. encoder.encode(baseJsonBody);
  229. }
  230. /**
  231. * 从 Session 中安全获取绑定信息
  232. */
  233. public TsbUserDeviceBind getBindFromSession(Session session)
  234. {
  235. Object bindObj = session.getUserProperties().get(TsbWebSocketConstants.USER_DEVICE_BIND_KEY);
  236. return bindObj instanceof TsbUserDeviceBind ? (TsbUserDeviceBind) bindObj : null;
  237. }
  238. /**
  239. * 从 Session 中安全获取用户权限集合
  240. */
  241. @SuppressWarnings("unchecked")
  242. public Set<String> getUserPermissionsFromSession(Session session)
  243. {
  244. Object permissionsObj = session.getUserProperties().get(TsbWebSocketConstants.USER_PERMISSIONS_KEY);
  245. return permissionsObj instanceof Set ? (Set<String>) permissionsObj : null;
  246. }
  247. /**
  248. * 从 Session 中安全获取 deviceSn
  249. */
  250. public Long getDeviceSnFromSession(Session session)
  251. {
  252. Object deviceSnObj = session.getUserProperties().get(TsbWebSocketConstants.DEVICE_SN_KEY);
  253. return deviceSnObj instanceof Long ? (Long) deviceSnObj : null;
  254. }
  255. /**
  256. * 设备 MQTT 上行解析后推送到 Web端
  257. */
  258. public void pushPageSyncFromDevice(String pageKey, JSONObject bodyObj)
  259. {
  260. // 1、通过 deviceSn 判断设备是否在线
  261. Long deviceSn = bodyObj.getLong("deviceSn");
  262. if (deviceSn == null)
  263. {
  264. log.warn("MQTT上行设备SN码无效");
  265. return;
  266. }
  267. if (!deviceOnlineManager.isOnline(deviceSn))
  268. {
  269. log.warn("MQTT上行设备未在线, deviceSn={}", deviceSn);
  270. return;
  271. }
  272. // 2、通过 deviceSn 判断WebSocket是否连接
  273. Map<Long, Session> sessionUsers = TsbWebSocketUsers.getSessionUsers();
  274. if (!sessionUsers.containsKey(deviceSn))
  275. {
  276. log.warn("web端用户未登录, deviceSn={}", deviceSn);
  277. return;
  278. }
  279. // 3、发送消息
  280. TsbWebSocketMessage msg = TsbWebSocketMessage.success(pageKey, "操作成功", bodyObj);
  281. TsbWebSocketUsers.sendMessageToUserByText(sessionUsers.get(deviceSn), msg);
  282. }
  283. }