Преглед изворни кода

1、初始化mqtt交互报文功能
2、添加调试宝登录、调试宝PT产测交互的json报文功能

liweimin пре 4 недеља
родитељ
комит
68517ee68e
40 измењених фајлова са 2830 додато и 89 уклоњено
  1. 8 0
      pom.xml
  2. 85 87
      ruoyi-admin/src/main/resources/application-druid.yml
  3. 2 0
      ruoyi-common/src/main/java/com/ruoyi/common/utils/DateUtils.java
  4. 11 2
      ruoyi-device/pom.xml
  5. 27 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/annotation/ConsumerHandler.java
  6. 27 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/annotation/JsonCmdHandler.java
  7. 23 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/annotation/ProducerHandler.java
  8. 199 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/client/EmqClient.java
  9. 104 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/client/MessageCallback.java
  10. 48 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/config/MessageRouter.java
  11. 182 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/config/MqttConfig.java
  12. 40 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/constants/MqttConstants.java
  13. 49 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/BaseBody.java
  14. 75 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/BaseJsonBody.java
  15. 55 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/decoder/DeviceLoginRequest.java
  16. 30 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/decoder/DevicePtRequest.java
  17. 83 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/encoder/DeviceLoginResponse.java
  18. 87 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/encoder/DevicePtResponse.java
  19. 128 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/enums/CmdTypeEnum.java
  20. 137 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/enums/MsgTypeEnum.java
  21. 23 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/enums/QosEnum.java
  22. 39 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/enums/VersionEnum.java
  23. 78 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/HandlerManager.java
  24. 158 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/AbstractDecoder.java
  25. 23 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/IDecoder.java
  26. 191 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/MessageHandler.java
  27. 22 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/IJsonCmdHandler.java
  28. 89 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/JsonBodyDecoder.java
  29. 65 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/JsonCmdHandlerManager.java
  30. 122 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/service/DeviceLoginService.java
  31. 88 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/service/DevicePtService.java
  32. 179 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/encoder/AbstractEncoder.java
  33. 17 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/encoder/IEncoder.java
  34. 48 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/encoder/json/JsonBodyEncoder.java
  35. 25 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/listener/DeviceMessageListener.java
  36. 25 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/listener/InventMessageListener.java
  37. 47 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/util/CRC16Standard.java
  38. 50 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/util/MsgHandlerUtil.java
  39. 99 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/vo/CommonHeader.java
  40. 42 0
      ruoyi-device/src/main/java/com/ruoyi/device/mqtt/vo/CommonTopic.java

+ 8 - 0
pom.xml

@@ -31,6 +31,7 @@
         <jwt.version>0.9.1</jwt.version>
         <jaxb-api.version>2.3.1</jaxb-api.version>
         <springdoc.version>3.0.2</springdoc.version>
+        <mqtt.version>1.2.2</mqtt.version>
     </properties>
 
     <!-- 依赖声明 -->
@@ -177,6 +178,13 @@
                 <version>${ruoyi.version}</version>
             </dependency>
 
+            <!-- 硬件 MQTT(EMQX / Eclipse Paho) -->
+            <dependency>
+                <groupId>org.eclipse.paho</groupId>
+                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+                <version>${mqtt.version}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 

+ 85 - 87
ruoyi-admin/src/main/resources/application-druid.yml

@@ -1,88 +1,86 @@
-# 数据源配置
-spring:
-    datasource:
-        type: com.alibaba.druid.pool.DruidDataSource
-        driverClassName: com.mysql.cj.jdbc.Driver
-        druid:
-            # 主库数据源
-            master:
-                url: jdbc:mysql://192.168.0.100:3306/cpyypt-tsb?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
-                username: wbjw
-                password: l1M9kX7S3z4RzcWW
-            # 从库数据源
-            slave:
-                # 从数据源开关/默认关闭
-                enabled: false
-                url: 
-                username: 
-                password: 
-            # 初始连接数
-            initialSize: 5
-            # 最小连接池数量
-            minIdle: 10
-            # 最大连接池数量
-            maxActive: 20
-            # 配置获取连接等待超时的时间
-            maxWait: 60000
-            # 配置连接超时时间
-            connectTimeout: 30000
-            # 配置网络超时时间
-            socketTimeout: 60000
-            # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
-            timeBetweenEvictionRunsMillis: 60000
-            # 配置一个连接在池中最小生存的时间,单位是毫秒
-            minEvictableIdleTimeMillis: 300000
-            # 配置一个连接在池中最大生存的时间,单位是毫秒
-            maxEvictableIdleTimeMillis: 900000
-            # 配置检测连接是否有效
-            validationQuery: SELECT 1 FROM DUAL
-            testWhileIdle: true
-            testOnBorrow: false
-            testOnReturn: false
-            webStatFilter: 
-                enabled: true
-            statViewServlet:
-                enabled: true
-                # 设置白名单,不填则允许所有访问
-                allow:
-                url-pattern: /druid/*
-                # 控制台管理用户名和密码
-                login-username: ruoyi
-                login-password: 123456
-            filter:
-                stat:
-                    enabled: true
-                    # 慢SQL记录
-                    log-slow-sql: true
-                    slow-sql-millis: 1000
-                    merge-sql: true
-                wall:
-                    config:
-                        multi-statement-allow: true
-
-emqx:
-    broker: tcp://192.168.0.101:9000
-    userName: admin
-    password: houjianwei
-    cleanSession: true
-    reconnect: true
-    timeout: 20
-    keepAlive: 10
-    apiUserName: admin
-    apiPassword: public
-    apiPath: http://192.168.0.101:8081
-    downTopicTemplate: "cpyypt/down/%s/%s"
-    defaultFrameHeader: 0
-    messageRouters:
-        - topic: $share/wbjw/cpyypt/up/#
-          qos: 2
-          listener: deviceMessageListener
-        - topic: $share/wbjw/invent/up/#
-          qos: 2
-          listener: inventMessageListener
-        - topic: $SYS/brokers/+/clients/+/disconnected
-          qos: 2
-          listener: clientLineStatusListener
-        - topic: $share/wbjw/cpyypt/logup/#
-          qos: 2
+# 数据源配置
+spring:
+    datasource:
+        type: com.alibaba.druid.pool.DruidDataSource
+        driverClassName: com.mysql.cj.jdbc.Driver
+        druid:
+            # 主库数据源
+            master:
+                url: jdbc:mysql://192.168.0.100:3306/cpyypt-tsb?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+                username: wbjw
+                password: l1M9kX7S3z4RzcWW
+            # 从库数据源
+            slave:
+                # 从数据源开关/默认关闭
+                enabled: false
+                url: 
+                username: 
+                password: 
+            # 初始连接数
+            initialSize: 5
+            # 最小连接池数量
+            minIdle: 10
+            # 最大连接池数量
+            maxActive: 20
+            # 配置获取连接等待超时的时间
+            maxWait: 60000
+            # 配置连接超时时间
+            connectTimeout: 30000
+            # 配置网络超时时间
+            socketTimeout: 60000
+            # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+            timeBetweenEvictionRunsMillis: 60000
+            # 配置一个连接在池中最小生存的时间,单位是毫秒
+            minEvictableIdleTimeMillis: 300000
+            # 配置一个连接在池中最大生存的时间,单位是毫秒
+            maxEvictableIdleTimeMillis: 900000
+            # 配置检测连接是否有效
+            validationQuery: SELECT 1 FROM DUAL
+            testWhileIdle: true
+            testOnBorrow: false
+            testOnReturn: false
+            webStatFilter: 
+                enabled: true
+            statViewServlet:
+                enabled: true
+                # 设置白名单,不填则允许所有访问
+                allow:
+                url-pattern: /druid/*
+                # 控制台管理用户名和密码
+                login-username: ruoyi
+                login-password: 123456
+            filter:
+                stat:
+                    enabled: true
+                    # 慢SQL记录
+                    log-slow-sql: true
+                    slow-sql-millis: 1000
+                    merge-sql: true
+                wall:
+                    config:
+                        multi-statement-allow: true
+
+emqx:
+    broker: tcp://192.168.0.101:9000
+    userName: admin
+    password: houjianwei
+    cleanSession: true
+    reconnect: true
+    timeout: 20
+    keepAlive: 10
+    apiUserName: admin
+    apiPassword: public
+    apiPath: http://192.168.0.101:8081
+    messageRouters:
+        - topic: $share/wbjw/cpyypt/up/#
+          qos: 2
+          listener: deviceMessageListener
+        - topic: $share/wbjw/invent/up/#
+          qos: 2
+          listener: inventMessageListener
+        - topic: $SYS/brokers/+/clients/+/disconnected
+          qos: 2
+          listener: clientLineStatusListener
+        - topic: $share/wbjw/cpyypt/logup/#
+          qos: 2
           listener: deviceLogListener

+ 2 - 0
ruoyi-common/src/main/java/com/ruoyi/common/utils/DateUtils.java

@@ -25,6 +25,8 @@ public class DateUtils extends org.apache.commons.lang3.time.DateUtils
 
     public static String YYYY_MM_DD = "yyyy-MM-dd";
 
+    public static String YYYYMMDD = "yyyyMMdd";
+
     public static String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
 
     public static String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";

+ 11 - 2
ruoyi-device/pom.xml

@@ -18,16 +18,25 @@
             <groupId>com.ruoyi</groupId>
             <artifactId>ruoyi-common</artifactId>
         </dependency>
+
         <dependency>
             <groupId>com.ruoyi</groupId>
             <artifactId>ruoyi-system</artifactId>
         </dependency>
 
-        <!-- 硬件 MQTT(EMQX / Eclipse Paho) -->
+        <dependency>
+            <groupId>com.ruoyi</groupId>
+            <artifactId>ruoyi-framework</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.eclipse.paho</groupId>
             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
-            <version>1.2.5</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
         </dependency>
     </dependencies>
 

+ 27 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/annotation/ConsumerHandler.java

@@ -0,0 +1,27 @@
+package com.ruoyi.device.mqtt.annotation;
+
+import com.ruoyi.device.mqtt.enums.MsgTypeEnum;
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * 上行报文 消费解析注解
+ *
+ * @author lwm
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface ConsumerHandler
+{
+    /**
+     * 消息类型
+     */
+    MsgTypeEnum msgType();
+}

+ 27 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/annotation/JsonCmdHandler.java

@@ -0,0 +1,27 @@
+package com.ruoyi.device.mqtt.annotation;
+
+import com.ruoyi.device.mqtt.enums.CmdTypeEnum;
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * JSON Body 业务命令处理器(按 {@link CmdTypeEnum} 注册)
+ *
+ * @author lwm
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface JsonCmdHandler
+{
+    /**
+     * 业务命令类型
+     */
+    CmdTypeEnum cmdType();
+}

+ 23 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/annotation/ProducerHandler.java

@@ -0,0 +1,23 @@
+package com.ruoyi.device.mqtt.annotation;
+
+import com.ruoyi.device.mqtt.enums.MsgTypeEnum;
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.*;
+
+/**
+ * 下行报文 发送处理注解
+ *
+ * @author lwm
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface ProducerHandler
+{
+    /**
+     * 消息类型
+     */
+    MsgTypeEnum msgType();
+}

+ 199 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/client/EmqClient.java

@@ -0,0 +1,199 @@
+package com.ruoyi.device.mqtt.client;
+
+import com.ruoyi.common.utils.StringUtils;
+import com.ruoyi.common.utils.spring.SpringUtils;
+import com.ruoyi.common.utils.uuid.IdUtils;
+import com.ruoyi.device.mqtt.config.MessageRouter;
+import com.ruoyi.device.mqtt.config.MqttConfig;
+import com.ruoyi.device.mqtt.constants.MqttConstants;
+import com.ruoyi.device.mqtt.enums.QosEnum;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.annotation.Resource;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * mqtt 客户端 初始化
+ *
+ * @author lwm
+ */
+@Component
+public class EmqClient
+{
+    private static final Logger log = LoggerFactory.getLogger(EmqClient.class);
+
+    private IMqttClient mqttClient;
+    @Resource
+    private MqttCallback mqttCallback;
+    @Resource
+    private MqttConfig mqttConfig;
+
+    @PostConstruct
+    public synchronized void init()
+    {
+        if (StringUtils.isEmpty(mqttConfig.getBroker()))
+        {
+            log.warn("未配置 emqx.broker,跳过 MQTT 客户端初始化");
+            return;
+        }
+        MqttClientPersistence persistence = new MemoryPersistence();
+        String clientId = MqttConstants.CLIENT_ID_PREFIX + IdUtils.simpleUUID();
+        try
+        {
+            log.info("客户端ID:{}", clientId);
+            mqttClient = new MqttClient(mqttConfig.getBroker(), clientId, persistence);
+            connect();
+            subscribeAll();
+        }
+        catch (MqttException e)
+        {
+            log.error("初始化客户端mqttClient对象失败,errormsg={},brokerUrl={},clientId={}",
+                    e.getMessage(), mqttConfig.getBroker(), clientId);
+        }
+    }
+
+    /**
+     * 连接broker
+     */
+    public void connect()
+    {
+        if (mqttClient == null)
+        {
+            return;
+        }
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setAutomaticReconnect(Boolean.TRUE.equals(mqttConfig.getReconnect()));
+        options.setUserName(mqttConfig.getUserName());
+        options.setPassword(StringUtils.isEmpty(mqttConfig.getPassword()) ? new char[0] : mqttConfig.getPassword().toCharArray());
+        options.setCleanSession(Boolean.TRUE.equals(mqttConfig.getCleanSession()));
+        options.setKeepAliveInterval(mqttConfig.getKeepAlive());
+        options.setConnectionTimeout(mqttConfig.getTimeout());
+        mqttClient.setCallback(mqttCallback);
+        try
+        {
+            mqttClient.connect(options);
+            log.info("mqtt客户端连接服务端成功");
+        }
+        catch (MqttException e)
+        {
+            log.error("mqtt客户端连接服务端失败,失败原因{}", e.getMessage());
+        }
+    }
+
+    /**
+     * 默认订阅全部主题
+     */
+    public void subscribeAll()
+    {
+        if (mqttClient == null || !mqttClient.isConnected())
+        {
+            return;
+        }
+        if (StringUtils.isEmpty(mqttConfig.getMessageRouters()))
+        {
+            return;
+        }
+        for (MessageRouter router : mqttConfig.getMessageRouters())
+        {
+            try
+            {
+                IMqttMessageListener listener = SpringUtils.getBean(router.getListener());
+                mqttClient.subscribe(router.getTopic(), router.getQos(), listener);
+                log.info("订阅主题名称:{}, QOS:{}, 监听器:{}", router.getTopic(), router.getQos(), router.getListener());
+            }
+            catch (Exception e)
+            {
+                log.error("订阅主题失败 topic={} listener={} : errormsg={}", router.getTopic(), router.getListener(),
+                    e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * 重连
+     */
+    public void reConnect()
+    {
+        if (mqttClient == null)
+        {
+            return;
+        }
+        for (int i = 0; i < MqttConstants.CLIENT_RECONNECT_ATTEMPTS; i++)
+        {
+            if (mqttClient.isConnected())
+            {
+                break;
+            }
+            try
+            {
+                mqttClient.reconnect();
+                log.info("连接中断,尝试重新连接,重新连接次数:{}", i + 1);
+                TimeUnit.SECONDS.sleep(MqttConstants.CLIENT_RECONNECT_INTERVAL);
+            }
+            catch (MqttException e)
+            {
+                log.error("重连失败,失败原因{}", e.getMessage());
+            }
+            catch (InterruptedException e)
+            {
+                log.error("线程等待重连失败,失败原因{}", e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * 发布消息
+     * @param topic 主题
+     * @param msg 消息
+     * @param qos 等级
+     * @param retain 是否保留
+     */
+    public void publish(String topic, byte[] msg, QosEnum qos, boolean retain)
+    {
+        if (mqttClient == null || !mqttClient.isConnected())
+        {
+            log.warn("发布消息失败,mqtt未连接,放弃发布 topic={}", topic);
+            return;
+        }
+        try
+        {
+            MqttMessage mqttMessage = new MqttMessage();
+            mqttMessage.setPayload(msg);
+            mqttMessage.setQos(qos.getValue());
+            mqttMessage.setRetained(retain);
+            mqttClient.publish(topic, mqttMessage);
+        }
+        catch (MqttException e)
+        {
+            log.error("发布消息失败,errormsg={},topic={},msg={},qos={},retain={}",
+                    e.getMessage(), topic, msg, qos.getValue(), retain);
+        }
+    }
+
+    /**
+     * 断开连接
+     */
+    @PreDestroy
+    public void disconnectOnShutdown()
+    {
+        if (mqttClient != null && mqttClient.isConnected())
+        {
+            try
+            {
+                mqttClient.disconnect();
+            }
+            catch (MqttException e)
+            {
+                log.error("断开连接产生异常,异常信息{}", e.getMessage());
+            }
+        }
+    }
+
+
+}

+ 104 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/client/MessageCallback.java

@@ -0,0 +1,104 @@
+package com.ruoyi.device.mqtt.client;
+
+import com.ruoyi.common.utils.StringUtils;
+import com.ruoyi.common.utils.spring.SpringUtils;
+import com.ruoyi.device.mqtt.config.MessageRouter;
+import com.ruoyi.device.mqtt.config.MqttConfig;
+import jakarta.annotation.Resource;
+import org.eclipse.paho.client.mqttv3.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Component;
+
+/**
+ * 连接完成补订阅、断线重连
+ * 接收消息、发送消息
+ *
+ * @author lwm
+ */
+@Component
+public class MessageCallback implements MqttCallbackExtended
+{
+    private static final Logger log = LoggerFactory.getLogger(MessageCallback.class);
+
+    @Resource
+    @Lazy
+    EmqClient emqClient;
+    @Resource
+    MqttConfig mqttConfig;
+
+    /**
+     * 连接完成,订阅所有主题
+     */
+    @Override
+    public void connectComplete(boolean reconnect, String serverUri)
+    {
+        emqClient.subscribeAll();
+    }
+
+    /**
+     * 丢失了对服务端的连接后触发的回调
+     */
+    @Override
+    public void connectionLost(Throwable cause)
+    {
+        emqClient.reConnect();
+    }
+
+    /**
+     * 应用收到消息后触发的回调
+     * 之前在 EmqClient 已经使用 subscribe(topic, qos, IMqttMessageListener) 时由监听器消费
+     *
+     * 现在再来一遍
+     */
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception
+    {
+        if (StringUtils.isEmpty(mqttConfig.getMessageRouters()))
+        {
+            return;
+        }
+        for (MessageRouter router : mqttConfig.getMessageRouters())
+        {
+            String topicFilter = router.getTopic();
+            if(isMatched(topicFilter, topic))
+            {
+                IMqttMessageListener listener = SpringUtils.getBean(router.getListener());
+                listener.messageArrived(topic, message);
+            }
+        }
+    }
+
+    /**
+     * 判断主题是否匹配
+     *
+     */
+    private boolean isMatched(String topicFilter, String topic) {
+        if (topicFilter.startsWith("$queue/"))
+        {
+            topicFilter = topicFilter.replaceFirst("\\$queue/", "");
+        }
+        else if (topicFilter.startsWith("$share/"))
+        {
+            topicFilter = topicFilter.replaceFirst("\\$share/", "");
+            topicFilter = topicFilter.substring(topicFilter.indexOf('/') + 1);
+        }
+        return MqttTopic.isMatched(topicFilter, topic);
+    }
+
+    /**
+     * 消息发布者消息发布完成产生的回调
+     */
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token)
+    {
+        if (token == null)
+        {
+            return;
+        }
+        int messageId = token.getMessageId();
+        String[] topics = token.getTopics();
+        log.info("消息发布完成,messageid={},topics={}", messageId, topics);
+    }
+}

+ 48 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/config/MessageRouter.java

@@ -0,0 +1,48 @@
+package com.ruoyi.device.mqtt.config;
+
+/**
+ * MQTT 订阅
+ *
+ * @author lwm
+ */
+public class MessageRouter
+{
+    /** EMQX 主题,支持 $share 组共享订阅 */
+    private String topic;
+
+    /** QoS 0/1/2 */
+    private int qos;
+
+    /** 监听器 Spring Bean 名称,实现 {@link org.eclipse.paho.client.mqttv3.IMqttMessageListener} 的 Bean 名称 */
+    private String listener;
+
+    public String getTopic()
+    {
+        return topic;
+    }
+
+    public void setTopic(String topic)
+    {
+        this.topic = topic;
+    }
+
+    public int getQos()
+    {
+        return qos;
+    }
+
+    public void setQos(int qos)
+    {
+        this.qos = qos;
+    }
+
+    public String getListener()
+    {
+        return listener;
+    }
+
+    public void setListener(String listener)
+    {
+        this.listener = listener;
+    }
+}

+ 182 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/config/MqttConfig.java

@@ -0,0 +1,182 @@
+package com.ruoyi.device.mqtt.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * MQTT配置信息
+ *
+ * @author lwm
+ */
+@Configuration
+@ConfigurationProperties(prefix = "emqx")
+public class MqttConfig
+{
+    /**
+     * emqx服务器地址
+     */
+    private String broker;
+
+    /**
+     * 用户名
+     */
+    private String userName;
+
+    /**
+     * 密码
+     */
+    private String password;
+
+    /**
+     * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
+     */
+    private Boolean cleanSession = true;
+
+    /**
+     * 是否断线重连
+     */
+    private Boolean reconnect = true;
+
+    /**
+     * 连接超时时间
+     */
+    private Integer timeout = 20;
+
+    /**
+     * 心跳间隔
+     */
+    private Integer keepAlive = 10;
+
+    /**
+     * 接口调用登陆名
+     */
+    private String apiUserName;
+
+    /**
+     * 接口调用登陆密码
+     */
+    private String apiPassword;
+
+    /**
+     * 接口调用地址
+     */
+    private String apiPath;
+
+    /**
+     * topic对应消息监听器
+     */
+    private List<MessageRouter> messageRouters = new ArrayList<>();
+
+    public String getBroker()
+    {
+        return broker;
+    }
+
+    public void setBroker(String broker)
+    {
+        this.broker = broker;
+    }
+
+    public String getUserName()
+    {
+        return userName;
+    }
+
+    public void setUserName(String userName)
+    {
+        this.userName = userName;
+    }
+
+    public String getPassword()
+    {
+        return password;
+    }
+
+    public void setPassword(String password)
+    {
+        this.password = password;
+    }
+
+    public Boolean getCleanSession()
+    {
+        return cleanSession;
+    }
+
+    public void setCleanSession(Boolean cleanSession)
+    {
+        this.cleanSession = cleanSession;
+    }
+
+    public Boolean getReconnect()
+    {
+        return reconnect;
+    }
+
+    public void setReconnect(Boolean reconnect)
+    {
+        this.reconnect = reconnect;
+    }
+
+    public Integer getTimeout()
+    {
+        return timeout;
+    }
+
+    public void setTimeout(Integer timeout)
+    {
+        this.timeout = timeout;
+    }
+
+    public Integer getKeepAlive()
+    {
+        return keepAlive;
+    }
+
+    public void setKeepAlive(Integer keepAlive)
+    {
+        this.keepAlive = keepAlive;
+    }
+
+    public String getApiUserName()
+    {
+        return apiUserName;
+    }
+
+    public void setApiUserName(String apiUserName)
+    {
+        this.apiUserName = apiUserName;
+    }
+
+    public String getApiPassword()
+    {
+        return apiPassword;
+    }
+
+    public void setApiPassword(String apiPassword)
+    {
+        this.apiPassword = apiPassword;
+    }
+
+    public String getApiPath()
+    {
+        return apiPath;
+    }
+
+    public void setApiPath(String apiPath)
+    {
+        this.apiPath = apiPath;
+    }
+
+    public List<MessageRouter> getMessageRouters()
+    {
+        return messageRouters;
+    }
+
+    public void setMessageRouters(List<MessageRouter> messageRouters)
+    {
+        this.messageRouters = messageRouters;
+    }
+}

+ 40 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/constants/MqttConstants.java

@@ -0,0 +1,40 @@
+package com.ruoyi.device.mqtt.constants;
+
+/**
+ * mqtt的 常量数据
+ */
+public interface MqttConstants
+{
+    /**
+     * 客户端ID前缀
+     */
+    String CLIENT_ID_PREFIX = "SYSTEM-";
+
+    /**
+     * 客户端重连次数
+     */
+    int CLIENT_RECONNECT_ATTEMPTS = 100;
+
+    /**
+     * 客户端重连尝试间隔时间
+     */
+    long CLIENT_RECONNECT_INTERVAL = 5;
+
+    /**
+     * code 1:成功,0:失败
+     */
+    int SUCCESS_CODE = 1;
+    int FAIL_CODE = 0;
+
+    /**
+     * 帧头
+     */
+    int FRAME_HEADER = 0xfefe;
+
+    /**
+     * 设备下行的 topic
+     */
+    String DEVICE_DOWN_TOPIC = "cpyypt/tsbdown/%s/%s";
+
+
+}

+ 49 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/BaseBody.java

@@ -0,0 +1,49 @@
+package com.ruoyi.device.mqtt.domain;
+
+import java.io.Serializable;
+
+/**
+ * TSB 调试宝 统一 Body
+ *
+ * @author lwm
+ */
+public class BaseBody implements Serializable
+{
+    private static final long serialVersionUID = 1L;
+
+    /** 一级设备类型 */
+    private String deviceType;
+
+    /** 一级设备 SN */
+    private Long deviceSn;
+
+    public BaseBody()
+    {
+    }
+
+    public BaseBody(String deviceType, Long deviceSn)
+    {
+        this.deviceType = deviceType;
+        this.deviceSn = deviceSn;
+    }
+
+    public String getDeviceType()
+    {
+        return deviceType;
+    }
+
+    public void setDeviceType(String deviceType)
+    {
+        this.deviceType = deviceType;
+    }
+
+    public Long getDeviceSn()
+    {
+        return deviceSn;
+    }
+
+    public void setDeviceSn(Long deviceSn)
+    {
+        this.deviceSn = deviceSn;
+    }
+}

+ 75 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/BaseJsonBody.java

@@ -0,0 +1,75 @@
+package com.ruoyi.device.mqtt.domain;
+
+import com.ruoyi.device.mqtt.constants.MqttConstants;
+
+/**
+ * TSB 调试宝统一 JSON Body(firstType=0x4A 时 UTF-8 JSON)
+ *
+ * @author lwm
+ */
+public class BaseJsonBody extends BaseBody
+{
+    /** 请求方法类型,如 tsb:login:up */
+    private String cmdType;
+
+    /** 是否成功,1:成功,0:失败 */
+    private Integer code;
+
+    /** 正确或者错误提示 */
+    private String msg;
+
+    public BaseJsonBody()
+    {
+        super();
+    }
+
+    public BaseJsonBody(String deviceType, Long deviceSn, String cmdType, Integer code, String msg)
+    {
+        super(deviceType, deviceSn);
+        this.cmdType = cmdType;
+        this.code = code;
+        this.msg = msg;
+    }
+
+    /**
+     * 失败应答(不指定下行 cmdType)
+     */
+    public static BaseJsonBody fail(String deviceType, Long deviceSn, String msg)
+    {
+        return fail(deviceType, deviceSn, null, msg);
+    }
+
+    /**
+     * 失败应答(可指定下行 cmdType)
+     */
+    public static BaseJsonBody fail(String deviceType, Long deviceSn, String cmdType, String msg)
+    {
+        return new BaseJsonBody(deviceType, deviceSn, cmdType, MqttConstants.FAIL_CODE, msg);
+    }
+
+    public String getCmdType()
+    {
+        return cmdType;
+    }
+
+    public void setCmdType(String cmdType)
+    {
+        this.cmdType = cmdType;
+    }
+
+    public Integer getCode() {
+        return code;
+    }
+
+    public void setCode(Integer code) {
+        this.code = code;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public void setMsg(String msg) {
+        this.msg = msg;
+    }
+}

+ 55 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/decoder/DeviceLoginRequest.java

@@ -0,0 +1,55 @@
+package com.ruoyi.device.mqtt.domain.decoder;
+
+import com.ruoyi.device.mqtt.domain.BaseJsonBody;
+
+/**
+ * TSB 登录请求
+ *
+ * @author lwm
+ */
+public class DeviceLoginRequest extends BaseJsonBody
+{
+    /** 用户名 */
+    private String userName;
+
+    /** 用户密码 */
+    private String userPassword;
+
+    /** 调试宝 IMEI,保证使用人唯一 */
+    private String imei;
+
+    public DeviceLoginRequest()
+    {
+        super();
+    }
+
+    public String getUserName()
+    {
+        return userName;
+    }
+
+    public void setUserName(String userName)
+    {
+        this.userName = userName;
+    }
+
+    public String getUserPassword()
+    {
+        return userPassword;
+    }
+
+    public void setUserPassword(String userPassword)
+    {
+        this.userPassword = userPassword;
+    }
+
+    public String getImei()
+    {
+        return imei;
+    }
+
+    public void setImei(String imei)
+    {
+        this.imei = imei;
+    }
+}

+ 30 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/decoder/DevicePtRequest.java

@@ -0,0 +1,30 @@
+package com.ruoyi.device.mqtt.domain.decoder;
+
+import com.ruoyi.device.mqtt.domain.BaseJsonBody;
+
+/**
+ * TSB 产测请求SN码
+ *
+ * @author lwm
+ */
+public class DevicePtRequest extends BaseJsonBody
+{
+
+    /** 调试宝 IMEI,保证使用人唯一 */
+    private String imei;
+
+    public DevicePtRequest()
+    {
+        super();
+    }
+
+    public String getImei()
+    {
+        return imei;
+    }
+
+    public void setImei(String imei)
+    {
+        this.imei = imei;
+    }
+}

+ 83 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/encoder/DeviceLoginResponse.java

@@ -0,0 +1,83 @@
+package com.ruoyi.device.mqtt.domain.encoder;
+
+import com.ruoyi.device.mqtt.domain.BaseJsonBody;
+
+import java.util.Set;
+
+/**
+ * TSB 登录应答
+ *
+ * @author lwm。
+ */
+public class DeviceLoginResponse extends BaseJsonBody
+{
+    /** 用户名 */
+    private String userName;
+
+    /** 手机号 */
+    private String phoneNumber;
+
+    /** 用户昵称 */
+    private String nickName;
+
+    /** 用户角色名称 */
+    private String roleName;
+
+    /** 菜单权限列表 */
+    private Set<String> permissions;
+
+    public DeviceLoginResponse()
+    {
+        super();
+    }
+
+    public String getUserName()
+    {
+        return userName;
+    }
+
+    public void setUserName(String userName)
+    {
+        this.userName = userName;
+    }
+
+    public String getPhoneNumber()
+    {
+        return phoneNumber;
+    }
+
+    public void setPhoneNumber(String phoneNumber)
+    {
+        this.phoneNumber = phoneNumber;
+    }
+
+    public String getNickName()
+    {
+        return nickName;
+    }
+
+    public void setNickName(String nickName)
+    {
+        this.nickName = nickName;
+    }
+
+    public String getRoleName()
+    {
+        return roleName;
+    }
+
+    public void setRoleName(String roleName)
+    {
+        this.roleName = roleName;
+    }
+
+    public Set<String> getPermissions()
+    {
+        return permissions;
+    }
+
+    public void setPermissions(Set<String> permissions)
+    {
+        this.permissions = permissions;
+    }
+}

+ 87 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/domain/encoder/DevicePtResponse.java

@@ -0,0 +1,87 @@
+package com.ruoyi.device.mqtt.domain.encoder;
+
+import com.ruoyi.device.mqtt.domain.BaseJsonBody;
+
+/**
+ * TSB 产测下发应答
+ *
+ * @author lwm
+ */
+public class DevicePtResponse extends BaseJsonBody
+{
+    /** 调试宝 IMEI,保证使用人唯一 */
+    private String imei;
+
+    /** 设备配置(可选) */
+    private DeviceConfig deviceConfig;
+
+    public DevicePtResponse()
+    {
+        super();
+    }
+
+    public String getImei()
+    {
+        return imei;
+    }
+
+    public void setImei(String imei)
+    {
+        this.imei = imei;
+    }
+
+    public DeviceConfig getDeviceConfig()
+    {
+        return deviceConfig;
+    }
+
+    public void setDeviceConfig(DeviceConfig deviceConfig)
+    {
+        this.deviceConfig = deviceConfig;
+    }
+
+    /**
+     * 产测应答中的设备配置(均为可选)
+     */
+    public static class DeviceConfig
+    {
+        /** 一级设备类型 */
+        private String deviceType;
+
+        /** 一级设备 SN */
+        private Long deviceSn;
+
+        /** 设备生产日期 */
+        private String deviceProduceDate;
+
+        public String getDeviceType()
+        {
+            return deviceType;
+        }
+
+        public void setDeviceType(String deviceType)
+        {
+            this.deviceType = deviceType;
+        }
+
+        public Long getDeviceSn()
+        {
+            return deviceSn;
+        }
+
+        public void setDeviceSn(Long deviceSn)
+        {
+            this.deviceSn = deviceSn;
+        }
+
+        public String getDeviceProduceDate()
+        {
+            return deviceProduceDate;
+        }
+
+        public void setDeviceProduceDate(String deviceProduceDate)
+        {
+            this.deviceProduceDate = deviceProduceDate;
+        }
+    }
+}

+ 128 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/enums/CmdTypeEnum.java

@@ -0,0 +1,128 @@
+package com.ruoyi.device.mqtt.enums;
+
+import com.ruoyi.common.utils.StringUtils;
+
+/**
+ * 请求方法类型枚举
+ *
+ * @author lwm
+ */
+public enum CmdTypeEnum
+{
+    TSB_LOGIN("tsb:login", "tsb:login:up", "tsb:login:down", "调试宝登录"),
+    TSB_PT("tsb:pt", "tsb:pt:up", "tsb:pt:down", "调试宝PT产测"),
+    ;
+
+    private final String cmdType;
+    private final String cmdUpType;
+    private final String cmdDownType;
+    private final String message;
+
+    CmdTypeEnum(String cmdType, String cmdUpType, String cmdDownType, String message)
+    {
+        this.cmdType = cmdType;
+        this.cmdUpType = cmdUpType;
+        this.cmdDownType = cmdDownType;
+        this.message = message;
+    }
+
+    public String getCmdUpType()
+    {
+        return cmdUpType;
+    }
+
+    public String getCmdDownType()
+    {
+        return cmdDownType;
+    }
+
+    public String getCmdType()
+    {
+        return cmdType;
+    }
+
+    public String getMessage()
+    {
+        return message;
+    }
+
+    /**
+     * 解析设备上行 cmdType(兼容 {@code cmdUpType} 与 {@code cmdType})
+     */
+    public static CmdTypeEnum resolveUplink(String raw)
+    {
+        if (StringUtils.isEmpty(raw))
+        {
+            return null;
+        }
+        String trimmed = raw.trim();
+        for (CmdTypeEnum item : values())
+        {
+            if (trimmed.equals(item.cmdUpType) || trimmed.equals(item.cmdType))
+            {
+                return item;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * 是否为下行应答 cmdType(设备误发 down 时识别)
+     */
+    public static boolean isDownlink(String raw)
+    {
+        if (StringUtils.isEmpty(raw))
+        {
+            return false;
+        }
+        String trimmed = raw.trim();
+        for (CmdTypeEnum item : values())
+        {
+            if (trimmed.equals(item.cmdDownType))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * 解析设备下行 cmdType(兼容 {@code cmdDownType} 与 {@code cmdType})
+     */
+    public static CmdTypeEnum resolveDownlink(String raw)
+    {
+        if (StringUtils.isEmpty(raw))
+        {
+            return null;
+        }
+        String trimmed = raw.trim();
+        for (CmdTypeEnum item : values())
+        {
+            if (trimmed.equals(item.cmdDownType) || trimmed.equals(item.cmdType))
+            {
+                return item;
+            }
+        }
+        return null;
+    }
+
+    /**
+     *  是否为上行请求 cmdType(设备误发 up 时识别)
+     */
+    public static boolean isUplink(String raw)
+    {
+        if (StringUtils.isEmpty(raw))
+        {
+            return false;
+        }
+        String trimmed = raw.trim();
+        for (CmdTypeEnum item : values())
+        {
+            if (trimmed.equals(item.cmdUpType))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+}

+ 137 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/enums/MsgTypeEnum.java

@@ -0,0 +1,137 @@
+package com.ruoyi.device.mqtt.enums;
+
+/**
+ * 消息类型枚举
+ *
+ * @author lwm
+ */
+public enum MsgTypeEnum
+{
+    UPGRADE_DOWN(0x01, 0x1001, "升级下行"),
+    UPGRADE_STATUS_UP(0x01, 0x2001, "升级状态上行"),
+    UNINSTALL_DEVICE_UPGRADE(0X01, 0X1004, "未安装升级"),
+    COLLECTOR_INIT_CONFIG_DOWN(0x01, 0x1011, "采集器初始化配置下行"),
+    REFRESH_TAX_SN_DOWN(0x01, 0x1012, "刷新税控序列号"),
+    TAX_SN_GUN_CONFIG_UP(0x01, 0x2011, "税控口序列号、枪数上行"),
+    DEVICE_REBOOT_DOWN(0x01, 0x1021, "设备重启下行"),
+    ENCRYPTION_TAX_DATA_DOWN(0x01, 0x1034, "读取加密报税口数据"),
+    SCREEN_CONFIG_DOWN(0x01, 0x1019, "安装步骤下行"),
+    HEARTBEAT_DOWN(0x01, 0x1008, "安装步骤下行"),
+    TIME_SYNC_UP(0x01, 0x2015, "时间同步上行"),
+    TIME_SYNC_DOWN(0x01, 0x1015, "时间同步下行"),
+    LORA_EXHANGE(0X01, 0X1016, "lora频率切换"),
+    READ_LORA_DOWN(0X01, 0X1017, "读取lora下行"),
+    READ_LORA_UP(0X01, 0X2017, "读取lora上行"),
+    OPW_PARAM_UP(0x01, 0x2032, "液位仪配置上行"),
+    OPW_PARAM_DOWN(0x01, 0x1032, "液位仪配置下行"),
+    SENSOR_PARAM_DOWN(0x01, 0x1033, "探棒配置下行"),
+    TAX_TIME_INTERVAL(0x01, 0x1013, "报税口轮训间隔下行"),
+    GET_TAX_SN_DOWN(0x01, 0x1018, "获取税控序列号"),
+    REPAIR_TAX_CONFIG(0x01, 0x1020, "报税口维修配置"),
+    SYSTEM_SERVICE_CHANGE(0x01, 0x1003, "MQTT配置切换"),
+    SYSTEM_SERVICE_CHANGE_WIRED_NETWORK(0x01, 0x1005, "MQTT配置切换有线网"),
+    CODER_ROUTER_SEND(0x01, 0x1019, "编码器路由配置下发"),
+    CODER_ROUTER_SWITCH(0x01, 0x1022, "编码器路由配置开关"),
+    PROBE_ROUTER_DOWN(0x01, 0x1024, "路由表下发"),
+    PROBE_ROUTER_INQUIRY(0x01, 0x1025, "路由表问询"),
+    PROBE_ROUTER_UP(0x01, 0x2025, "路由表上报"),
+    PROBE_PROFILE_COM_DOWN(0x01, 0x1026, "配置文件下发(com端)"),
+    PROBE_PROFILE_STITCH_DOWN(0x01, 0x1027, "配置文件下发(针脚映射)"),
+    PROBE_PROFILE_LAYING_DOWN(0x01, 0x1040, "配置文件下发(排线方式)"),
+    OPEN_EIGHT_CODER(0X01, 0X1028, "开启8段码上报"),
+    CLOSE_EIGHT_CODER(0X01, 0X1029, "完成8段码上报"),
+    EIGHT_CODE_UP(0x01, 0x2030, "8段码数据上报"),
+    PROBE_RELATION_UP(0x01, 0x2041, "探针板主从问询"),
+    PROBE_RELATION_DOWN(0x01, 0x1041, "探针板主从应答"),
+    TUOSHENG_CIPHERTEXT_REAL_READ(0x01, 0x1051, "拓盛密文实时读取"),
+    TUOSHENG_CIPHERTEXT_REAL_REPLY(0x01, 0x2051, "拓盛密文实时读取应答"),
+    PASS_THROUGH_INSTRUCTION_DOWN(0x01, 0x1052, "透传指令下行"),
+    PASS_THROUGH_INSTRUCTION_UP(0x01, 0x2052, "透传指令上行"),
+    CODER_ORDER_DOWN(0x01, 0x1053, "编码器抬枪下行"),
+    CODER_ORDER_UP(0x01, 0x2053, "编码器抬枪上行"),
+    USER_LOGIN_DOWN(0x01, 0x1111, "登录结果返回"),
+    USER_LOGIN_UP(0x01, 0x2111, "登录账号上传"),
+    TAX_RAISE_HANG_CONFIG_UP(0x01, 0x2009, "报税口抬挂枪配置上行"),
+    TAX_RAISE_HANG_CONFIG_DOWN(0x01, 0x1009, "报税口抬挂枪配置下行"),
+    SHANXI_CONFIG_UP(0x01, 0x2090, "山西省平台配置上行"),
+    SHANXI_CONFIG_DOWN(0x01, 0x1090, "山西省平台配置下行"),
+    BLOCKER_ROUTER_DOWN(0x01, 0x1070, "阻断器路由下行"),
+    BLOCKER_STATUS_DOWN(0x01, 0x1072, "阻断器开关下行"),
+    FIRMWARE_INFO_UP(0x02, 0x2001, "固件信息上行"),
+    STATUS_INFO_UP(0x02, 0x2002, "设备状态上行"),
+    DECRYPT_DEVICE_UP(0x02, 0x2003, "解密板设备状态上行"),
+    PROBE_STATUS_INFO_UP(0x02, 0x2004, "探针版设备状态上行"),
+    GUN_BLOCKER_STATUS_INFO_UP(0x02, 0x2005, "加油枪阻断器设备状态上行"),
+    FIRMWARE_INFO_DOWN(0x02, 0x1001, "固件信息下行"),
+    TAX_DATA_UP(0x03, 0x2001, "报税口数据上行"),
+    RAISE_HANG_TAX_DATA_UP(0x03, 0x2003, "报税口数据上行"),
+    TAX_ERROR_DATA_UP(0x03, 0x2002, "税控口错误数据上行"),
+    OPW_DATA_UP(0x03, 0x2021, "液位仪数据上行"),
+    SENSOR_DATA_UP(0x03, 0x2022, "液位仪数据上行"),
+    OPW_DATA_DOWN(0x03, 0x1021, "液位仪数据下行"),
+    SCREEN_DATA_DOWN(0x03, 0x1041, "显示屏数据下行"),
+    SCREEN_DATA_UP(0x03, 0x2041, "显示屏数据上行"),
+    CODER_DATA_UP(0x03, 0x2051, "编码器数据上行"),
+    CODER_TIME_DATA_UP(0x03, 0x2053, "编码器时间差数据上行"),
+    CODER_DATA_UP_CAMERA(0x03, 0x2052, "编码器数据上行"),
+    PROBE_DATA_UP(0x03, 0x2061, "探针板数据上行"),
+    PROBE_DATA_DOWN(0x03, 0x1061, "探针板数据下行"),
+    OPW_DAY_REPORT_DATA_UP(0x03, 0x3061, "探针板数据上行"),
+    OPW_DAY_REPORT_DATA_DOWN(0x03, 0x4061, "探针板数据下行"),
+    PROBE_SAMPLING_RATE_UP(0x10, 0x2063, "探针版采样率上行"),
+    PROBE_SAMPLING_RATE_DOWN(0x10, 0x1063, "探针版采样率下行"),
+    CIPHERTEXT_DATA_UP(0x03, 0x2071, "报税口密文数据"),
+    CIPHERTEXT_RAISE_HANG_DATA_UP(0x03, 0x2073, "报税口密文数据-带抬挂枪"),
+    CIPHERTEXT_DATA_DECRYPT_DOWN(0x03, 0x1072, "报税口密文数据"),
+    CIPHERTEXT_DATA_DECRYPT_UP(0x03, 0x2072, "报税口密文数据"),
+    CIPHERTEXT_DATA_DECRYPT_RETURN_DOWN(0x03, 0x1065, "报税口密文数据返回测试板"),
+    PUSH_DATA_DOWN(0x03, 0x1090, "山西省平台数据透传下行"),
+    PUSH_DATA_UP(0x03, 0x2090, "山西省平台数据透传上行"),
+    UNINSTALL_DEVICE_UPGRADE_2(0X04, 0X1004, "未安装升级"),
+    UPGRADE_DOWN_2(0x04, 0x1002, "升级下行"),
+    UPGRADE_DOWN_3(0x04, 0x1001, "升级下行"),
+    UPGRADE_UP_2(0x04, 0x2002, "升级上行"),
+    UPGRADE_UP_3(0x04, 0x2001, "升级上行"),
+    UPGRADE_DOWN_4(0x04, 0x1030, "升级包下载下行"),
+    UPGRADE_UP_4(0x04, 0x2030, "升级包下载上行"),
+    GET_APK_UP(0x04, 0x2101, "固件查询"),
+    GET_APK_DOWN(0x04, 0x1101, "固件返回"),
+    GET_APK_URL_UP(0x04, 0x2102, "固件URL查询"),
+    GET_APK_URL_DOWN(0x04, 0x1102, "固件URL返回"),
+    VERSION_SYNC(0x04, 0x1011, "版本同步"),
+    LOG_DOWN(0x10, 0x1001, "日志开关下行"),
+    LOG_SOURCE_DOWN(0x10, 0x1002, "日志开关下行(原始数据)"),
+    LOG_UP(0x10, 0x2001, "日志明细"),
+    PROBE_ERROR_DATA_UP(0x10, 0x2062, "探针板错误数据上行"),
+
+    /**
+     * TSB调试宝 一级类型=0x4A 指定JSON (实际的功能 在body中指定)
+     */
+    JSON_BODY(0x4A, 0x0000, "调试宝JSON报文体");
+
+    private final int firstType;
+    private final int secondType;
+    private final String message;
+
+    MsgTypeEnum(int firstType, int secondType, String message)
+    {
+        this.firstType = firstType;
+        this.secondType = secondType;
+        this.message = message;
+    }
+
+    public int getFirstType()
+    {
+        return firstType;
+    }
+
+    public int getSecondType()
+    {
+        return secondType;
+    }
+
+    public String getMessage()
+    {
+        return message;
+    }
+}

+ 23 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/enums/QosEnum.java

@@ -0,0 +1,23 @@
+package com.ruoyi.device.mqtt.enums;
+
+/**
+ * Qos 等级枚举
+ *
+ * @author lwm
+ */
+public enum QosEnum
+{
+    QoS0(0), QoS1(1), QoS2(2);
+
+    private final int value;
+
+    QosEnum(int value)
+    {
+        this.value = value;
+    }
+
+    public int getValue()
+    {
+        return value;
+    }
+}

+ 39 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/enums/VersionEnum.java

@@ -0,0 +1,39 @@
+package com.ruoyi.device.mqtt.enums;
+
+/**
+ * 版本枚举
+ *
+ * @author lwm
+ */
+public enum VersionEnum
+{
+    FIRST(0x01),
+    SECOND(0x02),
+    THIRD(0x03),
+    NINE(0x09),
+    AUTO(0xFF);
+
+    private final int value;
+
+    VersionEnum(int value)
+    {
+        this.value = value;
+    }
+
+    public int getValue()
+    {
+        return value;
+    }
+
+    public static VersionEnum getVersion(int version)
+    {
+        for (VersionEnum versionEnum : VersionEnum.values())
+        {
+            if (version == versionEnum.getValue())
+            {
+                return versionEnum;
+            }
+        }
+        return null;
+    }
+}

+ 78 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/HandlerManager.java

@@ -0,0 +1,78 @@
+package com.ruoyi.device.mqtt.handler;
+
+import com.ruoyi.device.mqtt.annotation.ConsumerHandler;
+import com.ruoyi.device.mqtt.annotation.ProducerHandler;
+import com.ruoyi.device.mqtt.enums.MsgTypeEnum;
+import com.ruoyi.device.mqtt.handler.decoder.IDecoder;
+import com.ruoyi.device.mqtt.handler.encoder.IEncoder;
+import com.ruoyi.device.mqtt.util.MsgHandlerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 缓存消费、生产处理器,通过key进行缓存
+ *
+ * @author lwm
+ */
+@Component
+public class HandlerManager implements ApplicationListener<ContextRefreshedEvent>
+{
+    private static final Logger log = LoggerFactory.getLogger(HandlerManager.class);
+
+    private static final Map<String, IDecoder> DECODER_MAP = new HashMap<>();
+    private static final Map<String, IEncoder<?>> ENCODER_MAP = new HashMap<>();
+
+    /**
+     * 监听spring容器启动完成,将所有ConsumerHandler和ProducerHandler缓存起来
+     *
+     * @param event 上下文
+     */
+    @Override
+    public void onApplicationEvent(ContextRefreshedEvent event)
+    {
+        ApplicationContext applicationContext = event.getApplicationContext();
+
+        Map<String, Object> consumerHandler = applicationContext.getBeansWithAnnotation(ConsumerHandler.class);
+        for (Object bean : consumerHandler.values())
+        {
+            ConsumerHandler annotation = bean.getClass().getAnnotation(ConsumerHandler.class);
+            MsgTypeEnum msgTypeEnum = annotation.msgType();
+            DECODER_MAP.put(MsgHandlerUtil.getDecoderKey(msgTypeEnum), (IDecoder) bean);
+            log.info("加载消费处理器,key:{},handler:{}", MsgHandlerUtil.getDecoderKey(msgTypeEnum),
+                    bean.getClass().getSimpleName());
+        }
+
+        Map<String, Object> producerHandler = applicationContext.getBeansWithAnnotation(ProducerHandler.class);
+        for (Object bean : producerHandler.values())
+        {
+            ProducerHandler annotation = bean.getClass().getAnnotation(ProducerHandler.class);
+            MsgTypeEnum msgTypeEnum = annotation.msgType();
+            ENCODER_MAP.put(MsgHandlerUtil.getEncoderKey(msgTypeEnum), (IEncoder<?>) bean);
+            log.info("加载生产处理器,key:{},handler:{}", MsgHandlerUtil.getEncoderKey(msgTypeEnum),
+                    bean.getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * 返回已注册解码器;未注册时为 {@code null},无 unchecked cast。
+     */
+    public IDecoder getDecoder(String key)
+    {
+        return DECODER_MAP.get(key);
+    }
+
+    /**
+     * 获取已注册编码器;未注册时为 {@code null},无 unchecked cast。
+     */
+    public IEncoder<?> getEncoder(String key)
+    {
+        return ENCODER_MAP.get(key);
+    }
+}

+ 158 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/AbstractDecoder.java

@@ -0,0 +1,158 @@
+package com.ruoyi.device.mqtt.handler.decoder;
+
+import com.alibaba.fastjson2.JSON;
+import com.ruoyi.device.mqtt.handler.HandlerManager;
+import com.ruoyi.device.mqtt.handler.encoder.IEncoder;
+import com.ruoyi.device.mqtt.util.MsgHandlerUtil;
+import com.ruoyi.device.mqtt.vo.CommonHeader;
+import com.ruoyi.device.mqtt.vo.CommonTopic;
+import io.netty.buffer.ByteBuf;
+import jakarta.annotation.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+
+/**
+ * 报文体解析能力
+ *
+ * @author lwm
+ */
+public abstract class AbstractDecoder<T> implements IDecoder
+{
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Resource
+    private HandlerManager handlerManager;
+
+    /**
+     * 解码报文
+     *
+     * @param commonTopic topic设备信息
+     * @param header 报文头
+     * @param body 报文体
+     * @return 解析结果
+     */
+    @Override
+    public boolean process(CommonTopic commonTopic, CommonHeader header, ByteBuf body)
+    {
+        T content = decode(commonTopic, header, body);
+        if (content != null)
+        {
+            log.info("TOPIC:{},DECODED-CONTENT:{}", commonTopic.getDeviceType() + "/" + commonTopic.getDeviceSn(),
+                JSON.toJSONString(content));
+            onEncoded(commonTopic, header, content);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * 解码成功后 组帧报文下发
+     */
+    @SuppressWarnings("unchecked")
+    protected void onEncoded(CommonTopic commonTopic, CommonHeader header, T content)
+    {
+        String key = MsgHandlerUtil.getEncoderKey(header.getFirstType(), header.getSecondType());
+        IEncoder<T> encoder = (IEncoder<T>) handlerManager.getEncoder(key);
+        if (encoder != null)
+        {
+            encoder.encode(content);
+        }
+    }
+
+    /**
+     * 金额转换:上报的单位是分,需要转换成元
+     *
+     * @param value 原始金额
+     * @return 处理金额
+     */
+    public BigDecimal amountChange(Long value)
+    {
+        return new BigDecimal(value).divide(new BigDecimal(100), 2, RoundingMode.HALF_UP);
+    }
+
+    /**
+     * ByteBuf 读取length长度的byte[] 转16进制
+     *
+     * @param body ByteBuf
+     * @param length 读取长度
+     * @return 16进制字符串
+     */
+    protected String readHex(ByteBuf body, int length)
+    {
+        byte[] content = new byte[length];
+        body.readBytes(content);
+        return bytesToHex(content);
+    }
+
+    private String bytesToHex(byte[] bytes)
+    {
+        try
+        {
+            StringBuilder sb = new StringBuilder();
+            for (byte b : bytes)
+            {
+                String hex = Integer.toHexString(0xFF & b);
+                if (hex.length() < 2)
+                {
+                    sb.append('0');
+                }
+                sb.append(hex);
+            }
+            return sb.toString();
+        }
+        catch (Exception e)
+        {
+            log.error("原始数据转换为16进制错误:{}", e.getMessage(), e);
+        }
+        return "";
+    }
+
+
+    /**
+     * ByteBuf 读取length长度的byte[] 转ASCII
+     *
+     * @param body ByteBuf
+     * @param length 读取长度
+     * @return ASCII字符串
+     */
+    protected String readASCII(ByteBuf body, int length)
+    {
+        byte[] content = new byte[length];
+        body.readBytes(content);
+        return validateASCII(content);
+    }
+
+    private String validateASCII(byte[] data)
+    {
+        try
+        {
+            StringBuilder stringBuilder = new StringBuilder();
+            for (byte b : data)
+            {
+                if (b >= 32 && b <= 126)
+                {
+                    stringBuilder.append((char) b);
+                }
+            }
+            return stringBuilder.toString();
+        }
+        catch (Exception e)
+        {
+            log.error("原始数据转换为ASCII错误:{}", e.getMessage(), e);
+        }
+        return "";
+    }
+
+    /**
+     * 解码报文,提供扩展点
+     *
+     * @param commonTopic topic设备信息
+     * @param header 报文头
+     * @param body 报文体
+     * @return 解析结果
+     */
+    protected abstract T decode(CommonTopic commonTopic, CommonHeader header, ByteBuf body);
+}

+ 23 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/IDecoder.java

@@ -0,0 +1,23 @@
+package com.ruoyi.device.mqtt.handler.decoder;
+
+import com.ruoyi.device.mqtt.vo.CommonHeader;
+import com.ruoyi.device.mqtt.vo.CommonTopic;
+import io.netty.buffer.ByteBuf;
+
+/**
+ * 上行报文 解码接口
+ *
+ * @author lwm
+ */
+public interface IDecoder
+{
+    /**
+     * 解码
+     *
+     * @param commonTopic topic解析对象
+     * @param header 报文头解析对象
+     * @param body 报文体 准备解码
+     * @return 解码结果
+     */
+    boolean process(CommonTopic commonTopic, CommonHeader header, ByteBuf body);
+}

+ 191 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/MessageHandler.java

@@ -0,0 +1,191 @@
+package com.ruoyi.device.mqtt.handler.decoder;
+
+import com.ruoyi.common.utils.StringUtils;
+import com.ruoyi.device.mqtt.handler.HandlerManager;
+import com.ruoyi.device.mqtt.util.CRC16Standard;
+import com.ruoyi.device.mqtt.util.MsgHandlerUtil;
+import com.ruoyi.device.mqtt.vo.CommonHeader;
+import com.ruoyi.device.mqtt.vo.CommonTopic;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import jakarta.annotation.Resource;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.util.Arrays;
+
+/**
+ * 线程池异步消费 + try/catch,避免消息处理异常影响 Paho 订阅线程。
+ *
+ * @author lwm
+ */
+@Component
+public class MessageHandler
+{
+    private static final Logger log = LoggerFactory.getLogger(MessageHandler.class);
+
+    @Resource
+    private HandlerManager handlerManager;
+
+    @Resource
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+    /**
+     * 订阅消息处理
+     *
+     * @param topic topic主题
+     * @param message 消息体
+     */
+    public void handler(String topic, MqttMessage message)
+    {
+        threadPoolTaskExecutor.execute(() -> {
+            try
+            {
+                if (StringUtils.isEmpty(topic) || message == null)
+                {
+                    return;
+                }
+                // 1、解析topic
+                CommonTopic commonTopic = parseTopic(topic);
+                if (commonTopic == null)
+                {
+                    return;
+                }
+                // 2、解析 payload
+                byte[] payload = message.getPayload();
+                if (!verifyCrc16(payload))
+                {
+                    return;
+                }
+                ByteBuf byteBuf = Unpooled.wrappedBuffer(payload);
+                // 3、读取消息头
+                CommonHeader commonHeader = readHeader(byteBuf);
+                // 4、转换消息体 打印日志
+                String payloadHex = bytesToHexLog(payload);
+                String firstType = Integer.toHexString(commonHeader.getFirstType());
+                String secondType = Integer.toHexString(commonHeader.getSecondType());
+                log.info("设备SN:{},设备类型:{},一级分类:{},二级分类:{},上报的数据包:{}", commonTopic.getDeviceSn(),
+                    commonTopic.getDeviceType(), firstType, secondType, payloadHex);
+                // 5、获取消息类型以及对应的解析器
+                String key = MsgHandlerUtil.getDecoderKey(commonHeader.getFirstType(), commonHeader.getSecondType());
+                IDecoder decoder = handlerManager.getDecoder(key);
+                if (decoder == null)
+                {
+                    log.warn("未找到对应业务解析器,消息类型:{}-{}", firstType, secondType);
+                    return;
+                }
+                // 6、处理消息
+                boolean result = decoder.process(commonTopic, commonHeader, byteBuf);
+                log.info("订阅消息处理结果,topic={},messageid={},result={}", topic, message.getId(), result);
+            }
+            catch (Exception e)
+            {
+                log.info("消息处理异常,异常原因:{}", e.getMessage(), e);
+            }
+        });
+    }
+
+    /**
+     * 消息头解析
+     *
+     * @param byteBuf byteBuf
+     * @return 消息头
+     */
+    private CommonHeader readHeader(ByteBuf byteBuf)
+    {
+        CommonHeader commonHeader = new CommonHeader();
+        // 固定帧头
+        commonHeader.setFrameHeader(byteBuf.readUnsignedShortLE());
+        // 版本号
+        commonHeader.setProtoVer(byteBuf.readUnsignedByte());
+        // 消息编号
+        commonHeader.setMsgId(byteBuf.readUnsignedIntLE());
+        // 一级分类
+        commonHeader.setFirstType(byteBuf.readUnsignedByte());
+        // 二级分类
+        commonHeader.setSecondType(byteBuf.readUnsignedShortLE());
+        // 报文消息体body长度 + 2
+        commonHeader.setMsgLength(byteBuf.readUnsignedShortLE());
+        return commonHeader;
+    }
+
+    /**
+     * 与 {@link com.ruoyi.device.mqtt.handler.encoder.AbstractEncoder} 下行一致:整包除末尾 2 字节 CRC 外参与 CRC16 计算,
+     * 再与 payload 最后 2 字节比较。
+     */
+    private boolean verifyCrc16(byte[] payload)
+    {
+        if (payload == null || payload.length < 14)
+        {
+            log.warn("CRC16 校验失败: 报文过短, len={}", payload == null ? 0 : payload.length);
+            return false;
+        }
+        byte[] dataWithoutTailCrc = Arrays.copyOfRange(payload, 0, payload.length - 2);
+        byte[] computed = CRC16Standard.getCRCBytes(dataWithoutTailCrc);
+        byte low = payload[payload.length - 2];
+        byte high = payload[payload.length - 1];
+        if (computed[0] != low || computed[1] != high)
+        {
+            log.warn("CRC16 校验失败: expect [{}, {}], actual [{}, {}], hexLog={}",
+                computed[0] & 0xFF, computed[1] & 0xFF, low & 0xFF, high & 0xFF,
+                bytesToHexLog(payload));
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * 解析 topic
+     *
+     * @param topic topic
+     * @return CommonTopic
+     */
+    private CommonTopic parseTopic(String topic)
+    {
+        String[] split = topic.split("/");
+        if (split.length != 4)
+        {
+            return null;
+        }
+        String deviceType = split[2];
+        Long deviceSn = Long.parseLong(split[3]);
+        return new CommonTopic(deviceType, deviceSn);
+    }
+
+    /**
+     * byte[] 转16进制 打印日志
+     *
+     * @param bytes bytes
+     * @return 16进制字符串 打印日志
+     */
+    private String bytesToHexLog(byte[] bytes)
+    {
+        try
+        {
+            StringBuilder sb = new StringBuilder();
+            String tmp;
+            sb.append("[");
+            for (byte b : bytes)
+            {
+                tmp = Integer.toHexString(0xFF & b);
+                if (tmp.length() == 1)
+                {
+                    tmp = "0" + tmp;
+                }
+                sb.append(tmp).append(" ");
+            }
+            sb.delete(sb.length() - 1, sb.length());
+            sb.append("]");
+            return sb.toString();
+        }
+        catch (Exception e)
+        {
+            log.error("原始数据转换为16进制错误:{}", e.getMessage(), e);
+        }
+        return "";
+    }
+
+}

+ 22 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/IJsonCmdHandler.java

@@ -0,0 +1,22 @@
+package com.ruoyi.device.mqtt.handler.decoder.json;
+
+import com.ruoyi.device.mqtt.domain.BaseJsonBody;
+import com.ruoyi.device.mqtt.enums.CmdTypeEnum;
+import com.ruoyi.device.mqtt.vo.CommonHeader;
+import com.ruoyi.device.mqtt.vo.CommonTopic;
+
+/**
+ * JSON Body 按 cmdType 分发的业务处理器
+ *
+ * @author lwm
+ */
+public interface IJsonCmdHandler
+{
+    /**
+     * @param topic     上行 topic 解析结果
+     * @param header    报文头
+     * @param bodyJson  UTF-8 JSON 原文
+     * @param cmd       已匹配的上行命令枚举
+     */
+    BaseJsonBody handle(CommonTopic topic, CommonHeader header, String bodyJson, CmdTypeEnum cmd);
+}

+ 89 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/JsonBodyDecoder.java

@@ -0,0 +1,89 @@
+package com.ruoyi.device.mqtt.handler.decoder.json;
+
+import com.alibaba.fastjson2.JSON;
+import com.ruoyi.common.utils.StringUtils;
+import com.ruoyi.device.mqtt.annotation.ConsumerHandler;
+import com.ruoyi.device.mqtt.domain.BaseJsonBody;
+import com.ruoyi.device.mqtt.enums.CmdTypeEnum;
+import com.ruoyi.device.mqtt.enums.MsgTypeEnum;
+import com.ruoyi.device.mqtt.handler.decoder.AbstractDecoder;
+import com.ruoyi.device.mqtt.vo.CommonHeader;
+import com.ruoyi.device.mqtt.vo.CommonTopic;
+import io.netty.buffer.ByteBuf;
+import jakarta.annotation.Resource;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * 解析 firstType=0x4A 的 UTF-8 JSON Body,按 {@link CmdTypeEnum} 分发业务并控制下行。
+ *
+ * @author lwm
+ */
+@ConsumerHandler(msgType = MsgTypeEnum.JSON_BODY)
+public class JsonBodyDecoder extends AbstractDecoder<BaseJsonBody>
+{
+    @Resource
+    private JsonCmdHandlerManager jsonCmdHandlerManager;
+
+    @Override
+    protected BaseJsonBody decode(CommonTopic commonTopic, CommonHeader header, ByteBuf body)
+    {
+        // 1、按照 msgLength-2 的长度读取报文体
+        if (header.getMsgLength() <= 2)
+        {
+            log.warn("报文体长度标识小于2,忽略解析");
+            return BaseJsonBody.fail(commonTopic.getDeviceType(), commonTopic.getDeviceSn(), "报文体长度标识小于2,忽略解析");
+        }
+        byte[] bodyBytes = new byte[header.getMsgLength() - 2];
+        body.readBytes(bodyBytes);
+        String bodyJson = new String(bodyBytes, StandardCharsets.UTF_8).trim();
+
+        // 2、解析报文体json
+        if (!JSON.isValidObject(bodyJson))
+        {
+            log.warn("报文体json格式化出错");
+            return BaseJsonBody.fail(commonTopic.getDeviceType(), commonTopic.getDeviceSn(), "报文体json格式化出错");
+        }
+
+        // 3、json格式化为对象jsonBody
+        BaseJsonBody jsonBody = JSON.parseObject(bodyJson, BaseJsonBody.class);
+        if (jsonBody == null || StringUtils.isEmpty(jsonBody.getCmdType()))
+        {
+            log.warn("报文体缺少 cmdType");
+            return BaseJsonBody.fail(commonTopic.getDeviceType(), commonTopic.getDeviceSn(), "报文体缺少 cmdType");
+        }
+        String rawCmdType = jsonBody.getCmdType().trim();
+        if (CmdTypeEnum.isDownlink(rawCmdType))
+        {
+            log.warn("收到下行 cmdType 于上行通道, cmdType={}", rawCmdType);
+            return BaseJsonBody.fail(commonTopic.getDeviceType(), commonTopic.getDeviceSn(), "cmdType 不能为下行类型");
+        }
+        CmdTypeEnum cmd = CmdTypeEnum.resolveUplink(rawCmdType);
+        if (cmd == null)
+        {
+            log.warn("未知 cmdType={}", rawCmdType);
+            return BaseJsonBody.fail(commonTopic.getDeviceType(), commonTopic.getDeviceSn(), "未知 cmdType: " + rawCmdType);
+        }
+        if (StringUtils.isEmpty(jsonBody.getDeviceType()) && StringUtils.isNotEmpty(commonTopic.getDeviceType()))
+        {
+            jsonBody.setDeviceType(commonTopic.getDeviceType());
+        }
+        if (jsonBody.getDeviceSn() == null && commonTopic.getDeviceSn() != null)
+        {
+            jsonBody.setDeviceSn(commonTopic.getDeviceSn());
+        }
+
+        // 4、进行业务处理
+        IJsonCmdHandler handler = jsonCmdHandlerManager.getHandlerByUpKey(cmd.getCmdUpType());
+        if (handler == null)
+        {
+            handler = jsonCmdHandlerManager.getHandlerByBaseKey(cmd.getCmdType());
+        }
+        if (handler == null)
+        {
+            log.warn("未注册 JSON 命令处理器 cmd={}", cmd);
+            return BaseJsonBody.fail(jsonBody.getDeviceType(), jsonBody.getDeviceSn(), cmd.getCmdDownType(), "未实现的服务处理器");
+        }
+        return handler.handle(commonTopic, header, bodyJson, cmd);
+    }
+}

+ 65 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/JsonCmdHandlerManager.java

@@ -0,0 +1,65 @@
+package com.ruoyi.device.mqtt.handler.decoder.json;
+
+import com.ruoyi.device.mqtt.annotation.JsonCmdHandler;
+import com.ruoyi.device.mqtt.enums.CmdTypeEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 按 {@link CmdTypeEnum} 将 JSON 上行分发给各 {@link IJsonCmdHandler}
+ *
+ * @author lwm
+ */
+@Component
+public class JsonCmdHandlerManager implements ApplicationListener<ContextRefreshedEvent>
+{
+    private static final Logger log = LoggerFactory.getLogger(JsonCmdHandlerManager.class);
+
+    private final Map<String, IJsonCmdHandler> HANDLER_BY_UP_KEY = new HashMap<>();
+    private final Map<String, IJsonCmdHandler> HANDLER_BY_BASE_KEY = new HashMap<>();
+
+    /**
+     * 监听spring容器启动完成,将所有JsonCmdHandler缓存起来
+     *
+     * @param event 上下文
+     */
+    @Override
+    public void onApplicationEvent(ContextRefreshedEvent event)
+    {
+        ApplicationContext ctx = event.getApplicationContext();
+
+        Map<String, Object> jsonCmdHandler = ctx.getBeansWithAnnotation(JsonCmdHandler.class);
+        for (Object bean : jsonCmdHandler.values())
+        {
+            JsonCmdHandler annotation = bean.getClass().getAnnotation(JsonCmdHandler.class);
+            CmdTypeEnum cmdTypeEnum = annotation.cmdType();
+            HANDLER_BY_UP_KEY.put(cmdTypeEnum.getCmdUpType(), (IJsonCmdHandler) bean);
+            HANDLER_BY_BASE_KEY.put(cmdTypeEnum.getCmdType(), (IJsonCmdHandler) bean);
+            log.info("加载 JSON 命令处理器 upKey={}, baseKey={}, handler={}",
+                    cmdTypeEnum.getCmdUpType(), cmdTypeEnum.getCmdType(), bean.getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * 按 upKey 获取处理器
+     */
+    public IJsonCmdHandler getHandlerByUpKey(String upKey)
+    {
+        return HANDLER_BY_UP_KEY.get(upKey);
+    }
+
+    /**
+     * 按 baseKey 获取处理器
+     */
+    public IJsonCmdHandler getHandlerByBaseKey(String baseKey)
+    {
+        return HANDLER_BY_BASE_KEY.get(baseKey);
+    }
+}

+ 122 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/service/DeviceLoginService.java

@@ -0,0 +1,122 @@
+package com.ruoyi.device.mqtt.handler.decoder.json.service;
+
+import com.alibaba.fastjson2.JSON;
+import com.ruoyi.common.core.domain.entity.SysRole;
+import com.ruoyi.common.core.domain.entity.SysUser;
+import com.ruoyi.common.enums.UserStatus;
+import com.ruoyi.common.utils.SecurityUtils;
+import com.ruoyi.common.utils.StringUtils;
+import com.ruoyi.device.domain.model.TsbUserDeviceBind;
+import com.ruoyi.device.mqtt.annotation.JsonCmdHandler;
+import com.ruoyi.device.mqtt.constants.MqttConstants;
+import com.ruoyi.device.mqtt.domain.BaseJsonBody;
+import com.ruoyi.device.mqtt.domain.decoder.DeviceLoginRequest;
+import com.ruoyi.device.mqtt.domain.encoder.DeviceLoginResponse;
+import com.ruoyi.device.mqtt.enums.CmdTypeEnum;
+import com.ruoyi.device.mqtt.handler.decoder.json.IJsonCmdHandler;
+import com.ruoyi.device.mqtt.vo.CommonHeader;
+import com.ruoyi.device.mqtt.vo.CommonTopic;
+import com.ruoyi.device.service.ITsbUserDeviceService;
+import com.ruoyi.framework.web.service.SysPermissionService;
+import com.ruoyi.system.service.ISysUserService;
+import jakarta.annotation.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * 调试宝登录:tsb:login:up → tsb:login:down
+ *
+ * @author lwm
+ */
+@JsonCmdHandler(cmdType = CmdTypeEnum.TSB_LOGIN)
+public class DeviceLoginService implements IJsonCmdHandler
+{
+    private static final Logger log = LoggerFactory.getLogger(DeviceLoginService.class);
+
+    @Resource
+    private ITsbUserDeviceService tsbUserDeviceService;
+    @Resource
+    private ISysUserService userService;
+    @Resource
+    private SysPermissionService permissionService;
+
+    @Override
+    public BaseJsonBody handle(CommonTopic topic, CommonHeader header, String bodyJson, CmdTypeEnum cmd)
+    {
+        DeviceLoginRequest request = JSON.parseObject(bodyJson, DeviceLoginRequest.class);
+        if (request == null)
+        {
+            log.warn("调试宝登录报文体解析失败");
+            return BaseJsonBody.fail(topic.getDeviceType(), topic.getDeviceSn(), cmd.getCmdDownType(), "调试宝登录报文体解析失败");
+        }
+
+        // 1、用户信息校验
+        String userName = request.getUserName();
+        if (StringUtils.isEmpty(userName))
+        {
+            log.warn("用户名不能为空");
+            return BaseJsonBody.fail(request.getDeviceType(), request.getDeviceSn(), cmd.getCmdDownType(), "用户名不能为空");
+        }
+        SysUser user = userService.selectUserByUserName(userName);
+        if (StringUtils.isNull(user))
+        {
+            log.warn("登录用户:{}不存在", userName);
+            return BaseJsonBody.fail(request.getDeviceType(), request.getDeviceSn(), cmd.getCmdDownType(), "当前登录用户不存在");
+        }
+        if (UserStatus.DELETED.getCode().equals(user.getDelFlag()))
+        {
+            log.warn("登录用户:{}已被删除", userName);
+            return BaseJsonBody.fail(request.getDeviceType(), request.getDeviceSn(), cmd.getCmdDownType(), "当前登录用户已被删除");
+        }
+        if (UserStatus.DISABLE.getCode().equals(user.getStatus()))
+        {
+            log.warn("登录用户:{}已被停用", userName);
+            return BaseJsonBody.fail(request.getDeviceType(), request.getDeviceSn(), cmd.getCmdDownType(), "当前登录用户已被停用");
+        }
+        if (StringUtils.isEmpty(request.getUserPassword())
+            || !SecurityUtils.matchesPassword(request.getUserPassword(), user.getPassword()))
+        {
+            log.warn("登录用户:{} 密码错误,请重新输入.", userName);
+            return BaseJsonBody.fail(request.getDeviceType(), request.getDeviceSn(), cmd.getCmdDownType(), "密码错误,请重新输入");
+        }
+
+        // 2、用户设备绑定关系
+        TsbUserDeviceBind bindQuery = new TsbUserDeviceBind();
+        bindQuery.setUserId(user.getUserId());
+        bindQuery.setImei(request.getImei());
+        TsbUserDeviceBind bindResult = tsbUserDeviceService.selectTsbUserDeviceBind(bindQuery);
+        if (bindResult == null)
+        {
+            log.warn("用户:{} 未绑定设备.", userName);
+            return BaseJsonBody.fail(request.getDeviceType(), request.getDeviceSn(), cmd.getCmdDownType(), "当前用户未绑定设备");
+        }
+
+        // 3、返回登录结果
+        DeviceLoginResponse response = new DeviceLoginResponse();
+        response.setCmdType(cmd.getCmdDownType());
+        response.setDeviceType(request.getDeviceType());
+        response.setDeviceSn(request.getDeviceSn());
+        response.setCode(MqttConstants.SUCCESS_CODE);
+        response.setMsg("登录成功");
+        response.setUserName(user.getUserName());
+        response.setPhoneNumber(user.getPhonenumber());
+        response.setNickName(user.getNickName());
+        List<SysRole> roles = user.getRoles();
+        if (StringUtils.isNotEmpty(roles))
+        {
+            String roleNames = roles.stream()
+                    .map(SysRole::getRoleName)
+                    .filter(StringUtils::isNotEmpty)
+                    .collect(Collectors.joining(","));
+            response.setRoleName(roleNames);
+        }
+        Set<String> permissions = permissionService.getMenuPermission(user);
+        response.setPermissions(permissions);
+        return response;
+    }
+
+}

+ 88 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/decoder/json/service/DevicePtService.java

@@ -0,0 +1,88 @@
+package com.ruoyi.device.mqtt.handler.decoder.json.service;
+
+import com.alibaba.fastjson2.JSON;
+import com.ruoyi.common.enums.UserStatus;
+import com.ruoyi.common.utils.DateUtils;
+import com.ruoyi.common.utils.StringUtils;
+import com.ruoyi.device.domain.entity.TsbDevice;
+import com.ruoyi.device.mapper.TsbDeviceMapper;
+import com.ruoyi.device.mqtt.annotation.JsonCmdHandler;
+import com.ruoyi.device.mqtt.constants.MqttConstants;
+import com.ruoyi.device.mqtt.domain.BaseJsonBody;
+import com.ruoyi.device.mqtt.domain.decoder.DevicePtRequest;
+import com.ruoyi.device.mqtt.domain.encoder.DevicePtResponse;
+import com.ruoyi.device.mqtt.enums.CmdTypeEnum;
+import com.ruoyi.device.mqtt.handler.decoder.json.IJsonCmdHandler;
+import com.ruoyi.device.mqtt.vo.CommonHeader;
+import com.ruoyi.device.mqtt.vo.CommonTopic;
+import jakarta.annotation.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 调试宝产测:tsb:pt:up → tsb:pt:down
+ *
+ * @author lwm
+ */
+@JsonCmdHandler(cmdType = CmdTypeEnum.TSB_PT)
+public class DevicePtService implements IJsonCmdHandler
+{
+    private static final Logger log = LoggerFactory.getLogger(DevicePtService.class);
+
+    @Resource
+    private TsbDeviceMapper tsbDeviceMapper;
+
+    @Override
+    public BaseJsonBody handle(CommonTopic topic, CommonHeader header, String bodyJson, CmdTypeEnum cmd)
+    {
+        DevicePtRequest request = JSON.parseObject(bodyJson, DevicePtRequest.class);
+        if (request == null)
+        {
+            log.warn("调试宝产测报文体解析失败");
+            return BaseJsonBody.fail(topic.getDeviceType(), topic.getDeviceSn(), cmd.getCmdDownType(), "调试宝产测报文体解析失败");
+        }
+
+        // 1、设备查询
+        if (StringUtils.isEmpty(request.getImei()))
+        {
+            log.warn("IMEI 不能为空");
+            return BaseJsonBody.fail(request.getDeviceType(), request.getDeviceSn(), cmd.getCmdDownType(), "IMEI 不能为空");
+        }
+        TsbDevice device = tsbDeviceMapper.selectTsbDeviceByImei(request.getImei());
+        if (device == null)
+        {
+            log.warn("未找到 IMEI:{} 对应设备", request.getImei());
+            return BaseJsonBody.fail(request.getDeviceType(), request.getDeviceSn(), cmd.getCmdDownType(), "未找到 IMEI 对应设备");
+        }
+        if (UserStatus.DELETED.getCode().equals(device.getDelFlag()))
+        {
+            log.warn("设备 IMEI:{} 已被删除", request.getImei());
+            return BaseJsonBody.fail(request.getDeviceType(), request.getDeviceSn(), cmd.getCmdDownType(), "设备 IMEI 对应设备已被删除");
+        }
+        if (UserStatus.DISABLE.getCode().equals(device.getStatus()))
+        {
+            log.warn("设备 IMEI:{} 已被停用", request.getImei());
+            return BaseJsonBody.fail(request.getDeviceType(), request.getDeviceSn(), cmd.getCmdDownType(), "设备 IMEI 对应设备已被停用");
+        }
+
+        // 2、返回结果
+        DevicePtResponse response = new DevicePtResponse();
+        response.setCmdType(cmd.getCmdDownType());
+        response.setImei(request.getImei());
+        DevicePtResponse.DeviceConfig config = new DevicePtResponse.DeviceConfig();
+        config.setDeviceType(device.getDeviceType());
+        config.setDeviceSn(device.getDeviceSn());
+        if (device.getDeviceProduceDate() != null)
+        {
+            config.setDeviceProduceDate(DateUtils.parseDateToStr(DateUtils.YYYYMMDD, device.getDeviceProduceDate()));
+        }
+        response.setDeviceConfig(config);
+        response.setCode(MqttConstants.SUCCESS_CODE);
+        response.setMsg("操作成功");
+        // 模拟下发topic所需的数据
+        response.setDeviceType(device.getDeviceType());
+        response.setDeviceSn(Long.parseLong(request.getImei()));
+        return response;
+    }
+
+}

+ 179 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/encoder/AbstractEncoder.java

@@ -0,0 +1,179 @@
+package com.ruoyi.device.mqtt.handler.encoder;
+
+import com.ruoyi.device.mqtt.client.EmqClient;
+import com.ruoyi.device.mqtt.constants.MqttConstants;
+import com.ruoyi.device.mqtt.domain.BaseBody;
+import com.ruoyi.device.mqtt.enums.MsgTypeEnum;
+import com.ruoyi.device.mqtt.enums.QosEnum;
+import com.ruoyi.device.mqtt.enums.VersionEnum;
+import com.ruoyi.device.mqtt.util.CRC16Standard;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * 报文组帧能力
+ *
+ * @author lwm
+ */
+public abstract class AbstractEncoder<T> implements IEncoder<T>
+{
+    private static final Logger log = LoggerFactory.getLogger(AbstractEncoder.class);
+
+    @Autowired
+    private EmqClient emqClient;
+
+    /**
+     * 组帧报文
+     *
+     * @param t java对象
+     */
+    @Override
+    public void encode(T t)
+    {
+        ByteBuf header = Unpooled.buffer(800);
+        header.writeShortLE(MqttConstants.FRAME_HEADER);
+        header.writeByte(VersionEnum.THIRD.getValue());
+        header.writeIntLE(1);
+        header.writeByte(msgType().getFirstType());
+        header.writeShortLE(msgType().getSecondType());
+
+        ByteBuf body = Unpooled.buffer(800);
+        encode(t, body);
+
+        int length = body.readableBytes();
+        byte[] msgBytes = new byte[length];
+        body.getBytes(body.readerIndex(), msgBytes);
+
+        header.writeShortLE(msgBytes.length + 2);
+        header.writeBytes(msgBytes);
+
+        int totalLength = header.readableBytes();
+        byte[] totalBytes = new byte[totalLength];
+        header.getBytes(header.readerIndex(), totalBytes);
+
+        byte[] crcBytes = CRC16Standard.getCRCBytes(totalBytes);
+        byte[] bytes = ArrayUtils.addAll(totalBytes, crcBytes);
+
+        log.info("发送消息:{}", bytesToHexLog(bytes));
+        emqClient.publish(topic(t), bytes, qos(), retain());
+    }
+
+    /**
+     * byte[] 转16进制 打印日志
+     *
+     * @param bytes bytes
+     * @return 16进制字符串 打印日志
+     */
+    public static String bytesToHexLog(byte[] bytes)
+    {
+        try
+        {
+            StringBuilder sb = new StringBuilder();
+            String tmp;
+            sb.append("[");
+            for (byte b : bytes)
+            {
+                tmp = Integer.toHexString(0xFF & b);
+                if (tmp.length() == 1)
+                {
+                    tmp = "0" + tmp;
+                }
+                sb.append(tmp).append(" ");
+            }
+            sb.delete(sb.length() - 1, sb.length());
+            sb.append("]");
+            return sb.toString();
+        }
+        catch (Exception e)
+        {
+            log.error("原始数据转换为16进制错误:{}", e.getMessage(), e);
+        }
+        return "";
+    }
+
+    /**
+     * 组帧报文
+     *
+     * @param t java对象
+     */
+    protected abstract void encode(T t, ByteBuf body);
+
+    /**
+     * 获取下发的topic
+     *
+     * @param t java对象
+     * @return topic
+     */
+    protected abstract String topic(T t);
+
+    /**
+     * 获取报文类型
+     *
+     * @return 报文类型
+     */
+    protected abstract MsgTypeEnum msgType();
+
+    /**
+     * 获取报文Qos
+     *
+     * @return Qos
+     */
+    protected abstract QosEnum qos();
+
+    /**
+     * 获取报文是否保留
+     *
+     * @return 是否保留
+     */
+    protected abstract boolean retain();
+
+    /**
+     * 设置目标设备类型
+     *
+     * @param deviceType    设备型号
+     * @param body          报文体
+     */
+    protected void setTargetType(String deviceType, ByteBuf body)
+    {
+        String q = StringUtils.substring(deviceType, 0, 2);
+        String h = StringUtils.substring(deviceType, 2, 4);
+        byte[] bytes = hexStringToByte(h + q);
+        body.writeBytes(bytes);
+    }
+
+    private byte[] hexStringToByte(String hex)
+    {
+        int len = (hex.length() / 2);
+        byte[] result = new byte[len];
+        char[] achar = hex.toCharArray();
+        for (int i = 0; i < len; i++)
+        {
+            int pos = i * 2;
+            result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1]));
+        }
+        return result;
+    }
+
+    private int toByte(char c)
+    {
+        byte b = (byte) "0123456789ABCDEF".indexOf(c);
+        return b;
+    }
+
+    /**
+     * 生成topic
+     *
+     * @param baseBody java对象
+     * @return topic
+     */
+    protected String generateTopic(BaseBody baseBody)
+    {
+        String deviceSnFormat = String.format("%010d", baseBody.getDeviceSn());
+        return String.format(MqttConstants.DEVICE_DOWN_TOPIC, baseBody.getDeviceType(), deviceSnFormat);
+    }
+}

+ 17 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/encoder/IEncoder.java

@@ -0,0 +1,17 @@
+package com.ruoyi.device.mqtt.handler.encoder;
+
+/**
+ * 下行报文 编码接口
+ *
+ * @author lwm
+ **/
+public interface IEncoder<T>
+{
+    /**
+     * 编码
+     *
+     * @param t 准备编码的对象
+     */
+    void encode(T t);
+
+}

+ 48 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/handler/encoder/json/JsonBodyEncoder.java

@@ -0,0 +1,48 @@
+package com.ruoyi.device.mqtt.handler.encoder.json;
+
+import com.alibaba.fastjson2.JSON;
+import com.ruoyi.device.mqtt.annotation.ProducerHandler;
+import com.ruoyi.device.mqtt.domain.BaseJsonBody;
+import com.ruoyi.device.mqtt.enums.MsgTypeEnum;
+import com.ruoyi.device.mqtt.enums.QosEnum;
+import com.ruoyi.device.mqtt.handler.encoder.AbstractEncoder;
+import io.netty.buffer.ByteBuf;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * 登录应答:JSON body + AbstractEncoder 组帧、CRC、经 {@link com.ruoyi.device.mqtt.client.EmqClient} 下发。
+ */
+@ProducerHandler(msgType = MsgTypeEnum.JSON_BODY)
+public class JsonBodyEncoder extends AbstractEncoder<BaseJsonBody>
+{
+    @Override
+    protected void encode(BaseJsonBody t, ByteBuf body)
+    {
+        body.writeBytes(JSON.toJSONString(t).getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    protected String topic(BaseJsonBody t)
+    {
+        return generateTopic(t);
+    }
+
+    @Override
+    protected MsgTypeEnum msgType()
+    {
+        return MsgTypeEnum.JSON_BODY;
+    }
+
+    @Override
+    protected QosEnum qos()
+    {
+        return QosEnum.QoS2;
+    }
+
+    @Override
+    protected boolean retain()
+    {
+        return false;
+    }
+}

+ 25 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/listener/DeviceMessageListener.java

@@ -0,0 +1,25 @@
+package com.ruoyi.device.mqtt.listener;
+
+import com.ruoyi.device.mqtt.handler.decoder.MessageHandler;
+import jakarta.annotation.Resource;
+import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Component;
+
+/**
+ * mqtt 设备上行消息处理监听
+ *
+ * @author lwm
+ */
+@Component
+public class DeviceMessageListener implements IMqttMessageListener
+{
+    @Resource
+    private MessageHandler messageHandler;
+
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception
+    {
+        messageHandler.handler(topic, message);
+    }
+}

+ 25 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/listener/InventMessageListener.java

@@ -0,0 +1,25 @@
+package com.ruoyi.device.mqtt.listener;
+
+import com.ruoyi.device.mqtt.handler.decoder.MessageHandler;
+import jakarta.annotation.Resource;
+import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Component;
+
+/**
+ * mqtt invent上行消息处理监听
+ *
+ * @author lwm
+ */
+@Component
+public class InventMessageListener implements IMqttMessageListener
+{
+    @Resource
+    private MessageHandler messageHandler;
+
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception
+    {
+        messageHandler.handler(topic, message);
+    }
+}

+ 47 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/util/CRC16Standard.java

@@ -0,0 +1,47 @@
+package com.ruoyi.device.mqtt.util;
+
+/**
+ * CRC16校验
+ *
+ * @author lwm
+ **/
+public class CRC16Standard
+{
+    private static final int[] table = { 0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241, 0xC601, 0x06C0, 0x0780, 0xC741,
+            0x0500, 0xC5C1, 0xC481, 0x0440, 0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40, 0x0A00, 0xCAC1, 0xCB81, 0x0B40,
+            0xC901, 0x09C0, 0x0880, 0xC841, 0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40, 0x1E00, 0xDEC1, 0xDF81, 0x1F40,
+            0xDD01, 0x1DC0, 0x1C80, 0xDC41, 0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641, 0xD201, 0x12C0, 0x1380, 0xD341,
+            0x1100, 0xD1C1, 0xD081, 0x1040, 0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240, 0x3600, 0xF6C1, 0xF781, 0x3740,
+            0xF501, 0x35C0, 0x3480, 0xF441, 0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41, 0xFA01, 0x3AC0, 0x3B80, 0xFB41,
+            0x3900, 0xF9C1, 0xF881, 0x3840, 0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41, 0xEE01, 0x2EC0, 0x2F80, 0xEF41,
+            0x2D00, 0xEDC1, 0xEC81, 0x2C40, 0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640, 0x2200, 0xE2C1, 0xE381, 0x2340,
+            0xE101, 0x21C0, 0x2080, 0xE041, 0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240, 0x6600, 0xA6C1, 0xA781, 0x6740,
+            0xA501, 0x65C0, 0x6480, 0xA441, 0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41, 0xAA01, 0x6AC0, 0x6B80, 0xAB41,
+            0x6900, 0xA9C1, 0xA881, 0x6840, 0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41, 0xBE01, 0x7EC0, 0x7F80, 0xBF41,
+            0x7D00, 0xBDC1, 0xBC81, 0x7C40, 0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640, 0x7200, 0xB2C1, 0xB381, 0x7340,
+            0xB101, 0x71C0, 0x7080, 0xB041, 0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241, 0x9601, 0x56C0, 0x5780, 0x9741,
+            0x5500, 0x95C1, 0x9481, 0x5440, 0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40, 0x5A00, 0x9AC1, 0x9B81, 0x5B40,
+            0x9901, 0x59C0, 0x5880, 0x9841, 0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40, 0x4E00, 0x8EC1, 0x8F81, 0x4F40,
+            0x8D01, 0x4DC0, 0x4C80, 0x8C41, 0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641, 0x8201, 0x42C0, 0x4380, 0x8341,
+            0x4100, 0x81C1, 0x8081, 0x4040, };
+
+    public static byte[] getCRCBytes(byte[] data)
+    {
+        int crc = 0x0000;
+        for (byte b : data)
+        {
+            crc = (crc >>> 8) ^ table[(crc ^ b) & 0xff];
+        }
+        return new byte[]{(byte) (0xff & crc), (byte) ((0xff00 & crc) >> 8)};
+    }
+
+    public static byte[] getPCRCBytes(byte[] data)
+    {
+        int crc = 0x0000;
+        for (byte b : data)
+        {
+            crc = (crc >>> 8) ^ table[(crc ^ b) & 0xff];
+        }
+        return new byte[]{(byte) ((0xff00 & crc) >> 8), (byte) (0xff & crc)};
+    }
+}

+ 50 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/util/MsgHandlerUtil.java

@@ -0,0 +1,50 @@
+package com.ruoyi.device.mqtt.util;
+
+import com.ruoyi.device.mqtt.enums.MsgTypeEnum;
+
+/**
+ * MsgHandlerUtil 消息类型处理工具类
+ * 将消息的一级类型和二级类型拼接成key
+ *
+ * @author lwm
+ */
+public final class MsgHandlerUtil
+{
+    private static final String UNDERLINE = "_";
+
+    private MsgHandlerUtil()
+    {
+    }
+
+    /**
+     * 获取消息处理key
+     *
+     * @param msgTypeEnum 消息类型枚举
+     * @return 消息处理key
+     */
+    public static String getDecoderKey(MsgTypeEnum msgTypeEnum)
+    {
+        return getDecoderKey(msgTypeEnum.getFirstType(), msgTypeEnum.getSecondType());
+    }
+
+    public static String getDecoderKey(Integer firstKey, Integer secondKey)
+    {
+        return Integer.toHexString(firstKey) + UNDERLINE + Integer.toHexString(secondKey);
+    }
+
+    /**
+     * 获取消息处理key
+     *
+     * @param msgTypeEnum 消息类型枚举
+     * @return 消息处理key
+     */
+    public static String getEncoderKey(MsgTypeEnum msgTypeEnum)
+    {
+        return getEncoderKey(msgTypeEnum.getFirstType(), msgTypeEnum.getSecondType());
+    }
+
+    public static String getEncoderKey(Integer firstKey, Integer secondKey)
+    {
+        return Integer.toHexString(firstKey) + UNDERLINE + Integer.toHexString(secondKey);
+    }
+}

+ 99 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/vo/CommonHeader.java

@@ -0,0 +1,99 @@
+package com.ruoyi.device.mqtt.vo;
+
+/**
+ * 报文的固定格式解析对象
+ *
+ * @author lwm
+ */
+public class CommonHeader
+{
+    /**
+     * 报文头 固定帧头
+     */
+    private int frameHeader;
+
+    /**
+     * 协议版本
+     */
+    private int protoVer;
+
+    /**
+     * 消息编号
+     */
+    private Long msgId;
+
+    /**
+     * 报文一级类型
+     */
+    private int firstType;
+
+    /**
+     * 报文二级类型
+     */
+    private int secondType;
+
+    /**
+     * 报文消息体body长度 + 2
+     */
+    private int msgLength;
+
+    public int getFrameHeader()
+    {
+        return frameHeader;
+    }
+
+    public void setFrameHeader(int frameHeader)
+    {
+        this.frameHeader = frameHeader;
+    }
+
+    public int getProtoVer()
+    {
+        return protoVer;
+    }
+
+    public void setProtoVer(int protoVer)
+    {
+        this.protoVer = protoVer;
+    }
+
+    public Long getMsgId()
+    {
+        return msgId;
+    }
+
+    public void setMsgId(Long msgId)
+    {
+        this.msgId = msgId;
+    }
+
+    public int getFirstType()
+    {
+        return firstType;
+    }
+
+    public void setFirstType(int firstType)
+    {
+        this.firstType = firstType;
+    }
+
+    public int getSecondType()
+    {
+        return secondType;
+    }
+
+    public void setSecondType(int secondType)
+    {
+        this.secondType = secondType;
+    }
+
+    public int getMsgLength()
+    {
+        return msgLength;
+    }
+
+    public void setMsgLength(int msgLength)
+    {
+        this.msgLength = msgLength;
+    }
+}

+ 42 - 0
ruoyi-device/src/main/java/com/ruoyi/device/mqtt/vo/CommonTopic.java

@@ -0,0 +1,42 @@
+package com.ruoyi.device.mqtt.vo;
+
+/**
+ * topic主题的固定格式解析对象 cpyypt/up/{deviceType}/{deviceSn}
+ * (deviceType设备类型 + deviceSn网关/SN)
+ *
+ * @author lwm
+ */
+public class CommonTopic
+{
+    /**
+     * 设备类型
+     */
+    private String deviceType;
+
+    /**
+     * 网关/SN
+     */
+    private Long deviceSn;
+
+    public CommonTopic(String deviceType, Long deviceSn)
+    {
+        this.deviceType = deviceType;
+        this.deviceSn = deviceSn;
+    }
+
+    public Long getDeviceSn() {
+        return deviceSn;
+    }
+
+    public void setDeviceSn(Long deviceSn) {
+        this.deviceSn = deviceSn;
+    }
+
+    public String getDeviceType() {
+        return deviceType;
+    }
+
+    public void setDeviceType(String deviceType) {
+        this.deviceType = deviceType;
+    }
+}