package com.ruoyi.device.websocket;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.domain.model.LoginUser;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.device.domain.entity.TsbDevice;
import com.ruoyi.device.domain.model.TsbUserDeviceBind;
import com.ruoyi.device.mqtt.domain.BaseJsonBody;
import com.ruoyi.device.mqtt.enums.CmdTypeEnum;
import com.ruoyi.device.mqtt.enums.MsgTypeEnum;
import com.ruoyi.device.mqtt.handler.DeviceOnlineManager;
import com.ruoyi.device.mqtt.handler.HandlerManager;
import com.ruoyi.device.mqtt.handler.JsonCmdHandlerManager;
import com.ruoyi.device.mqtt.handler.encoder.IEncoder;
import com.ruoyi.device.mqtt.handler.encoder.json.IJsonCmdDownHandler;
import com.ruoyi.device.mqtt.util.MsgHandlerUtil;
import com.ruoyi.device.service.ITsbDeviceService;
import com.ruoyi.device.websocket.model.TsbWebSocketMessage;
import com.ruoyi.framework.web.service.TokenService;
import jakarta.annotation.Resource;
import jakarta.websocket.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.Set;

/**
 * TSB WebSocket business service: handshake authentication, session registration and
 * forwarding of page data between the web client and the device (via MQTT).
 *
 * @author lwm
 */
@Service
public class TsbWebSocketService {

    private static final Logger log = LoggerFactory.getLogger(TsbWebSocketService.class);

    @Resource
    private TokenService tokenService;

    @Resource
    private ITsbDeviceService tsbDeviceService;

    @Resource
    private HandlerManager handlerManager;

    @Resource
    private JsonCmdHandlerManager jsonCmdHandlerManager;

    @Resource
    private DeviceOnlineManager deviceOnlineManager;

    /**
     * Pre-checks a WebSocket connection request: a device SN was given, the login is valid,
     * the device is accessible to the caller, has a device type, is online, and has no
     * WebSocket session registered for it yet.
     *
     * @param userId    id of the user opening the connection
     * @param deviceSn  serial number of the device to operate
     * @param loginUser the authenticated user
     * @return the failure reason (user-facing text), or {@code null} when every check passes
     */
    public String precheckConnect(Long userId, Long deviceSn, LoginUser loginUser) {
        if (deviceSn == null) {
            return "请指定设备编号";
        }
        if (userId == null || loginUser == null || loginUser.getUser() == null) {
            return "登录状态无效,请重新登录";
        }
        TsbDevice device = tsbDeviceService.selectAccessibleDeviceBySn(deviceSn);
        if (device == null) {
            return "您没有该设备的操作权限";
        }
        if (StringUtils.isEmpty(device.getDeviceType())) {
            return "设备类型未配置,无法建立连接";
        }
        if (!deviceOnlineManager.isOnline(deviceSn)) {
            return "设备未登录在线,无法建立连接";
        }
        if (TsbWebSocketUsers.isDeviceConnected(deviceSn)) {
            return "该设备已被占用连接,请稍后再试";
        }
        return null;
    }

    /**
     * Builds the user/device binding the web side needs in order to operate a device.
     *
     * @param deviceSn  serial number of the device to operate
     * @param loginUser the authenticated user
     * @return the binding, or {@code null} when {@code deviceSn} or {@code loginUser} is
     *         {@code null} or the device lookup returns nothing (previously this case ended
     *         in a {@link NullPointerException})
     */
    public TsbUserDeviceBind buildOperateBind(Long deviceSn, LoginUser loginUser) {
        if (deviceSn == null || loginUser == null) {
            return null;
        }
        TsbDevice device = tsbDeviceService.selectAccessibleDeviceBySn(deviceSn);
        if (device == null) {
            return null;
        }
        TsbUserDeviceBind bind = new TsbUserDeviceBind();
        bind.setDeviceId(device.getDeviceId());
        bind.setDeviceType(device.getDeviceType());
        bind.setDeviceSn(device.getDeviceSn());
        bind.setImei(device.getImei());
        bind.setUserId(loginUser.getUserId());
        bind.setUserName(loginUser.getUsername());
        bind.setNickName(loginUser.getUser() == null ? null : loginUser.getUser().getNickName());
        return bind;
    }

    /**
     * Handshake authentication: resolves the login user from the token, runs
     * {@link #precheckConnect}, stores the binding and permissions in the session's user
     * properties and registers the session in {@code TsbWebSocketUsers}.
     *
     * @param token      login token supplied by the client
     * @param deviceSn   serial number of the device to operate
     * @param session    the WebSocket session being opened
     * @param failReason receives the failure reason when authentication fails; may be {@code null}
     * @return the binding as JSON, or {@code null} on failure
     */
    public JSONObject authenticate(String token, Long deviceSn, Session session, StringBuilder failReason) {
        LoginUser loginUser = tokenService.getLoginUser(token);
        if (loginUser == null || loginUser.getUser() == null) {
            log.warn("WebSocket 鉴权失败:无效 token");
            appendReason(failReason, "登录状态无效,请重新登录");
            return null;
        }

        // Populate the Spring Security context manually so that SecurityUtils.getLoginUser()
        // is usable during the checks below.
        UsernamePasswordAuthenticationToken authentication = new UsernamePasswordAuthenticationToken(
                loginUser, null, loginUser.getAuthorities());
        SecurityContextHolder.getContext().setAuthentication(authentication);
        try {
            Long userId = loginUser.getUserId();
            String precheckMsg = precheckConnect(userId, deviceSn, loginUser);
            if (precheckMsg != null) {
                log.warn("WebSocket 预检失败:{}, userId={}, deviceSn={}", precheckMsg, userId, deviceSn);
                appendReason(failReason, precheckMsg);
                return null;
            }

            TsbUserDeviceBind bind = buildOperateBind(deviceSn, loginUser);
            if (bind == null) {
                // The device is looked up a second time here; it can be gone even though the
                // precheck just passed.
                String reason = "您没有该设备的操作权限";
                log.warn("WebSocket 预检失败:{}, userId={}, deviceSn={}", reason, userId, deviceSn);
                appendReason(failReason, reason);
                return null;
            }

            // Store the user/device information in the session for later lookups.
            Map<String, Object> properties = session.getUserProperties();
            properties.put(TsbWebSocketConstants.DEVICE_SN_KEY, bind.getDeviceSn());
            properties.put(TsbWebSocketConstants.USER_DEVICE_BIND_KEY, bind);
            properties.put(TsbWebSocketConstants.USER_PERMISSIONS_KEY, loginUser.getPermissions());

            // Register the session for this device.
            if (!TsbWebSocketUsers.tryPut(bind.getDeviceSn(), session)) {
                // A rejected session must not keep the binding/permissions, otherwise
                // pushDeviceSyncFromPage would still accept commands from it.
                properties.remove(TsbWebSocketConstants.DEVICE_SN_KEY);
                properties.remove(TsbWebSocketConstants.USER_DEVICE_BIND_KEY);
                properties.remove(TsbWebSocketConstants.USER_PERMISSIONS_KEY);
                log.warn("WebSocket 注册失败:设备已被占用, deviceSn={}", bind.getDeviceSn());
                appendReason(failReason, "该设备已被占用连接,请稍后再试");
                return null;
            }

            log.info("WebSocket 鉴权成功, userId={}, deviceSn={}", userId, bind.getDeviceSn());
            return JSONObject.from(bind);
        } finally {
            // Clear the SecurityContext so a reused thread does not carry this user's identity.
            SecurityContextHolder.clearContext();
        }
    }

    /**
     * Handles a message travelling web client -> backend -> device (MQTT).
     *
     * <p>Empty, unparsable or command-less messages are dropped with a log entry only. Every
     * later failure (missing binding, insufficient permission, unknown command, device
     * offline, no handler, no encoder) is additionally reported back to the client.
     *
     * @param session the WebSocket session the message arrived on
     * @param message raw text frame sent by the web client
     */
    public void pushDeviceSyncFromPage(Session session, String message) {
        if (StringUtils.isEmpty(message)) {
            return;
        }

        // 1. Parse the message. The payload comes from the client, so malformed JSON is expected.
        TsbWebSocketMessage tsbWebSocketMessage;
        try {
            tsbWebSocketMessage = JSON.parseObject(message, TsbWebSocketMessage.class);
        } catch (JSONException e) {
            log.warn("WebSocket 消息解析失败, sessionId={}, message={}, error={}",
                    session.getId(), message, e.getMessage());
            return;
        }
        if (tsbWebSocketMessage == null || StringUtils.isEmpty(tsbWebSocketMessage.getCmdType())) {
            log.warn("WebSocket 消息格式无效, sessionId={}, message={}", session.getId(), message);
            return;
        }
        String requestedCmd = tsbWebSocketMessage.getCmdType();

        // 2. Collect and validate what is needed for the downlink message.
        TsbUserDeviceBind bind = getBindFromSession(session);
        if (bind == null || bind.getDeviceSn() == null || StringUtils.isEmpty(bind.getDeviceType())) {
            log.warn("WebSocket 获取用户设备信息失败, sessionId={}, bind={}", session.getId(), bind);
            replyFail(session, requestedCmd, "会话缺少设备绑定信息,请重新连接");
            return;
        }

        Set<String> userPermissions = getUserPermissionsFromSession(session);
        if (StringUtils.isEmpty(userPermissions) || !userPermissions.contains(requestedCmd)) {
            log.warn("WebSocket 用户权限不足, sessionId={}, cmdType={}", session.getId(), requestedCmd);
            replyFail(session, requestedCmd, "用户权限不足");
            return;
        }

        CmdTypeEnum cmdType = CmdTypeEnum.resolveDownlink(requestedCmd);
        if (cmdType == null) {
            log.warn("WebSocket 命令类型无效, sessionId={}, cmdType={}", session.getId(), requestedCmd);
            replyFail(session, requestedCmd, "命令类型无效");
            return;
        }

        if (!deviceOnlineManager.isOnline(bind.getDeviceSn())) {
            log.warn("WebSocket 设备未在线, sessionId={}, deviceSn={}", session.getId(), bind.getDeviceSn());
            replyFail(session, requestedCmd, "设备未在线");
            return;
        }

        // 3. Web data -> MQTT downlink body. Look the handler up by the downlink key first and
        //    fall back to the base command key.
        IJsonCmdDownHandler handler = jsonCmdHandlerManager.getDownHandlerByDownKey(cmdType.getCmdDownType());
        if (handler == null) {
            handler = jsonCmdHandlerManager.getDownHandlerByBaseKey(cmdType.getCmdType());
        }
        if (handler == null) {
            log.warn("未注册 JSON 命令处理器 cmd={}", cmdType);
            replyFail(session, requestedCmd, "未实现的服务处理器");
            return;
        }

        BaseJsonBody baseJsonBody = handler.handle(bind, tsbWebSocketMessage.getData(), cmdType);
        log.info("Web端 -> 设备 MQTT 同步消息, userId={}, deviceSn={}", bind.getUserId(), bind.getDeviceSn());

        String key = MsgHandlerUtil.getEncoderKey(MsgTypeEnum.JSON_BODY);
        IEncoder encoder = (IEncoder) handlerManager.getEncoder(key);
        if (encoder == null) {
            log.warn("未找到 JSON Body 编码器, deviceSn={}", bind.getDeviceSn());
            // Tell the client as well, like every other failure branch does.
            replyFail(session, requestedCmd, "未找到消息编码器");
            return;
        }

        log.info("MQTT 下行发送, cmdType={}, deviceType={}, deviceSn={}",
                cmdType.getCmdType(), bind.getDeviceType(), bind.getDeviceSn());
        encoder.encode(baseJsonBody);
    }

    /**
     * Reads the user/device binding from the session's user properties.
     *
     * @param session the WebSocket session
     * @return the binding, or {@code null} when it is absent or of an unexpected type
     */
    public TsbUserDeviceBind getBindFromSession(Session session) {
        Object bindObj = session.getUserProperties().get(TsbWebSocketConstants.USER_DEVICE_BIND_KEY);
        return bindObj instanceof TsbUserDeviceBind ? (TsbUserDeviceBind) bindObj : null;
    }

    /**
     * Reads the user's permission set from the session's user properties.
     *
     * <p>Only the {@link Set} type is checked at runtime; the elements are assumed to be the
     * permission strings that {@link #authenticate} stored from {@code loginUser.getPermissions()}.
     *
     * @param session the WebSocket session
     * @return the permission set, or {@code null} when it is absent or not a {@link Set}
     */
    @SuppressWarnings("unchecked")
    public Set<String> getUserPermissionsFromSession(Session session) {
        Object permissionsObj = session.getUserProperties().get(TsbWebSocketConstants.USER_PERMISSIONS_KEY);
        return permissionsObj instanceof Set ? (Set<String>) permissionsObj : null;
    }

    /**
     * Reads the device SN from the session's user properties.
     *
     * @param session the WebSocket session
     * @return the device SN, or {@code null} when it is absent or not a {@link Long}
     */
    public Long getDeviceSnFromSession(Session session) {
        Object deviceSnObj = session.getUserProperties().get(TsbWebSocketConstants.DEVICE_SN_KEY);
        return deviceSnObj instanceof Long ? (Long) deviceSnObj : null;
    }

    /**
     * Pushes a parsed MQTT uplink message from a device to the web client connected to it.
     *
     * @param pageKey key passed as the first argument of {@code TsbWebSocketMessage.success}
     * @param bodyObj parsed uplink body; its {@code deviceSn} field selects the target session
     */
    public void pushPageSyncFromDevice(String pageKey, JSONObject bodyObj) {
        if (bodyObj == null) {
            log.warn("MQTT上行消息体为空");
            return;
        }

        // 1. Check via deviceSn that the device is online.
        Long deviceSn = bodyObj.getLong("deviceSn");
        if (deviceSn == null) {
            log.warn("MQTT上行设备SN码无效");
            return;
        }
        if (!deviceOnlineManager.isOnline(deviceSn)) {
            log.warn("MQTT上行设备未在线, deviceSn={}", deviceSn);
            return;
        }

        // 2. Look the session up once: a containsKey() + get() pair could observe the session
        //    being removed in between and pass null to the sender.
        Map<Long, Session> sessionUsers = TsbWebSocketUsers.getSessionUsers();
        Session target = sessionUsers.get(deviceSn);
        if (target == null) {
            log.warn("web端用户未登录, deviceSn={}", deviceSn);
            return;
        }

        // 3. Send the message.
        TsbWebSocketMessage msg = TsbWebSocketMessage.success(pageKey, "操作成功", bodyObj);
        TsbWebSocketUsers.sendMessageToUserByText(target, msg);
    }

    /**
     * Appends a failure reason to the caller-supplied buffer when one was provided.
     */
    private static void appendReason(StringBuilder failReason, String reason) {
        if (failReason != null) {
            failReason.append(reason);
        }
    }

    /**
     * Sends a failure response for the given command type back to the web client.
     */
    private static void replyFail(Session session, String cmdType, String reason) {
        TsbWebSocketUsers.sendMessageToUserByText(session, TsbWebSocketMessage.fail(cmdType, reason));
    }
}