liuyongxin 1 месяц назад
Родитель
Сommit
46dfbb4b10
1 измененных файлов с 27 добавлено и 21 удалено
  1. 27 21
      src/main/java/com/lutao/uav/mqtt/MqttConfig.java

+ 27 - 21
src/main/java/com/lutao/uav/mqtt/MqttConfig.java

@@ -175,6 +175,12 @@ public class MqttConfig {
                                         kafkaSendMessage.put("equipCode", gasMessage.getPointCode());
                                         kafkaSendMessage.put("surveyInstrumentCode", gasMessage.getPointCode2());
                                         kafkaSendMessage.put("monitorUnit", gasMessage.getUnit());
+                                        kafkaSendMessage.put("latitude", latitude);
+                                        kafkaSendMessage.put("longitude", longitude);
+                                        kafkaSendMessage.put("altitude", jsonObject.get("altitude"));
+                                        kafkaSendMessage.put("temperature", jsonObject.get("temperature"));
+                                        kafkaSendMessage.put("humidity", jsonObject.get("humidity"));
+                                        kafkaSendMessage.put("pressure", jsonObject.get("pressure"));
                                         String utcTimeStr = jsonObject.getString("utcTime");
                                         DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss");
                                         LocalDateTime localDateTime = LocalDateTime.parse(utcTimeStr, formatter);
@@ -209,18 +215,18 @@ public class MqttConfig {
                                         log.info("气体类型{}", pointType);
                                         if ("氨气".equals(pointType)) { //可燃 同时有毒
                                             String NH3 = jsonObject.getJSONObject("airData").getString("NH3(ppm)");
-                                            log.info("气体{},内容{}", pointType, kafkaSendMessage);
+                                            //log.info("气体{},内容{}", pointType, kafkaSendMessage);
                                             kafkaSendMessage.put("monitorType", "4");
                                             kafkaSendMessage.put("realtimeValue", NH3);
                                             if (StringUtils.isBlank(NH3)){
                                                 continue;
                                             }
                                             try {
-                                                kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
-                                                log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
+                                                kafkaTemplate.send("wrjAll", kafkaSendMessage.toJSONString());
+                                                log.info("发送了 wrjAll 消息,气体{},内容{}", pointType, kafkaSendMessage);
                                                 kafkaSendMessage.put("monitorType", "3");
-                                                kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
-                                                log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
+                                                kafkaTemplate.send("wrjAll", kafkaSendMessage.toJSONString());
+                                                log.info("发送了 wrjAll 消息,气体{},内容{}", pointType, kafkaSendMessage);
                                             } catch (Exception e) {
                                                 log.error("发送消息失败,气体{},内容{}", pointType, kafkaSendMessage);
                                             }
@@ -233,8 +239,8 @@ public class MqttConfig {
                                             if (StringUtils.isBlank(CH4)){
                                                 continue;
                                             }
-                                            kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
-                                            log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
+                                            kafkaTemplate.send("wrjAll", kafkaSendMessage.toJSONString());
+                                            log.info("发送了 wrjAll 消息,气体{},内容{}", pointType, kafkaSendMessage);
                                         }
 
                                         if ("硫化氢".equals(pointType)) { //可燃 同时有毒
@@ -244,11 +250,11 @@ public class MqttConfig {
                                             if (StringUtils.isBlank(H2S)){
                                                 continue;
                                             }
-                                            kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
-                                            log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
+                                            kafkaTemplate.send("wrjAll", kafkaSendMessage.toJSONString());
+                                            log.info("发送了 wrjAll 消息,气体{},内容{}", pointType, kafkaSendMessage);
                                             kafkaSendMessage.put("monitorType", "3");
-                                            kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
-                                            log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
+                                            kafkaTemplate.send("wrjAll", kafkaSendMessage.toJSONString());
+                                            log.info("发送了 wrjAll 消息,气体{},内容{}", pointType, kafkaSendMessage);
                                         }
 
                                         if ("一氧化碳".equals(pointType)) { //可燃 同时有毒
@@ -258,11 +264,11 @@ public class MqttConfig {
                                             if (StringUtils.isBlank(CO)){
                                                 continue;
                                             }
-                                            kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
-                                            log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
+                                            kafkaTemplate.send("wrjAll", kafkaSendMessage.toJSONString());
+                                            log.info("发送了 wrjAll 消息,气体{},内容{}", pointType, kafkaSendMessage);
                                             kafkaSendMessage.put("monitorType", "3");
-                                            kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
-                                            log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
+                                            kafkaTemplate.send("wrjAll", kafkaSendMessage.toJSONString());
+                                            log.info("发送了 wrjAll 消息,气体{},内容{}", pointType, kafkaSendMessage);
                                         }
 
                                         if ("二氧化氮".equals(pointType)) {
@@ -272,8 +278,8 @@ public class MqttConfig {
                                             if (StringUtils.isBlank(NO2)){
                                                 continue;
                                             }
-                                            kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
-                                            log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
+                                            kafkaTemplate.send("wrjAll", kafkaSendMessage.toJSONString());
+                                            log.info("发送了 wrjAll 消息,气体{},内容{}", pointType, kafkaSendMessage);
                                         }
 
                                         if ("二氧化硫".equals(pointType)) {
@@ -284,8 +290,8 @@ public class MqttConfig {
                                                 continue;
                                             }
                                             // TODO: 2024/6/24 kafka  send
-                                            kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
-                                            log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
+                                            kafkaTemplate.send("wrjAll", kafkaSendMessage.toJSONString());
+                                            log.info("发送了 wrjAll 消息,气体{},内容{}", pointType, kafkaSendMessage);
                                         }
 
                                         if ("氯化氢".equals(pointType)) {
@@ -295,8 +301,8 @@ public class MqttConfig {
                                             if (StringUtils.isBlank(HCL)){
                                                 continue;
                                             }
-                                            kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
-                                            log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
+                                            kafkaTemplate.send("wrjAll", kafkaSendMessage.toJSONString());
+                                            log.info("发送了 wrjAll 消息,气体{},内容{}", pointType, kafkaSendMessage);
                                         }
                                     }