package com.ruoyi.device.mqtt.client; import com.ruoyi.common.utils.StringUtils; import com.ruoyi.common.utils.spring.SpringUtils; import com.ruoyi.device.config.MessageRouter; import com.ruoyi.device.config.MqttConfig; import jakarta.annotation.Resource; import org.eclipse.paho.client.mqttv3.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; /** * 连接完成补订阅、断线重连 * 接收消息、发送消息 * * @author lwm */ @Component public class MessageCallback implements MqttCallbackExtended { private static final Logger log = LoggerFactory.getLogger(MessageCallback.class); @Resource @Lazy EmqClient emqClient; @Resource MqttConfig mqttConfig; /** * 连接完成,订阅所有主题 */ @Override public void connectComplete(boolean reconnect, String serverUri) { emqClient.subscribeAll(); } /** * 丢失了对服务端的连接后触发的回调 */ @Override public void connectionLost(Throwable cause) { emqClient.reConnect(); } /** * 应用收到消息后触发的回调 * 之前在 EmqClient 已经使用 subscribe(topic, qos, IMqttMessageListener) 时由监听器消费 * * 现在再来一遍 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { if (StringUtils.isEmpty(mqttConfig.getMessageRouters())) { return; } for (MessageRouter router : mqttConfig.getMessageRouters()) { String topicFilter = router.getTopic(); if(isMatched(topicFilter, topic)) { IMqttMessageListener listener = SpringUtils.getBean(router.getListener()); listener.messageArrived(topic, message); } } } /** * 判断主题是否匹配 * */ private boolean isMatched(String topicFilter, String topic) { if (topicFilter.startsWith("$queue/")) { topicFilter = topicFilter.replaceFirst("\\$queue/", ""); } else if (topicFilter.startsWith("$share/")) { topicFilter = topicFilter.replaceFirst("\\$share/", ""); topicFilter = topicFilter.substring(topicFilter.indexOf('/') + 1); } return MqttTopic.isMatched(topicFilter, topic); } /** * 消息发布者消息发布完成产生的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { if (token == null) { return; } int messageId = token.getMessageId(); String[] topics = token.getTopics(); log.info("消息发布完成,messageid={},topics={}", messageId, topics); } }