|
@@ -2,9 +2,10 @@ package com.lutao.uav.mqtt;
|
|
|
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
-
|
|
|
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
|
|
|
+import com.lutao.uav.domain.entity.GasMessage;
|
|
|
import com.lutao.uav.domain.entity.NorthSeaGasPoint;
|
|
|
+import com.lutao.uav.service.GasMessageService;
|
|
|
import com.lutao.uav.service.NorthSeaGasPointService;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.eclipse.paho.client.mqttv3.*;
|
|
@@ -14,7 +15,6 @@ import org.springframework.data.redis.connection.RedisGeoCommands;
|
|
|
import org.springframework.data.redis.core.BoundGeoOperations;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
|
-import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
@@ -55,15 +55,18 @@ public class MqttConfig {
|
|
|
@Resource
|
|
|
private KafkaTemplate kafkaTemplate;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private GasMessageService gasMessageService;
|
|
|
+
|
|
|
private MqttClient client;
|
|
|
private ExecutorService executorService;
|
|
|
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
- initPointData();
|
|
|
+ //initPointData();
|
|
|
executorService = Executors.newSingleThreadExecutor();
|
|
|
connectWithReconnect();
|
|
|
- test();
|
|
|
+ //test();
|
|
|
}
|
|
|
|
|
|
//初始化点位数据
|
|
@@ -111,157 +114,144 @@ public class MqttConfig {
|
|
|
double longitude = jsonObject.getDoubleValue("longitude");
|
|
|
double latitude = jsonObject.getDoubleValue("latitude");
|
|
|
try {
|
|
|
- Point point = new Point(longitude, latitude);
|
|
|
- Distance distance = new Distance(1000000, RedisGeoCommands.DistanceUnit.KILOMETERS);
|
|
|
- Circle circle = new Circle(point, distance);
|
|
|
-
|
|
|
- RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs()
|
|
|
- .sortAscending()
|
|
|
- .includeCoordinates()
|
|
|
- .includeDistance()
|
|
|
- .limit(1);
|
|
|
-
|
|
|
- GeoResults<RedisGeoCommands.GeoLocation<String>> outlets = redisTemplate.boundGeoOps("north_sea_point").radius(circle, args);
|
|
|
- if (outlets != null) {
|
|
|
- for (GeoResult<RedisGeoCommands.GeoLocation<String>> outlet : outlets) {
|
|
|
- log.info("需要传送的消息: {}", outlet.getContent().getName());
|
|
|
- // TODO: 2024/6/21 send to kafka
|
|
|
-
|
|
|
- JSONObject sendMessage = JSONObject.parseObject(outlet.getContent().getName());
|
|
|
-
|
|
|
- /**
|
|
|
- * {
|
|
|
- * "enterpriseId":"140170010",
|
|
|
- * "equipCode":"141110055001G0001",
|
|
|
- * "surveyInstrumentCode":"141110055001G0001YL001",
|
|
|
- * "monitorType":"6",
|
|
|
- * "monitorUnit":"mm",
|
|
|
- * "realtimeValue":"1.57",
|
|
|
- * "collectTime":"2024-04-12 18:21:11"
|
|
|
- * }
|
|
|
- */
|
|
|
-
|
|
|
- JSONObject kafkaSendMessage = new JSONObject();
|
|
|
- kafkaSendMessage.put("enterpriseId", sendMessage.get("companyName"));
|
|
|
- kafkaSendMessage.put("equipCode", sendMessage.get("pointCode"));
|
|
|
- kafkaSendMessage.put("surveyInstrumentCode", sendMessage.get("pointCode2"));
|
|
|
- kafkaSendMessage.put("monitorUnit", sendMessage.get("unit"));
|
|
|
- String utcTimeStr = "2024-07-10-07-58-08";
|
|
|
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss");
|
|
|
- LocalDateTime localDateTime = LocalDateTime.parse(utcTimeStr, formatter);
|
|
|
-
|
|
|
- // 将LocalDateTime转换为Instant
|
|
|
- Instant instant = localDateTime.atZone(ZoneId.of("UTC")).toInstant();
|
|
|
-
|
|
|
- // 转换到东八区
|
|
|
- ZonedDateTime shanghaiTime = instant.atZone(ZoneId.of("Asia/Shanghai"));
|
|
|
-
|
|
|
- // 格式化为字符串
|
|
|
- String formattedTime = shanghaiTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
|
|
|
-
|
|
|
- // 定义输入和输出的日期时间格式
|
|
|
- DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
|
|
|
- DateTimeFormatter outputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
-
|
|
|
- // 解析原始字符串到LocalDateTime对象
|
|
|
- LocalDateTime dateTime = LocalDateTime.parse(formattedTime, inputFormat);
|
|
|
-
|
|
|
- // 将LocalDateTime对象格式化为新的字符串格式
|
|
|
- String formattedDateTime = dateTime.format(outputFormat);
|
|
|
-
|
|
|
- kafkaSendMessage.put("collectTime", formattedDateTime);
|
|
|
-
|
|
|
- // kafkaSendMessage.put("realtimeValue", sendMessage.get("unit"));
|
|
|
-
|
|
|
- //NH3/NO2/CH4/H2S/SO2/HCL/CO
|
|
|
- //3 有毒气体 4 可燃气体
|
|
|
- String pointType = sendMessage.getString("pointType");
|
|
|
- if ("氨气".equals(pointType)){ //可燃 同时有毒
|
|
|
- String NH3 = jsonObject.getJSONObject("airData").getString("NH3(ppm)");
|
|
|
- kafkaSendMessage.put("monitorType", "4");
|
|
|
- kafkaSendMessage.put("realtimeValue", NH3);
|
|
|
-
|
|
|
- // TODO: 2024/6/24 kafka send 两次
|
|
|
- try {
|
|
|
- kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
- kafkaSendMessage.put("monitorType", "3");
|
|
|
- kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
- }catch (Exception e){
|
|
|
- log.error("发送消息失败,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if ("甲烷".equals(pointType)){
|
|
|
- String CH4 = jsonObject.getJSONObject("airData").getString("CxHy(%)");
|
|
|
- kafkaSendMessage.put("monitorType", "4");
|
|
|
- kafkaSendMessage.put("realtimeValue", CH4);
|
|
|
-
|
|
|
- // TODO: 2024/6/24 kafka send
|
|
|
- kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
-
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ /**
|
|
|
+ * {
|
|
|
+ * "enterpriseId":"140170010",
|
|
|
+ * "equipCode":"141110055001G0001",
|
|
|
+ * "surveyInstrumentCode":"141110055001G0001YL001",
|
|
|
+ * "monitorType":"6",
|
|
|
+ * "monitorUnit":"mm",
|
|
|
+ * "realtimeValue":"1.57",
|
|
|
+ * "collectTime":"2024-04-12 18:21:11"
|
|
|
+ * }
|
|
|
+ */
|
|
|
+ List<GasMessage> gasMessages = gasMessageService.list();
|
|
|
+ for (GasMessage gasMessage : gasMessages) {
|
|
|
+ JSONObject kafkaSendMessage = new JSONObject();
|
|
|
+ kafkaSendMessage.put("enterpriseId", gasMessage.getCompanyName());
|
|
|
+ kafkaSendMessage.put("equipCode", gasMessage.getPointCode());
|
|
|
+ kafkaSendMessage.put("surveyInstrumentCode", gasMessage.getPointCode2());
|
|
|
+ kafkaSendMessage.put("monitorUnit", gasMessage.getUnit());
|
|
|
+ String utcTimeStr = jsonObject.getString("utcTime");
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss");
|
|
|
+ LocalDateTime localDateTime = LocalDateTime.parse(utcTimeStr, formatter);
|
|
|
+
|
|
|
+ // 将LocalDateTime转换为Instant
|
|
|
+ Instant instant = localDateTime.atZone(ZoneId.of("UTC")).toInstant();
|
|
|
+
|
|
|
+ // 转换到东八区
|
|
|
+ ZonedDateTime shanghaiTime = instant.atZone(ZoneId.of("Asia/Shanghai"));
|
|
|
+
|
|
|
+ // 格式化为字符串
|
|
|
+ String formattedTime = shanghaiTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
|
|
|
+
|
|
|
+ // 定义输入和输出的日期时间格式
|
|
|
+ DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
|
|
|
+ DateTimeFormatter outputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ // 解析原始字符串到LocalDateTime对象
|
|
|
+ LocalDateTime dateTime = LocalDateTime.parse(formattedTime, inputFormat);
|
|
|
+
|
|
|
+ // 将LocalDateTime对象格式化为新的字符串格式
|
|
|
+ String formattedDateTime = dateTime.format(outputFormat);
|
|
|
+
|
|
|
+ // 输出转换后的字符串
|
|
|
+ kafkaSendMessage.put("collectTime", formattedDateTime);
|
|
|
+
|
|
|
+ // kafkaSendMessage.put("realtimeValue", sendMessage.get("unit"));
|
|
|
+
|
|
|
+ //NH3/NO2/CH4/H2S/SO2/HCL/CO
|
|
|
+ //3 有毒气体 4 可燃气体
|
|
|
+ String pointType = gasMessage.getPointType();
|
|
|
+ if ("氨气".equals(pointType)) { //可燃 同时有毒
|
|
|
+ String NH3 = jsonObject.getJSONObject("airData").getString("NH3(ppm)");
|
|
|
+ kafkaSendMessage.put("monitorType", "4");
|
|
|
+ kafkaSendMessage.put("realtimeValue", NH3);
|
|
|
+ if (StringUtils.isBlank(NH3)){
|
|
|
+ continue;
|
|
|
}
|
|
|
-
|
|
|
- if ("硫化氢".equals(pointType)){ //可燃 同时有毒
|
|
|
- String H2S = jsonObject.getJSONObject("airData").getString("H2S(ppm)");
|
|
|
- kafkaSendMessage.put("monitorType", "4");
|
|
|
- kafkaSendMessage.put("realtimeValue", H2S);
|
|
|
-
|
|
|
- // TODO: 2024/6/24 kafka send 两次
|
|
|
- kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ try {
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
kafkaSendMessage.put("monitorType", "3");
|
|
|
- kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("发送消息失败,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if ("一氧化碳".equals(pointType)){ //可燃 同时有毒
|
|
|
- String CO = jsonObject.getJSONObject("airData").getString("CO(ppm)");
|
|
|
- kafkaSendMessage.put("monitorType", "4");
|
|
|
- kafkaSendMessage.put("realtimeValue", CO);
|
|
|
-
|
|
|
- // TODO: 2024/6/24 kafka send 两次
|
|
|
- kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
- kafkaSendMessage.put("monitorType", "3");
|
|
|
- kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ if ("甲烷".equals(pointType)) {
|
|
|
+ String CH4 = jsonObject.getJSONObject("airData").getString("CxHy(%)");
|
|
|
+ kafkaSendMessage.put("monitorType", "4");
|
|
|
+ kafkaSendMessage.put("realtimeValue", CH4);
|
|
|
+ if (StringUtils.isBlank(CH4)){
|
|
|
+ continue;
|
|
|
}
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
+ }
|
|
|
|
|
|
- if ("二氧化氮".equals(pointType)){
|
|
|
- String NO2 = jsonObject.getJSONObject("airData").getString("NO2(ppm)");
|
|
|
- kafkaSendMessage.put("monitorType", "3");
|
|
|
- kafkaSendMessage.put("realtimeValue", NO2);
|
|
|
-
|
|
|
- // TODO: 2024/6/24 kafka send
|
|
|
- kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ if ("硫化氢".equals(pointType)) { //可燃 同时有毒
|
|
|
+ String H2S = jsonObject.getJSONObject("airData").getString("H2S(ppm)");
|
|
|
+ kafkaSendMessage.put("monitorType", "4");
|
|
|
+ kafkaSendMessage.put("realtimeValue", H2S);
|
|
|
+ if (StringUtils.isBlank(H2S)){
|
|
|
+ continue;
|
|
|
}
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
+ kafkaSendMessage.put("monitorType", "3");
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
+ }
|
|
|
|
|
|
- if ("二氧化硫".equals(pointType)){
|
|
|
- String SO2 = jsonObject.getJSONObject("airData").getString("SO2(ppm)");
|
|
|
- kafkaSendMessage.put("monitorType", "3");
|
|
|
- kafkaSendMessage.put("realtimeValue", SO2);
|
|
|
-
|
|
|
- // TODO: 2024/6/24 kafka send
|
|
|
- kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ if ("一氧化碳".equals(pointType)) { //可燃 同时有毒
|
|
|
+ String CO = jsonObject.getJSONObject("airData").getString("CO(ppm)");
|
|
|
+ kafkaSendMessage.put("monitorType", "4");
|
|
|
+ kafkaSendMessage.put("realtimeValue", CO);
|
|
|
+ if (StringUtils.isBlank(CO)){
|
|
|
+ continue;
|
|
|
}
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
+ kafkaSendMessage.put("monitorType", "3");
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
+ }
|
|
|
|
|
|
- if ("氯化氢".equals(pointType)){
|
|
|
- String HCL = jsonObject.getJSONObject("airData").getString("HCL(ppm)");
|
|
|
- kafkaSendMessage.put("monitorType", "3");
|
|
|
- kafkaSendMessage.put("realtimeValue", HCL);
|
|
|
-
|
|
|
- // TODO: 2024/6/24 kafka send
|
|
|
- kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ if ("二氧化氮".equals(pointType)) {
|
|
|
+ String NO2 = jsonObject.getJSONObject("airData").getString("NO2(ppm)");
|
|
|
+ kafkaSendMessage.put("monitorType", "3");
|
|
|
+ kafkaSendMessage.put("realtimeValue", NO2);
|
|
|
+ if (StringUtils.isBlank(NO2)){
|
|
|
+ continue;
|
|
|
}
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
+ }
|
|
|
|
|
|
+ if ("二氧化硫".equals(pointType)) {
|
|
|
+ String SO2 = jsonObject.getJSONObject("airData").getString("SO2(ppm)");
|
|
|
+ kafkaSendMessage.put("monitorType", "3");
|
|
|
+ kafkaSendMessage.put("realtimeValue", SO2);
|
|
|
+ if (StringUtils.isBlank(SO2)){
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ // TODO: 2024/6/24 kafka send
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
+ }
|
|
|
|
|
|
-
|
|
|
+ if ("氯化氢".equals(pointType)) {
|
|
|
+ String HCL = jsonObject.getJSONObject("airData").getString("HCL(ppm)");
|
|
|
+ kafkaSendMessage.put("monitorType", "3");
|
|
|
+ kafkaSendMessage.put("realtimeValue", HCL);
|
|
|
+ if (StringUtils.isBlank(HCL)){
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -297,7 +287,7 @@ public class MqttConfig {
|
|
|
}).start();
|
|
|
}
|
|
|
|
|
|
- public void test(){
|
|
|
+ public void test() {
|
|
|
Point point = new Point(0, 0);
|
|
|
Distance distance = new Distance(1000000, RedisGeoCommands.DistanceUnit.KILOMETERS);
|
|
|
Circle circle = new Circle(point, distance);
|