|
|
@@ -0,0 +1,200 @@
|
|
|
+package com.ruoyi.device.websocket;
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
+import com.ruoyi.common.core.domain.model.LoginUser;
|
|
|
+import com.ruoyi.common.utils.StringUtils;
|
|
|
+import com.ruoyi.device.domain.model.TsbUserDeviceBind;
|
|
|
+import com.ruoyi.device.mapper.TsbUserDeviceMapper;
|
|
|
+import com.ruoyi.device.mqtt.domain.BaseJsonBody;
|
|
|
+import com.ruoyi.device.mqtt.domain.encoder.TaxDataDown;
|
|
|
+import com.ruoyi.device.mqtt.enums.CmdTypeEnum;
|
|
|
+import com.ruoyi.device.mqtt.enums.MsgTypeEnum;
|
|
|
+import com.ruoyi.device.mqtt.handler.DeviceOnlineManager;
|
|
|
+import com.ruoyi.device.mqtt.handler.HandlerManager;
|
|
|
+import com.ruoyi.device.mqtt.handler.encoder.IEncoder;
|
|
|
+import com.ruoyi.device.mqtt.util.MsgHandlerUtil;
|
|
|
+import com.ruoyi.device.websocket.model.TsbWebSocketMessage;
|
|
|
+import com.ruoyi.framework.web.service.TokenService;
|
|
|
+import jakarta.annotation.Resource;
|
|
|
+import jakarta.websocket.Session;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 调试宝 WebSocket 业务:鉴权、会话注册、页面数据转发
|
|
|
+ *
|
|
|
+ * @author lwm
|
|
|
+ */
|
|
|
+@Service
|
|
|
+public class TsbWebSocketService
|
|
|
+{
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(TsbWebSocketService.class);
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private TokenService tokenService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private TsbUserDeviceMapper tsbUserDeviceMapper;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private HandlerManager handlerManager;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private DeviceOnlineManager deviceOnlineManager;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 握手鉴权:解析 token、校验用户已绑定设备,写入 session 属性
|
|
|
+ *
|
|
|
+ * @return 绑定信息;失败返回 null
|
|
|
+ */
|
|
|
+ public JSONObject authenticate(String token, Session session)
|
|
|
+ {
|
|
|
+ LoginUser loginUser = tokenService.getLoginUser(token);
|
|
|
+ if (loginUser == null || loginUser.getUser() == null)
|
|
|
+ {
|
|
|
+ log.warn("WebSocket 鉴权失败:无效 token");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ Long userId = loginUser.getUserId();
|
|
|
+ TsbUserDeviceBind bind = tsbUserDeviceMapper.selectBindByUserId(userId);
|
|
|
+ if (bind == null || bind.getDeviceSn() == null)
|
|
|
+ {
|
|
|
+ log.warn("WebSocket 鉴权失败:用户未绑定设备, userId={}", userId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ // session中写入用户设备信息 便于后续获取
|
|
|
+ session.getUserProperties().put(TsbWebSocketConstants.DEVICE_SN_KEY, bind.getDeviceSn());
|
|
|
+ session.getUserProperties().put(TsbWebSocketConstants.USER_DEVICE_BIND_KEY, bind);
|
|
|
+ session.getUserProperties().put(TsbWebSocketConstants.USER_PERMISSIONS_KEY, loginUser.getPermissions());
|
|
|
+ // 创建信息map
|
|
|
+ TsbWebSocketUsers.put(bind.getDeviceSn(), session);
|
|
|
+ log.info("WebSocket 鉴权成功, userId={}, deviceSn={}", userId, bind.getDeviceSn());
|
|
|
+ return JSONObject.from(bind);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理 Web端 -> 后端消息 -> 设备 MQTT
|
|
|
+ */
|
|
|
+ public void pushDeviceSyncFromPage(Session session, String message)
|
|
|
+ {
|
|
|
+ if (StringUtils.isEmpty(message))
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 1、解析消息
|
|
|
+ TsbWebSocketMessage tsbWebSocketMessage = JSON.parseObject(message, TsbWebSocketMessage.class);
|
|
|
+ if (tsbWebSocketMessage == null || StringUtils.isEmpty(tsbWebSocketMessage.getCmdType()))
|
|
|
+ {
|
|
|
+ log.warn("WebSocket 消息格式无效, sessionId={}, message={}", session.getId(), message);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 2、组装下发所需的消息
|
|
|
+ TsbUserDeviceBind bind = getBindFromSession(session);
|
|
|
+ if (bind == null || bind.getDeviceSn() == null || StringUtils.isEmpty(bind.getDeviceType())) {
|
|
|
+ log.warn("WebSocket 获取用户设备信息失败, sessionId={}, bind={}", session.getId(), bind);
|
|
|
+ TsbWebSocketUsers.sendMessageToUserByText(session,
|
|
|
+ TsbWebSocketMessage.fail(tsbWebSocketMessage.getCmdType(), "会话缺少设备绑定信息,请重新连接"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Set<String> userPermissions = getUserPermissionsFromSession(session);
|
|
|
+ if (StringUtils.isEmpty(userPermissions) || !userPermissions.contains(tsbWebSocketMessage.getCmdType()))
|
|
|
+ {
|
|
|
+ log.warn("WebSocket 用户权限不足, sessionId={}, cmdType={}", session.getId(), tsbWebSocketMessage.getCmdType());
|
|
|
+ TsbWebSocketUsers.sendMessageToUserByText(session,
|
|
|
+ TsbWebSocketMessage.fail(tsbWebSocketMessage.getCmdType(), "用户权限不足"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ CmdTypeEnum cmdType = CmdTypeEnum.resolveDownlink(tsbWebSocketMessage.getCmdType());
|
|
|
+ if (cmdType == null)
|
|
|
+ {
|
|
|
+ log.warn("WebSocket 命令类型无效, sessionId={}, cmdType={}", session.getId(), tsbWebSocketMessage.getCmdType());
|
|
|
+ TsbWebSocketUsers.sendMessageToUserByText(session,
|
|
|
+ TsbWebSocketMessage.fail(tsbWebSocketMessage.getCmdType(), "命令类型无效"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (!deviceOnlineManager.isOnline(bind.getDeviceSn()))
|
|
|
+ {
|
|
|
+ log.warn("WebSocket 设备未在线, sessionId={}, deviceSn={}", session.getId(), bind.getDeviceSn());
|
|
|
+ TsbWebSocketUsers.sendMessageToUserByText(session,
|
|
|
+ TsbWebSocketMessage.fail(tsbWebSocketMessage.getCmdType(), "设备未在线"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 3、发送消息
|
|
|
+ TaxDataDown down = tsbWebSocketMessage.getData() == null ?
|
|
|
+ new TaxDataDown() : tsbWebSocketMessage.getData().toJavaObject(TaxDataDown.class);
|
|
|
+ down.setDeviceType(bind.getDeviceType());
|
|
|
+ down.setDeviceSn(bind.getDeviceSn());
|
|
|
+ down.setCmdType(cmdType.getCmdDownType());
|
|
|
+ log.info("Web端 -> 设备 MQTT 同步消息, userId={}, deviceSn={}", bind.getUserId(), bind.getDeviceSn());
|
|
|
+ String key = MsgHandlerUtil.getEncoderKey(MsgTypeEnum.JSON_BODY);
|
|
|
+ IEncoder<BaseJsonBody> encoder = (IEncoder<BaseJsonBody>) handlerManager.getEncoder(key);
|
|
|
+ if (encoder == null)
|
|
|
+ {
|
|
|
+ log.warn("未找到 JSON Body 编码器, deviceSn={}", bind.getDeviceSn());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("MQTT 下行发送, cmdType={}, deviceType={}, deviceSn={}",
|
|
|
+ cmdType.getCmdType(), bind.getDeviceType(), bind.getDeviceSn());
|
|
|
+ encoder.encode(down);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 从 Session 中安全获取绑定信息
|
|
|
+ */
|
|
|
+ public TsbUserDeviceBind getBindFromSession(Session session) {
|
|
|
+ Object bindObj = session.getUserProperties().get(TsbWebSocketConstants.USER_DEVICE_BIND_KEY);
|
|
|
+ return bindObj instanceof TsbUserDeviceBind ? (TsbUserDeviceBind) bindObj : null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 从 Session 中安全获取用户权限集合
|
|
|
+ */
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public Set<String> getUserPermissionsFromSession(Session session) {
|
|
|
+ Object permissionsObj = session.getUserProperties().get(TsbWebSocketConstants.USER_PERMISSIONS_KEY);
|
|
|
+ return permissionsObj instanceof Set ? (Set<String>) permissionsObj : null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 从 Session 中安全获取 deviceSn
|
|
|
+ */
|
|
|
+ public Long getDeviceSnFromSession(Session session) {
|
|
|
+ Object deviceSnObj = session.getUserProperties().get(TsbWebSocketConstants.DEVICE_SN_KEY);
|
|
|
+ return deviceSnObj instanceof Long ? (Long) deviceSnObj : null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 设备 MQTT 上行解析后推送到 Web端
|
|
|
+ */
|
|
|
+ public void pushPageSyncFromDevice(String pageKey, JSONObject bodyObj)
|
|
|
+ {
|
|
|
+ // 1、通过 deviceSn 判断设备是否在线
|
|
|
+ Long deviceSn = bodyObj.getLong("deviceSn");
|
|
|
+ if (deviceSn == null)
|
|
|
+ {
|
|
|
+ log.warn("MQTT上行设备SN码无效");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (!deviceOnlineManager.isOnline(deviceSn))
|
|
|
+ {
|
|
|
+ log.warn("MQTT上行设备未在线, deviceSn={}", deviceSn);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 2、通过 deviceSn 判断WebSocket是否连接
|
|
|
+ Map<Long, Session> sessionUsers = TsbWebSocketUsers.getSessionUsers();
|
|
|
+ if (!sessionUsers.containsKey(deviceSn)) {
|
|
|
+ log.warn("web端用户未登录, deviceSn={}", deviceSn);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 3、发送消息
|
|
|
+ TsbWebSocketMessage msg = TsbWebSocketMessage.success(pageKey, "操作成功", bodyObj);
|
|
|
+ TsbWebSocketUsers.sendMessageToUserByText(sessionUsers.get(deviceSn), msg);
|
|
|
+ }
|
|
|
+}
|