Browse Source

mqtt连接

guoyanchao@365wy.top 1 year ago
parent
commit
481995d867

+ 35 - 0
src/main/java/com/dingding/mid/service/MqttClientConnectListener.java

@@ -0,0 +1,35 @@
+package com.dingding.mid.service;
+
+import net.dreamlu.iot.mqtt.core.client.MqttClientCreator;
+import net.dreamlu.iot.mqtt.spring.client.event.MqttConnectedEvent;
+import net.dreamlu.iot.mqtt.spring.client.event.MqttDisconnectEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Service;
+
+@Service
+public class MqttClientConnectListener {
+    private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);
+
+    @Autowired
+    private MqttClientCreator mqttClientCreator;
+
+    @EventListener
+    public void onConnected(MqttConnectedEvent event) {
+        logger.info("mqttClientCreator:{}",mqttClientCreator.getIp());
+        logger.info("MqttConnectedEvent:{}", event);
+    }
+
+    @EventListener
+    public void onDisconnect(MqttDisconnectEvent event) {
+        // 离线时更新重连时的密码,适用于类似阿里云 mqtt clientId 连接带时间戳的方式
+        logger.info("MqttDisconnectEvent:{}", event);
+        // 在断线时更新 clientId、username、password
+        mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
+                .username("newUserName")
+                .password("newPassword");
+    }
+
+}

+ 22 - 0
src/main/java/com/dingding/mid/service/MqttClientCustomizerConfiguration.java

@@ -0,0 +1,22 @@
+package com.dingding.mid.service;
+
+import net.dreamlu.iot.mqtt.core.client.MqttClientCreator;
+import net.dreamlu.iot.mqtt.spring.client.MqttClientCustomizer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration(proxyBeanMethods = false)
+public class MqttClientCustomizerConfiguration {
+
+    @Bean
+    public MqttClientCustomizer mqttClientCustomizer() {
+        return new MqttClientCustomizer() {
+            @Override
+            public void customize(MqttClientCreator creator) {
+                // 此处可自定义配置 creator,会覆盖 yml 中的配置
+//                creator.ip(mqttProperties.getInnerhost());
+                System.out.println("----------------MqttServerCustomizer-----------------");
+            }
+        };
+    }
+}

+ 60 - 0
src/main/java/com/dingding/mid/service/MqttClientSubscribeListener.java

@@ -0,0 +1,60 @@
+package com.dingding.mid.service;
+
+import cn.hutool.json.JSONUtil;
+import com.dingding.mid.mqtt.MqttMessage;
+import lombok.AllArgsConstructor;
+import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.nio.charset.StandardCharsets;
+
+@Service
+@AllArgsConstructor
+public class MqttClientSubscribeListener {
+    private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);
+
+
+    @MqttClientSubscribe("/message/#")
+    public void subQos0(String topic, byte[] payload) {
+        String content = new String(payload, StandardCharsets.UTF_8);
+        logger.info(" queue  topic:{} payload:{}", topic, content);
+        if (JSONUtil.isJson(content)) {
+            MqttMessage message = JSONUtil.toBean(content,MqttMessage.class);
+            switch (message.getCommand()) {
+                case "DISCONNECT":
+                    // 退出房间
+                    // 退出队列
+//                    indexCustomerServiceService.disconnect(message);
+                    break;
+                case "SEND_MESSAGE":
+                    // 保存消息
+//                    indexCustomerServiceService.saveMessage(message);
+                    break;
+            }
+        };
+    }
+
+//    @MqttClientSubscribe(value = "$queue//accident/#", qos = MqttQoS.AT_LEAST_ONCE)
+//    public void subQos1(String topic, byte[] payload) {
+//
+//
+//    }
+
+//    @MqttClientSubscribe(value = "$share/group11//accident/#", qos = MqttQoS.EXACTLY_ONCE)
+//    public void subShare(String topic, byte[] payload) {
+//        logger.info(" subShare  topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
+//        // TODO: 2023/6/29   根据不同的命令转不同的处理
+//
+//
+//    }
+
+//    @MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register")
+//    public void thingSubRegister(String topic, byte[] payload) {
+//        // 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
+//        // 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。
+//        logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
+//    }
+
+}