|
@@ -14,11 +14,17 @@ import org.springframework.data.redis.connection.RedisGeoCommands;
|
|
|
import org.springframework.data.redis.core.BoundGeoOperations;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import javax.annotation.PreDestroy;
|
|
|
import javax.annotation.Resource;
|
|
|
+import java.time.Instant;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.ZoneId;
|
|
|
+import java.time.ZonedDateTime;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
@@ -46,8 +52,8 @@ public class MqttConfig {
|
|
|
@Resource
|
|
|
private NorthSeaGasPointService northSeaGasPointService;
|
|
|
|
|
|
-// @Resource
|
|
|
-// private KafkaTemplate kafkaTemplate;
|
|
|
+ @Resource
|
|
|
+ private KafkaTemplate kafkaTemplate;
|
|
|
|
|
|
private MqttClient client;
|
|
|
private ExecutorService executorService;
|
|
@@ -140,7 +146,30 @@ public class MqttConfig {
|
|
|
kafkaSendMessage.put("equipCode", sendMessage.get("pointCode"));
|
|
|
kafkaSendMessage.put("surveyInstrumentCode", sendMessage.get("pointCode2"));
|
|
|
kafkaSendMessage.put("monitorUnit", sendMessage.get("unit"));
|
|
|
- kafkaSendMessage.put("collectTime", jsonObject.get("utcTime"));
|
|
|
+ String utcTimeStr = "2024-07-10-07-58-08";
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss");
|
|
|
+ LocalDateTime localDateTime = LocalDateTime.parse(utcTimeStr, formatter);
|
|
|
+
|
|
|
+ // 将LocalDateTime转换为Instant
|
|
|
+ Instant instant = localDateTime.atZone(ZoneId.of("UTC")).toInstant();
|
|
|
+
|
|
|
+ // 转换到东八区
|
|
|
+ ZonedDateTime shanghaiTime = instant.atZone(ZoneId.of("Asia/Shanghai"));
|
|
|
+
|
|
|
+ // 格式化为字符串
|
|
|
+ String formattedTime = shanghaiTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
|
|
|
+
|
|
|
+ // 定义输入和输出的日期时间格式
|
|
|
+ DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
|
|
|
+ DateTimeFormatter outputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ // 解析原始字符串到LocalDateTime对象
|
|
|
+ LocalDateTime dateTime = LocalDateTime.parse(formattedTime, inputFormat);
|
|
|
+
|
|
|
+ // 将LocalDateTime对象格式化为新的字符串格式
|
|
|
+ String formattedDateTime = dateTime.format(outputFormat);
|
|
|
+
|
|
|
+ kafkaSendMessage.put("collectTime", formattedDateTime);
|
|
|
|
|
|
// kafkaSendMessage.put("realtimeValue", sendMessage.get("unit"));
|
|
|
|
|
@@ -153,20 +182,25 @@ public class MqttConfig {
|
|
|
kafkaSendMessage.put("realtimeValue", NH3);
|
|
|
|
|
|
// TODO: 2024/6/24 kafka send 两次
|
|
|
- //kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
- kafkaSendMessage.put("monitorType", "3");
|
|
|
- //kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
- log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ try {
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ kafkaSendMessage.put("monitorType", "3");
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("发送消息失败,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if ("甲烷".equals(pointType)){
|
|
|
- String CH4 = jsonObject.getJSONObject("airData").getString("CH4(%)");
|
|
|
+ String CH4 = jsonObject.getJSONObject("airData").getString("CxHy(%)");
|
|
|
kafkaSendMessage.put("monitorType", "4");
|
|
|
kafkaSendMessage.put("realtimeValue", CH4);
|
|
|
|
|
|
// TODO: 2024/6/24 kafka send
|
|
|
- //kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+
|
|
|
log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
}
|
|
|
|
|
@@ -176,10 +210,10 @@ public class MqttConfig {
|
|
|
kafkaSendMessage.put("realtimeValue", H2S);
|
|
|
|
|
|
// TODO: 2024/6/24 kafka send 两次
|
|
|
- //kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
kafkaSendMessage.put("monitorType", "3");
|
|
|
- //kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
}
|
|
|
|
|
@@ -189,10 +223,10 @@ public class MqttConfig {
|
|
|
kafkaSendMessage.put("realtimeValue", CO);
|
|
|
|
|
|
// TODO: 2024/6/24 kafka send 两次
|
|
|
- //kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
kafkaSendMessage.put("monitorType", "3");
|
|
|
- //kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
}
|
|
|
|
|
@@ -202,7 +236,7 @@ public class MqttConfig {
|
|
|
kafkaSendMessage.put("realtimeValue", NO2);
|
|
|
|
|
|
// TODO: 2024/6/24 kafka send
|
|
|
- //kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
}
|
|
|
|
|
@@ -212,7 +246,7 @@ public class MqttConfig {
|
|
|
kafkaSendMessage.put("realtimeValue", SO2);
|
|
|
|
|
|
// TODO: 2024/6/24 kafka send
|
|
|
- //kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
}
|
|
|
|
|
@@ -222,7 +256,7 @@ public class MqttConfig {
|
|
|
kafkaSendMessage.put("realtimeValue", HCL);
|
|
|
|
|
|
// TODO: 2024/6/24 kafka send
|
|
|
- //kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
+ kafkaTemplate.send("majorHazard", kafkaSendMessage.toJSONString());
|
|
|
log.info("发送了 majorHazard 消息,气体{},内容{}",pointType,kafkaSendMessage);
|
|
|
}
|
|
|
|
|
@@ -280,6 +314,12 @@ public class MqttConfig {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ //@Scheduled(fixedRate = 2000)
|
|
|
+ public void testsend() {
|
|
|
+ kafkaTemplate.send("majorHazard", "测试消息");
|
|
|
+ log.info("发送了 majorHazard 消息");
|
|
|
+ }
|
|
|
+
|
|
|
@PreDestroy
|
|
|
public void destroy() {
|
|
|
if (client != null && client.isConnected()) {
|