Kaynağa Gözat

1、调试宝设备 MQTT 登录在线注册表使用redis维护

liweimin 2 hafta önce
ebeveyn
işleme
b272cffa89

+ 5 - 0
ruoyi-common/src/main/java/com/ruoyi/common/constant/CacheConstants.java

@@ -41,4 +41,9 @@ public class CacheConstants
      * 登录账户密码错误次数 redis key
      */
     public static final String PWD_ERR_CNT_KEY = "pwd_err_cnt:";
+
+    /**
+     * 调试宝设备 MQTT 登录在线 redis key 前缀(完整 key:tsb:device:online:{deviceSn})
+     */
+    public static final String TSB_DEVICE_ONLINE_KEY = "tsb:device:online:";
 }

+ 50 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/enums/DeviceLineStatusEnum.java

@@ -0,0 +1,50 @@
+package com.ruoyi.device.mqtt.enums;
+
+/**
+ * 设备在线状态枚举
+ *
+ * @author lwm
+ **/
+public enum DeviceLineStatusEnum
+{
+    ON_LINE(1, "在线"),
+    OFF_LINE(0, "离线"),
+    ;
+
+    private final Integer status;
+    private final String message;
+
+    DeviceLineStatusEnum(Integer status, String message)
+    {
+        this.status = status;
+        this.message = message;
+    }
+
+    public Integer getStatus()
+    {
+        return status;
+    }
+
+    public String getMessage()
+    {
+        return message;
+    }
+
+    // 判断是否在线
+    public static boolean isOnLine(Integer status)
+    {
+        if (status == null)
+        {
+            return false;
+        }
+        return DeviceLineStatusEnum.ON_LINE.getStatus().equals(status);
+    }
+
+    // 判断是否离线
+    public static boolean isOffLine(Integer status)
+    {
+        return !isOnLine(status);
+    }
+
+
+}

+ 123 - 19
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/DeviceOnlineManager.java

@@ -1,14 +1,19 @@
 package com.ruoyi.device.mqtt.handler;
 
+import com.ruoyi.common.constant.CacheConstants;
+import com.ruoyi.common.core.redis.RedisCache;
+import com.ruoyi.device.mqtt.enums.DeviceLineStatusEnum;
 import com.ruoyi.device.mqtt.vo.DeviceOnlineInfo;
+import jakarta.annotation.Resource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collection;
+import java.util.Date;
 
 /**
- * 调试宝设备 MQTT 登录在线注册表(deviceSn → 设备信息/用户信息)
+ * 调试宝设备 MQTT 登录在线注册表(deviceSn → 设备信息/用户信息,Redis String 维护
  *
  * @author lwm
  */
@@ -17,56 +22,155 @@ public class DeviceOnlineManager
 {
     private static final Logger log = LoggerFactory.getLogger(DeviceOnlineManager.class);
 
-    private final ConcurrentHashMap<Long, DeviceOnlineInfo> onlineMap = new ConcurrentHashMap<>();
+    /** 默认心跳超时(毫秒),超过该时间未收到心跳则判定离线 */
+    public static final long DEFAULT_HEARTBEAT_TIMEOUT_MS = 90_000L;
+
+    @Resource
+    private RedisCache redisCache;
 
     /**
-     * 注册 设备登录信息
+     * 登录在线:写入/覆盖设备登录信息,并标记在线、刷新心跳时间
      *
      * @param info 设备登录信息
      */
-    public void register(DeviceOnlineInfo info)
+    public void loginOnline(DeviceOnlineInfo info)
     {
         if (info == null || info.getDeviceSn() == null)
         {
             return;
         }
-        onlineMap.put(info.getDeviceSn(), info);
-        log.info("设备MQTT登录注册, deviceSn={}, userId={}", info.getDeviceSn(), info.getUserId());
+        info.setLineStatus(DeviceLineStatusEnum.ON_LINE.getStatus());
+        info.setHeartbeatTime(new Date());
+        saveDeviceLoginInfo(info);
+        log.info("设备登录在线, deviceSn={}, userId={}", info.getDeviceSn(), info.getUserId());
     }
 
     /**
-     * 根据设备SN码 获取设备登录信息
+     * 登出离线:保留登录信息,仅标记离线
      *
      * @param deviceSn 设备SN码
-     * @return 设备登录信息
      */
-    public DeviceOnlineInfo getByDeviceSn(Long deviceSn)
+    public void logoutOffline(Long deviceSn)
     {
         if (deviceSn == null)
         {
-            return null;
+            return;
         }
-        return onlineMap.get(deviceSn);
+        DeviceOnlineInfo info = getDeviceLoginInfo(deviceSn);
+        if (info == null)
+        {
+            log.info("设备登出离线,登录信息不存在, deviceSn={}", deviceSn);
+            return;
+        }
+        info.setLineStatus(DeviceLineStatusEnum.OFF_LINE.getStatus());
+        saveDeviceLoginInfo(info);
+        log.info("设备登出离线, deviceSn={}, userId={}", deviceSn, info.getUserId());
     }
 
     /**
-     * 移除 设备登录信息
+     * 心跳在线:刷新心跳时间,恢复/保持在线状态
      *
      * @param deviceSn 设备SN码
      */
-    public void remove(Long deviceSn)
+    public void heartbeatOnline(Long deviceSn)
     {
         if (deviceSn == null)
         {
             return;
         }
-        DeviceOnlineInfo removed = onlineMap.remove(deviceSn);
-        if (removed != null)
+        DeviceOnlineInfo info = getDeviceLoginInfo(deviceSn);
+        if (info == null)
+        {
+            log.debug("设备心跳,登录信息不存在, deviceSn={}", deviceSn);
+            return;
+        }
+        info.setLineStatus(DeviceLineStatusEnum.ON_LINE.getStatus());
+        info.setHeartbeatTime(new Date());
+        saveDeviceLoginInfo(info);
+        log.debug("设备心跳在线, deviceSn={}", deviceSn);
+    }
+
+    /**
+     * 长时间未心跳离线:标记离线(由定时扫描或外部检测调用)
+     *
+     * @param deviceSn 设备SN码
+     */
+    public void offlineByHeartbeatTimeout(Long deviceSn)
+    {
+        if (deviceSn == null)
         {
-            log.info("设备MQTT在线信息移除, deviceSn={}", deviceSn);
-        } else
+            return;
+        }
+        DeviceOnlineInfo info = getDeviceLoginInfo(deviceSn);
+        if (info == null || DeviceLineStatusEnum.isOffLine(info.getLineStatus()))
         {
-            log.info("设备MQTT在线信息不存在, deviceSn={}", deviceSn);
+            return;
         }
+        info.setLineStatus(DeviceLineStatusEnum.OFF_LINE.getStatus());
+        saveDeviceLoginInfo(info);
+        log.info("设备心跳超时离线, deviceSn={}, lastHeartbeat={}", deviceSn, info.getHeartbeatTime());
+    }
+
+    /**
+     * 扫描所有在线设备,将心跳超时的设备标记为离线
+     *
+     */
+    public void scanOfflineByHeartbeatTimeout()
+    {
+        Collection<String> keys = redisCache.keys(CacheConstants.TSB_DEVICE_ONLINE_KEY + "*");
+        if (keys == null || keys.isEmpty())
+        {
+            return;
+        }
+        Date now = new Date();
+        for (String key : keys)
+        {
+            DeviceOnlineInfo info = redisCache.getCacheObject(key);
+            if (info == null || DeviceLineStatusEnum.isOffLine(info.getLineStatus()))
+            {
+                continue;
+            }
+            Date lastHeartbeat = info.getHeartbeatTime();
+            if (lastHeartbeat == null || now.getTime() - lastHeartbeat.getTime() > DEFAULT_HEARTBEAT_TIMEOUT_MS)
+            {
+                offlineByHeartbeatTimeout(info.getDeviceSn());
+            }
+        }
+    }
+
+    /**
+     * 获取设备登录信息(含在线状态与心跳时间)
+     *
+     * @param deviceSn 设备SN码
+     * @return 设备登录信息;未登录过则返回 null
+     */
+    public DeviceOnlineInfo getDeviceLoginInfo(Long deviceSn)
+    {
+        if (deviceSn == null)
+        {
+            return null;
+        }
+        return redisCache.getCacheObject(getDeviceOnlineKey(deviceSn));
+    }
+
+    /**
+     * 设备是否在线
+     *
+     * @param deviceSn 设备SN码
+     */
+    public boolean isOnline(Long deviceSn)
+    {
+        DeviceOnlineInfo info = getDeviceLoginInfo(deviceSn);
+        return info != null && DeviceLineStatusEnum.isOnLine(info.getLineStatus());
+    }
+
+    private void saveDeviceLoginInfo(DeviceOnlineInfo info)
+    {
+        redisCache.setCacheObject(getDeviceOnlineKey(info.getDeviceSn()), info);
+    }
+
+    private String getDeviceOnlineKey(Long deviceSn)
+    {
+        return CacheConstants.TSB_DEVICE_ONLINE_KEY + deviceSn;
     }
 }

+ 1 - 2
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/service/DeviceLoginService.java

@@ -133,8 +133,7 @@ public class DeviceLoginService implements IJsonCmdHandler
         onlineInfo.setDeviceSn(request.getDeviceSn());
         onlineInfo.setDeviceType(request.getDeviceType());
         onlineInfo.setImei(request.getImei());
-        onlineInfo.setBindTime(bindResult.getBindTime());
-        deviceOnlineManager.register(onlineInfo);
+        deviceOnlineManager.loginOnline(onlineInfo);
 
         return response;
     }

+ 22 - 5
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/vo/DeviceOnlineInfo.java

@@ -24,7 +24,16 @@ public class DeviceOnlineInfo
     private Long deviceSn;
     private String deviceType;
     private String imei;
-    private Date bindTime;
+
+    /**
+     * 在线状态(0:离线;1:在线)
+     */
+    private Integer lineStatus;
+
+    /**
+     * 心跳时间
+     */
+    private Date heartbeatTime;
 
     public Long getUserId() {
         return userId;
@@ -106,11 +115,19 @@ public class DeviceOnlineInfo
         this.imei = imei;
     }
 
-    public Date getBindTime() {
-        return bindTime;
+    public Integer getLineStatus() {
+        return lineStatus;
+    }
+
+    public void setLineStatus(Integer lineStatus) {
+        this.lineStatus = lineStatus;
+    }
+
+    public Date getHeartbeatTime() {
+        return heartbeatTime;
     }
 
-    public void setBindTime(Date bindTime) {
-        this.bindTime = bindTime;
+    public void setHeartbeatTime(Date heartbeatTime) {
+        this.heartbeatTime = heartbeatTime;
     }
 }