| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- package com.ruoyi.device.mqtt.client;
- import com.ruoyi.common.utils.StringUtils;
- import com.ruoyi.common.utils.spring.SpringUtils;
- import com.ruoyi.device.config.MessageRouter;
- import com.ruoyi.device.config.MqttConfig;
- import jakarta.annotation.Resource;
- import org.eclipse.paho.client.mqttv3.*;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.annotation.Lazy;
- import org.springframework.stereotype.Component;
- /**
- * 连接完成补订阅、断线重连
- * 接收消息、发送消息
- *
- * @author lwm
- */
- @Component
- public class MessageCallback implements MqttCallbackExtended
- {
- private static final Logger log = LoggerFactory.getLogger(MessageCallback.class);
- @Resource
- @Lazy
- EmqClient emqClient;
- @Resource
- MqttConfig mqttConfig;
- /**
- * 连接完成,订阅所有主题
- */
- @Override
- public void connectComplete(boolean reconnect, String serverUri)
- {
- emqClient.subscribeAll();
- }
- /**
- * 丢失了对服务端的连接后触发的回调
- */
- @Override
- public void connectionLost(Throwable cause)
- {
- emqClient.reConnect();
- }
- /**
- * 应用收到消息后触发的回调
- * 之前在 EmqClient 已经使用 subscribe(topic, qos, IMqttMessageListener) 时由监听器消费
- *
- * 现在再来一遍
- */
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception
- {
- if (StringUtils.isEmpty(mqttConfig.getMessageRouters()))
- {
- return;
- }
- for (MessageRouter router : mqttConfig.getMessageRouters())
- {
- String topicFilter = router.getTopic();
- if(isMatched(topicFilter, topic))
- {
- IMqttMessageListener listener = SpringUtils.getBean(router.getListener());
- listener.messageArrived(topic, message);
- }
- }
- }
- /**
- * 判断主题是否匹配
- *
- */
- private boolean isMatched(String topicFilter, String topic) {
- if (topicFilter.startsWith("$queue/"))
- {
- topicFilter = topicFilter.replaceFirst("\\$queue/", "");
- }
- else if (topicFilter.startsWith("$share/"))
- {
- topicFilter = topicFilter.replaceFirst("\\$share/", "");
- topicFilter = topicFilter.substring(topicFilter.indexOf('/') + 1);
- }
- return MqttTopic.isMatched(topicFilter, topic);
- }
- /**
- * 消息发布者消息发布完成产生的回调
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken token)
- {
- if (token == null)
- {
- return;
- }
- int messageId = token.getMessageId();
- String[] topics = token.getTopics();
- log.info("消息发布完成,messageid={},topics={}", messageId, topics);
- }
- }
|