MessageCallback.java 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package com.ruoyi.device.mqtt.client;
  2. import com.ruoyi.common.utils.StringUtils;
  3. import com.ruoyi.common.utils.spring.SpringUtils;
  4. import com.ruoyi.device.config.MessageRouter;
  5. import com.ruoyi.device.config.MqttConfig;
  6. import jakarta.annotation.Resource;
  7. import org.eclipse.paho.client.mqttv3.*;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.context.annotation.Lazy;
  11. import org.springframework.stereotype.Component;
  12. /**
  13. * 连接完成补订阅、断线重连
  14. * 接收消息、发送消息
  15. *
  16. * @author lwm
  17. */
  18. @Component
  19. public class MessageCallback implements MqttCallbackExtended
  20. {
  21. private static final Logger log = LoggerFactory.getLogger(MessageCallback.class);
  22. @Resource
  23. @Lazy
  24. EmqClient emqClient;
  25. @Resource
  26. MqttConfig mqttConfig;
  27. /**
  28. * 连接完成,订阅所有主题
  29. */
  30. @Override
  31. public void connectComplete(boolean reconnect, String serverUri)
  32. {
  33. emqClient.subscribeAll();
  34. }
  35. /**
  36. * 丢失了对服务端的连接后触发的回调
  37. */
  38. @Override
  39. public void connectionLost(Throwable cause)
  40. {
  41. emqClient.reConnect();
  42. }
  43. /**
  44. * 应用收到消息后触发的回调
  45. * 之前在 EmqClient 已经使用 subscribe(topic, qos, IMqttMessageListener) 时由监听器消费
  46. *
  47. * 现在再来一遍
  48. */
  49. @Override
  50. public void messageArrived(String topic, MqttMessage message) throws Exception
  51. {
  52. if (StringUtils.isEmpty(mqttConfig.getMessageRouters()))
  53. {
  54. return;
  55. }
  56. for (MessageRouter router : mqttConfig.getMessageRouters())
  57. {
  58. String topicFilter = router.getTopic();
  59. if(isMatched(topicFilter, topic))
  60. {
  61. IMqttMessageListener listener = SpringUtils.getBean(router.getListener());
  62. listener.messageArrived(topic, message);
  63. }
  64. }
  65. }
  66. /**
  67. * 判断主题是否匹配
  68. *
  69. */
  70. private boolean isMatched(String topicFilter, String topic) {
  71. if (topicFilter.startsWith("$queue/"))
  72. {
  73. topicFilter = topicFilter.replaceFirst("\\$queue/", "");
  74. }
  75. else if (topicFilter.startsWith("$share/"))
  76. {
  77. topicFilter = topicFilter.replaceFirst("\\$share/", "");
  78. topicFilter = topicFilter.substring(topicFilter.indexOf('/') + 1);
  79. }
  80. return MqttTopic.isMatched(topicFilter, topic);
  81. }
  82. /**
  83. * 消息发布者消息发布完成产生的回调
  84. */
  85. @Override
  86. public void deliveryComplete(IMqttDeliveryToken token)
  87. {
  88. if (token == null)
  89. {
  90. return;
  91. }
  92. int messageId = token.getMessageId();
  93. String[] topics = token.getTopics();
  94. log.info("消息发布完成,messageid={},topics={}", messageId, topics);
  95. }
  96. }