|
@@ -26,8 +26,10 @@ import java.time.LocalDateTime;
|
|
|
import java.time.ZoneId;
|
|
|
import java.time.ZonedDateTime;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.UUID;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
@@ -61,6 +63,8 @@ public class MqttConfig {
|
|
|
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
+ log.info("-------------------初始化MQTT配置-------------------");
|
|
|
+ initDatabase();
|
|
|
//initPointData();
|
|
|
executorService = Executors.newSingleThreadExecutor();
|
|
|
connectWithReconnect();
|
|
@@ -81,6 +85,47 @@ public class MqttConfig {
|
|
|
log.info("-------------------初始化成功-------------------");
|
|
|
}
|
|
|
|
|
|
+ //初始化数据库
|
|
|
+ public void initDatabase() {
|
|
|
+ try {
|
|
|
+ northSeaGasPointService.remove(null);
|
|
|
+ log.info("-------------------初始化北海点位数据库-------------------");
|
|
|
+
|
|
|
+ List<NorthSeaGasPoint> gasPoints = Arrays.asList(
|
|
|
+ createGasPoint("氨气", "wrj-aq", "氨气", "ppm", "26", "52"),
|
|
|
+ createGasPoint("甲烷", "wrj-jw", "甲烷", "%LEL", "25", "50"),
|
|
|
+ createGasPoint("硫化氢", "wrj-liuhq", "硫化氢", "PPM", "2.5", "5"),
|
|
|
+ createGasPoint("一氧化碳", "wrj-yyht", "一氧化碳", "PPM", null, null),
|
|
|
+ createGasPoint("二氧化氮", "wrj-eyhd", "二氧化氮", "PPM", "2.4", "4.8"),
|
|
|
+ createGasPoint("二氧化硫", "wrj-eyhl", "二氧化硫", "PPM", null, null),
|
|
|
+ createGasPoint("氯化氢", "wrj-lvhq", "氯化氢", "PPM", "2.5", "5")
|
|
|
+ );
|
|
|
+
|
|
|
+ boolean success = northSeaGasPointService.saveBatch(gasPoints);
|
|
|
+ if (success) {
|
|
|
+ log.info("-------------------初始化成功-------------------");
|
|
|
+ } else {
|
|
|
+ log.error("-------------------初始化失败-------------------");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("初始化过程中发生异常: ", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private NorthSeaGasPoint createGasPoint(String pointName, String pointCode, String pointType, String unit, String highReport, String highHighReport) {
|
|
|
+ NorthSeaGasPoint gasPoint = new NorthSeaGasPoint();
|
|
|
+ gasPoint.setId(UUID.randomUUID().toString());
|
|
|
+ gasPoint.setPointName(pointName);
|
|
|
+ gasPoint.setPointCode(pointCode);
|
|
|
+ gasPoint.setPointCode2(pointCode);
|
|
|
+ gasPoint.setPointType(pointType);
|
|
|
+ gasPoint.setUnit(unit);
|
|
|
+ if (highReport != null) gasPoint.setHighReport(highReport);
|
|
|
+ if (highHighReport != null) gasPoint.setHighHighReport(highHighReport);
|
|
|
+ return gasPoint;
|
|
|
+ }
|
|
|
+
|
|
|
private void connectWithReconnect() {
|
|
|
new Thread(() -> {
|
|
|
while (!Thread.currentThread().isInterrupted()) {
|
|
@@ -161,8 +206,10 @@ public class MqttConfig {
|
|
|
//NH3/NO2/CH4/H2S/SO2/HCL/CO
|
|
|
//3 有毒气体 4 可燃气体
|
|
|
String pointType = gasMessage.getPointType();
|
|
|
+ log.info("气体类型{}", pointType);
|
|
|
if ("氨气".equals(pointType)) { //可燃 同时有毒
|
|
|
String NH3 = jsonObject.getJSONObject("airData").getString("NH3(ppm)");
|
|
|
+ log.info("气体{},内容{}", pointType, kafkaSendMessage);
|
|
|
kafkaSendMessage.put("monitorType", "4");
|
|
|
kafkaSendMessage.put("realtimeValue", NH3);
|
|
|
if (StringUtils.isBlank(NH3)){
|