Ver código fonte

fix 车辆定位kafka

liuyongxin 8 meses atrás
pai
commit
ee765dee5e

+ 21 - 0
src/main/java/com/lutao/carlocation/hand/KafkaConsumerRunner.java

@@ -0,0 +1,21 @@
+package com.lutao.carlocation.hand;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class KafkaConsumerRunner  implements CommandLineRunner {
+
+
+    @Autowired
+    private KafkaConsumerService consumerService;
+
+    @Override
+    public void run(String... args) throws Exception {
+        consumerService.startConsuming();
+        log.info("jinta 车辆定位服务 start consuming");
+    }
+}

+ 150 - 0
src/main/java/com/lutao/carlocation/hand/KafkaConsumerService.java

@@ -0,0 +1,150 @@
+package com.lutao.carlocation.hand;
+
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.lutao.carlocation.entity.CollectLog;
+import com.lutao.carlocation.kafka.RsaBase64Utils;
+import com.lutao.carlocation.service.CollectLogService;
+import com.lutao.carlocation.service.IsolatedVehicleLocationMonitorService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Properties;
+
+@Service
+@Slf4j
+public class KafkaConsumerService {
+
+    @Resource(name = "kafkaTemplate2")
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    @Resource
+    private IsolatedVehicleLocationMonitorService isolatedVehicleLocationMonitorService;
+
+    @Resource
+    private CollectLogService collectLogService;
+
+
+
+
+    public void startConsuming() {
+        // 初始化消费者属性
+        Properties props = new Properties();
+        // 设置服务器地址、组ID等...
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "datapoc1.66yunlian.cn:30886,datapoc2.66yunlian.cn:30886,datapoc3.66yunlian.cn:30886");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_push_out_jinta_group");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
+        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"customer67\" password=\"dscPEQAnyJa%^ZwK\";");
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Arrays.asList("topic_push_out_jinta"));
+        while (true) {
+            ConsumerRecords<String, String> records = consumer.poll(100);
+            for (ConsumerRecord<String, String> t : records) {
+                log.info(t.key() + "===" + t.value());
+                try {
+                    switch (t.key()){
+                        case "gps_common_vehicle_loc":
+                            //可视化常用车位置数据
+                            break;
+                        case "gps_vehicle_location":
+                            //车辆位置信息数据
+                            String decrypt = RsaBase64Utils.decrypt(t.value());
+                            if (JSONUtil.isJson(decrypt)) {
+                                JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
+                                log.info("received gps_vehicle_location message:{}", receiveMessage);
+                                JSONObject sendMessage = JSONUtil.createObj();
+                                sendMessage.putOnce("locaterCode", receiveMessage.get("vehicleNO"));
+                                sendMessage.putOnce("longitude", receiveMessage.get("longitude"));
+                                sendMessage.putOnce("latitude", receiveMessage.get("latitude"));
+                                Integer positionTime = (Integer) receiveMessage.get("positionTime");
+                                long time = positionTime * 1000L;
+                                // 使用Instant.ofEpochMilli()方法将时间戳转换为Instant对象
+                                Instant instant = Instant.ofEpochMilli(time);
+                                // 指定时区,例如使用UTC时区
+                                ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault());
+                                // 创建DateTimeFormatter对象,并指定输出的日期格式
+                                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+                                // 使用DateTimeFormatter对象将ZonedDateTime对象转换为字符串
+                                String formattedDate = zonedDateTime.format(formatter);
+                                sendMessage.putOnce("positioningTime", formattedDate);
+                                sendMessage.putOnce("address", "-");
+                                sendMessage.putOnce("speed", receiveMessage.get("vec"));
+                                sendMessage.putOnce("vehicleNO", receiveMessage.get("vehicleNO"));
+                                sendMessage.putOnce("plateColor", receiveMessage.get("plateColor"));
+                                sendMessage.putOnce("altitude", receiveMessage.get("alititude"));
+                                sendMessage.putOnce("mileage", receiveMessage.get("mileage"));
+                                sendMessage.putOnce("direction", receiveMessage.get("direction"));
+                                sendMessage.putOnce("accState", receiveMessage.get("accState"));
+                                sendMessage.putOnce("positionTime", receiveMessage.get("positionTime"));
+                                sendMessage.putOnce("directionDesc", receiveMessage.get("directionDesc"));
+                                sendMessage.putOnce("vec", receiveMessage.get("vec"));
+                                sendMessage.putOnce("online_status", receiveMessage.get("onlineStatus"));
+                                sendMessage.putOnce("transport_status", receiveMessage.get("transportStatus"));
+                                JSONArray jsonArray = JSONUtil.createArray();
+                                jsonArray.add(JSONUtil.parse(sendMessage));
+                                kafkaTemplate.send("vehicleLocation", jsonArray.toString());
+                                collectLogService.save(new CollectLog().setType("gps_vehicle_location").setContent(jsonArray.toString()));
+                                log.info("send gps_vehicle_location message:{}", jsonArray);
+                                //记录
+                                isolatedVehicleLocationMonitorService.saveSendMessage(sendMessage);
+                            }
+                            break;
+                        case "park_filing_info":
+                            break;
+                        case "park_filing_queue_log":
+                            break;
+                        case "fence_log":
+                            break;
+                        case "alarm_log":
+                            //报警日志数据
+                            try {
+                                collectLogService.save(new CollectLog().setType("alarmLog").setContent(t.value()));
+                                String alarm = RsaBase64Utils.decrypt(t.value());
+                                collectLogService.save(new CollectLog().setType("alarmLog").setContent(alarm));
+                                if (JSONUtil.isJson(alarm)) {
+                                    JSONObject receiveMessage = JSONUtil.parseObj(alarm);
+                                    log.info("received alarm_log message:{}", receiveMessage);
+                                    kafkaTemplate.send("alarmLog", receiveMessage.toString());
+                                    log.info("send alarm_log message:{}", receiveMessage);
+                                    //collectLogService.save(new CollectLog().setType("alarmLog").setContent(receiveMessage.toString()));
+                                }
+                            } catch (Exception e) {
+                                log.error("解析异常车辆报警异常,exception:{}", e);
+                                collectLogService.save(new CollectLog().setType("error-车辆报警").setContent(t.value()));
+                            }
+                            break;
+                        case "park_view_count":
+                            //报警车辆数
+                            break;
+                        case "park_filing_queue_count":
+                            //报备队列统计数据
+                            break;
+                    }
+                }catch (Exception e){
+                    log.error("消费数据异常",e);
+                }
+            }
+        }
+    }
+}

+ 235 - 237
src/main/java/com/lutao/carlocation/kafka/ConsumerCarLoaction.java

@@ -1,244 +1,242 @@
-package com.lutao.carlocation.kafka;
-
-import cn.hutool.json.JSONArray;
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
-import com.lutao.carlocation.entity.CollectLog;
-import com.lutao.carlocation.service.CollectLogService;
-import com.lutao.carlocation.service.IsolatedVehicleLocationMonitorService;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.List;
-import java.util.Optional;
-
-@Component
-@Slf4j
-public class ConsumerCarLoaction {
-
-    @Resource(name = "kafkaTemplate2")
-    private KafkaTemplate<String, String> kafkaTemplate;
-
-    @Resource
-    private IsolatedVehicleLocationMonitorService isolatedVehicleLocationMonitorService;
-
-    @Resource
-    private CollectLogService collectLogService;
-
-
-    @KafkaListener(topics = "topic_push_out_jinta", groupId = "topic_push_out_jinta_group", containerFactory = "kafkaListenerContainerFactory")
-    public void listen(ConsumerRecord<String, String> t) {
-        Optional<Object> kafkaMassages = Optional.ofNullable(t);
-        try {
-            if (kafkaMassages.isPresent()) {
-//                consumerRecords
-//                        .forEach(
-//                                t -> {
-                                    log.info(t.key() + ":::::" + t.value());
-                                    switch (t.key()) {
-                                        case "gps_vehicle_location":
-                                            try {
-                                                String decrypt = RsaBase64Utils.decrypt(t.value());
-                                                if (JSONUtil.isJson(decrypt)) {
-                                                    JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
-                                                    log.info("received gps_vehicle_location message:{}", receiveMessage);
-                                                    JSONObject sendMessage = JSONUtil.createObj();
-                                                    sendMessage.putOnce("locaterCode", receiveMessage.get("vehicleNO"));
-                                                    sendMessage.putOnce("longitude", receiveMessage.get("longitude"));
-                                                    sendMessage.putOnce("latitude", receiveMessage.get("latitude"));
-                                                    Integer positionTime = (Integer) receiveMessage.get("positionTime");
-                                                    long time = positionTime * 1000L;
-                                                    // 使用Instant.ofEpochMilli()方法将时间戳转换为Instant对象
-                                                    Instant instant = Instant.ofEpochMilli(time);
-                                                    // 指定时区,例如使用UTC时区
-                                                    ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault());
-                                                    // 创建DateTimeFormatter对象,并指定输出的日期格式
-                                                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-                                                    // 使用DateTimeFormatter对象将ZonedDateTime对象转换为字符串
-                                                    String formattedDate = zonedDateTime.format(formatter);
-                                                    sendMessage.putOnce("positioningTime", formattedDate);
-                                                    sendMessage.putOnce("address", "-");
-                                                    sendMessage.putOnce("speed", receiveMessage.get("vec"));
-                                                    sendMessage.putOnce("vehicleNO", receiveMessage.get("vehicleNO"));
-                                                    sendMessage.putOnce("plateColor", receiveMessage.get("plateColor"));
-                                                    sendMessage.putOnce("altitude", receiveMessage.get("alititude"));
-                                                    sendMessage.putOnce("mileage", receiveMessage.get("mileage"));
-                                                    sendMessage.putOnce("direction", receiveMessage.get("direction"));
-                                                    sendMessage.putOnce("accState", receiveMessage.get("accState"));
-                                                    sendMessage.putOnce("positionTime", receiveMessage.get("positionTime"));
-                                                    sendMessage.putOnce("directionDesc", receiveMessage.get("directionDesc"));
-                                                    sendMessage.putOnce("vec", receiveMessage.get("vec"));
-                                                    sendMessage.putOnce("online_status", receiveMessage.get("onlineStatus"));
-                                                    sendMessage.putOnce("transport_status", receiveMessage.get("transportStatus"));
-                                                    JSONArray jsonArray = JSONUtil.createArray();
-                                                    jsonArray.add(JSONUtil.parse(sendMessage));
-                                                    kafkaTemplate.send("vehicleLocation", jsonArray.toString());
-                                                    collectLogService.save(new CollectLog().setType("gps_vehicle_location").setContent(jsonArray.toString()));
-                                                    log.info("send gps_vehicle_location message:{}", jsonArray);
-                                                    //记录
-                                                    isolatedVehicleLocationMonitorService.saveSendMessage(sendMessage);
-                                                }
-                                            } catch (Exception e) {
-                                                log.error("解密异常车辆定位异常,exception:{}", e);
-                                                collectLogService.save(new CollectLog().setType("error-车辆定位").setContent(t.value()));
-                                            }
-                                            break;
-                                        case "alarm_log":
-                                            try {
-                                                String decrypt = RsaBase64Utils.decrypt(t.value());
-                                                if (JSONUtil.isJson(decrypt)) {
-                                                    JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
-                                                    log.info("received alarm_log message:{}", receiveMessage);
-                                                    kafkaTemplate.send("alarmLog", receiveMessage.toString());
-                                                    log.info("send alarm_log message:{}", receiveMessage);
-                                                    collectLogService.save(new CollectLog().setType("alarmLog").setContent(receiveMessage.toString()));
-                                                }
-                                            } catch (Exception e) {
-                                                log.error("解析异常车辆报警异常,exception:{}", e);
-                                                collectLogService.save(new CollectLog().setType("error-车辆报警").setContent(t.value()));
-                                            }
-                                    }
-                         //       });
-            }
-        } catch (Exception e) {
-            log.error("事件kafka消费者异常,exception:{}", e);
-        }
-    }
-
-
-//    @KafkaListener(topics = "topic_push_out_jinta", groupId = "topic_push_out_jinta_group", containerFactory = "kafkaListenerContainerFactory")
-//    public void listen(String message) {
+//package com.lutao.carlocation.kafka;
+//
+//import cn.hutool.json.JSONArray;
+//import cn.hutool.json.JSONObject;
+//import cn.hutool.json.JSONUtil;
+//import com.lutao.carlocation.entity.CollectLog;
+//import com.lutao.carlocation.service.CollectLogService;
+//import com.lutao.carlocation.service.IsolatedVehicleLocationMonitorService;
+//import lombok.extern.slf4j.Slf4j;
+//import org.apache.kafka.clients.consumer.ConsumerRecord;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.kafka.core.KafkaTemplate;
+//import org.springframework.stereotype.Component;
+//
+//import javax.annotation.Resource;
+//import java.time.Instant;
+//import java.time.ZoneId;
+//import java.time.ZonedDateTime;
+//import java.time.format.DateTimeFormatter;
+//import java.util.List;
+//import java.util.Optional;
+//
+//@Component
+//@Slf4j
+//public class ConsumerCarLoaction {
+//
+//    @Resource(name = "kafkaTemplate2")
+//    private KafkaTemplate<String, String> kafkaTemplate;
+//
+//    @Resource
+//    private IsolatedVehicleLocationMonitorService isolatedVehicleLocationMonitorService;
+//
+//    @Resource
+//    private CollectLogService collectLogService;
+//
+//
+//    //@KafkaListener(topics = "topic_push_out_jinta", groupId = "topic_push_out_jinta_group", containerFactory = "kafkaListenerContainerFactory")
+//    public void listen(ConsumerRecord<String, String> t) {
+//        Optional<Object> kafkaMassages = Optional.ofNullable(t);
 //        try {
-//            String decrypt = RsaBase64Utils.decrypt(message);
-//
-//            if (JSONUtil.isJson(decrypt)){
-//                log.info("received message:{}", decrypt);
-//                CollectLog carLocation = new CollectLog().setType("carLocation").setContent(decrypt);
-//                collectLogService.save(carLocation);
-//                JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
-//
-//                if (receiveMessage.containsKey("alarmType")){
-//                    //车辆报警
-//                    kafkaTemplate.send("alarmLog", receiveMessage.toString());
-//                    CollectLog alarm = new CollectLog().setType("alarm").setContent(decrypt);
-//                    collectLogService.save(alarm);
-//                    log.info("send-message-alarmLog:{}", receiveMessage);
-//                }else {
-//                    /**
-//                     * {
-//                     *     "accState": 1, ACC状态 0:关 1:开
-//                     *     "alititude": 1183, 海拔高度
-//                     *     "direction": 307, 方向
-//                     *     "directionDesc": "", 车辆方向文字描述
-//                     *     "latitude": "40.31111", 维度
-//                     *     "longitude": "99.087273", 经度
-//                     *     "mileage": 297681, 行驶里程
-//                     *     "onlineStatus": "", 车辆在线状态 0:离线,1:在线
-//                     *     "plateColor": 2, 车牌颜色
-//                     *     "positionTime": 1720585609, 产生位置时间
-//                     *     "transportStatus": "", 运输状态 0:空闲;1:在运输
-//                     *     "vec": 0, 速度
-//                     *     "vehicleNO": "甘F36477"
-//                     * }
-//                     */
-//
-//                    JSONObject sendMessage = new JSONObject();
-//
-//
-//                    /**
-//                     * [
-//                     * {
-//                     *  "locaterCode": "1",
-//                     * "longitude": "87.520656",
-//                     * "latitude": "44.274526 ",
-//                     *  "positioningTime": "2024-06-26 15:01:18",
-//                     *  "address": "车辆门口",
-//                     *  "speed": "10",
-//                     *  "vehicleNO": "豫BDH225",
-//                     * "plateColor": "2",
-//                     * "altitude": "397.0",
-//                     * "mileage": "374631.0"
-//                     * "direction": "108"
-//                     * "accState": "1"
-//                     * "positionTime": "1719385278"
-//                     * "directionDesc": "偏东"
-//                     * "vec": "10.0"
-//                     * "online_status": "1"
-//                     * "transport_status": "1"
-//                     * },
-//                     * ]
-//                     */
-//                    sendMessage.putOnce("locaterCode", receiveMessage.get("vehicleNO"));
-//                    sendMessage.putOnce("longitude", receiveMessage.get("longitude"));
-//                    sendMessage.putOnce("latitude", receiveMessage.get("latitude"));
-//                    Integer positionTime = (Integer) receiveMessage.get("positionTime");
-//
-//                    long time = positionTime*1000L;
-//
-//                    // 使用Instant.ofEpochMilli()方法将时间戳转换为Instant对象
-//                    Instant instant = Instant.ofEpochMilli(time);
-//
-//                    // 指定时区,例如使用UTC时区
-//                    ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault());
-//
-//                    // 创建DateTimeFormatter对象,并指定输出的日期格式
-//                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-//
-//                    // 使用DateTimeFormatter对象将ZonedDateTime对象转换为字符串
-//                    String formattedDate = zonedDateTime.format(formatter);
-//
-//                    sendMessage.putOnce("positioningTime", formattedDate);
-//                    sendMessage.putOnce("address", "-");
-//                    sendMessage.putOnce("speed", receiveMessage.get("vec"));
-//                    sendMessage.putOnce("vehicleNO", receiveMessage.get("vehicleNO"));
-//                    sendMessage.putOnce("plateColor", receiveMessage.get("plateColor"));
-//                    sendMessage.putOnce("altitude", receiveMessage.get("alititude"));
-//                    sendMessage.putOnce("mileage", receiveMessage.get("mileage"));
-//                    sendMessage.putOnce("direction", receiveMessage.get("direction"));
-//                    sendMessage.putOnce("accState", receiveMessage.get("accState"));
-//                    sendMessage.putOnce("positionTime", receiveMessage.get("positionTime"));
-//                    sendMessage.putOnce("directionDesc", receiveMessage.get("directionDesc"));
-//                    sendMessage.putOnce("vec", receiveMessage.get("vec"));
-//                    sendMessage.putOnce("online_status", receiveMessage.get("onlineStatus"));
-//                    sendMessage.putOnce("transport_status", receiveMessage.get("transportStatus"));
-//
-//                    JSONArray jsonArray = JSONUtil.createArray();
-//
-//                    jsonArray.add(JSONUtil.parse(sendMessage));
-//
-//                    kafkaTemplate.send("vehicleLocation", jsonArray.toString());
-//                    log.info("send message:{}", jsonArray);
-//
-//                    //记录
-//                    isolatedVehicleLocationMonitorService.saveSendMessage(sendMessage);
+//            if (kafkaMassages.isPresent()) {
+//                log.info(t.key() + ":::::" + t.value());
+//                switch (t.key()) {
+//                    case "gps_vehicle_location":
+//                        try {
+//                            String decrypt = RsaBase64Utils.decrypt(t.value());
+//                            if (JSONUtil.isJson(decrypt)) {
+//                                JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
+//                                log.info("received gps_vehicle_location message:{}", receiveMessage);
+//                                JSONObject sendMessage = JSONUtil.createObj();
+//                                sendMessage.putOnce("locaterCode", receiveMessage.get("vehicleNO"));
+//                                sendMessage.putOnce("longitude", receiveMessage.get("longitude"));
+//                                sendMessage.putOnce("latitude", receiveMessage.get("latitude"));
+//                                Integer positionTime = (Integer) receiveMessage.get("positionTime");
+//                                long time = positionTime * 1000L;
+//                                // 使用Instant.ofEpochMilli()方法将时间戳转换为Instant对象
+//                                Instant instant = Instant.ofEpochMilli(time);
+//                                // 指定时区,例如使用UTC时区
+//                                ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault());
+//                                // 创建DateTimeFormatter对象,并指定输出的日期格式
+//                                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+//                                // 使用DateTimeFormatter对象将ZonedDateTime对象转换为字符串
+//                                String formattedDate = zonedDateTime.format(formatter);
+//                                sendMessage.putOnce("positioningTime", formattedDate);
+//                                sendMessage.putOnce("address", "-");
+//                                sendMessage.putOnce("speed", receiveMessage.get("vec"));
+//                                sendMessage.putOnce("vehicleNO", receiveMessage.get("vehicleNO"));
+//                                sendMessage.putOnce("plateColor", receiveMessage.get("plateColor"));
+//                                sendMessage.putOnce("altitude", receiveMessage.get("alititude"));
+//                                sendMessage.putOnce("mileage", receiveMessage.get("mileage"));
+//                                sendMessage.putOnce("direction", receiveMessage.get("direction"));
+//                                sendMessage.putOnce("accState", receiveMessage.get("accState"));
+//                                sendMessage.putOnce("positionTime", receiveMessage.get("positionTime"));
+//                                sendMessage.putOnce("directionDesc", receiveMessage.get("directionDesc"));
+//                                sendMessage.putOnce("vec", receiveMessage.get("vec"));
+//                                sendMessage.putOnce("online_status", receiveMessage.get("onlineStatus"));
+//                                sendMessage.putOnce("transport_status", receiveMessage.get("transportStatus"));
+//                                JSONArray jsonArray = JSONUtil.createArray();
+//                                jsonArray.add(JSONUtil.parse(sendMessage));
+//                                kafkaTemplate.send("vehicleLocation", jsonArray.toString());
+//                                collectLogService.save(new CollectLog().setType("gps_vehicle_location").setContent(jsonArray.toString()));
+//                                log.info("send gps_vehicle_location message:{}", jsonArray);
+//                                //记录
+//                                isolatedVehicleLocationMonitorService.saveSendMessage(sendMessage);
+//                            }
+//                        } catch (Exception e) {
+//                            log.error("解密异常车辆定位异常,exception:{}", e);
+//                            collectLogService.save(new CollectLog().setType("error-车辆定位").setContent(t.value()));
+//                        }
+//                        break;
+//                    case "alarm_log":
+//                        try {
+//                            String decrypt = RsaBase64Utils.decrypt(t.value());
+//                            if (JSONUtil.isJson(decrypt)) {
+//                                JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
+//                                log.info("received alarm_log message:{}", receiveMessage);
+//                                kafkaTemplate.send("alarmLog", receiveMessage.toString());
+//                                log.info("send alarm_log message:{}", receiveMessage);
+//                                collectLogService.save(new CollectLog().setType("alarmLog").setContent(receiveMessage.toString()));
+//                            }
+//                        } catch (Exception e) {
+//                            log.error("解析异常车辆报警异常,exception:{}", e);
+//                            collectLogService.save(new CollectLog().setType("error-车辆报警").setContent(t.value()));
+//                        }
+//                        break;
 //                }
 //
-//
 //            }
-//
-//
 //        } catch (Exception e) {
-//            try {
-//                String decrypt = RsaBase64Utils.decrypt(message);
-//                log.error("error:{}", decrypt);
-//            } catch (Exception ex) {
-//                log.info("error:{}", e.getMessage());
-//                e.printStackTrace();
-//            }
-//            log.info("error:{}", e.getMessage());
-//            CollectLog error = new CollectLog().setType("error").setContent(e.getMessage());
-//            collectLogService.save(error);
-//            e.printStackTrace();
+//            log.error("事件kafka消费者异常,exception:{}", e);
 //        }
 //    }
-
-
-}
+//
+//
+////    @KafkaListener(topics = "topic_push_out_jinta", groupId = "topic_push_out_jinta_group", containerFactory = "kafkaListenerContainerFactory")
+////    public void listen(String message) {
+////        try {
+////            String decrypt = RsaBase64Utils.decrypt(message);
+////
+////            if (JSONUtil.isJson(decrypt)){
+////                log.info("received message:{}", decrypt);
+////                CollectLog carLocation = new CollectLog().setType("carLocation").setContent(decrypt);
+////                collectLogService.save(carLocation);
+////                JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
+////
+////                if (receiveMessage.containsKey("alarmType")){
+////                    //车辆报警
+////                    kafkaTemplate.send("alarmLog", receiveMessage.toString());
+////                    CollectLog alarm = new CollectLog().setType("alarm").setContent(decrypt);
+////                    collectLogService.save(alarm);
+////                    log.info("send-message-alarmLog:{}", receiveMessage);
+////                }else {
+////                    /**
+////                     * {
+////                     *     "accState": 1, ACC状态 0:关 1:开
+////                     *     "alititude": 1183, 海拔高度
+////                     *     "direction": 307, 方向
+////                     *     "directionDesc": "", 车辆方向文字描述
+////                     *     "latitude": "40.31111", 维度
+////                     *     "longitude": "99.087273", 经度
+////                     *     "mileage": 297681, 行驶里程
+////                     *     "onlineStatus": "", 车辆在线状态 0:离线,1:在线
+////                     *     "plateColor": 2, 车牌颜色
+////                     *     "positionTime": 1720585609, 产生位置时间
+////                     *     "transportStatus": "", 运输状态 0:空闲;1:在运输
+////                     *     "vec": 0, 速度
+////                     *     "vehicleNO": "甘F36477"
+////                     * }
+////                     */
+////
+////                    JSONObject sendMessage = new JSONObject();
+////
+////
+////                    /**
+////                     * [
+////                     * {
+////                     *  "locaterCode": "1",
+////                     * "longitude": "87.520656",
+////                     * "latitude": "44.274526 ",
+////                     *  "positioningTime": "2024-06-26 15:01:18",
+////                     *  "address": "车辆门口",
+////                     *  "speed": "10",
+////                     *  "vehicleNO": "豫BDH225",
+////                     * "plateColor": "2",
+////                     * "altitude": "397.0",
+////                     * "mileage": "374631.0"
+////                     * "direction": "108"
+////                     * "accState": "1"
+////                     * "positionTime": "1719385278"
+////                     * "directionDesc": "偏东"
+////                     * "vec": "10.0"
+////                     * "online_status": "1"
+////                     * "transport_status": "1"
+////                     * },
+////                     * ]
+////                     */
+////                    sendMessage.putOnce("locaterCode", receiveMessage.get("vehicleNO"));
+////                    sendMessage.putOnce("longitude", receiveMessage.get("longitude"));
+////                    sendMessage.putOnce("latitude", receiveMessage.get("latitude"));
+////                    Integer positionTime = (Integer) receiveMessage.get("positionTime");
+////
+////                    long time = positionTime*1000L;
+////
+////                    // 使用Instant.ofEpochMilli()方法将时间戳转换为Instant对象
+////                    Instant instant = Instant.ofEpochMilli(time);
+////
+////                    // 指定时区,例如使用UTC时区
+////                    ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault());
+////
+////                    // 创建DateTimeFormatter对象,并指定输出的日期格式
+////                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+////
+////                    // 使用DateTimeFormatter对象将ZonedDateTime对象转换为字符串
+////                    String formattedDate = zonedDateTime.format(formatter);
+////
+////                    sendMessage.putOnce("positioningTime", formattedDate);
+////                    sendMessage.putOnce("address", "-");
+////                    sendMessage.putOnce("speed", receiveMessage.get("vec"));
+////                    sendMessage.putOnce("vehicleNO", receiveMessage.get("vehicleNO"));
+////                    sendMessage.putOnce("plateColor", receiveMessage.get("plateColor"));
+////                    sendMessage.putOnce("altitude", receiveMessage.get("alititude"));
+////                    sendMessage.putOnce("mileage", receiveMessage.get("mileage"));
+////                    sendMessage.putOnce("direction", receiveMessage.get("direction"));
+////                    sendMessage.putOnce("accState", receiveMessage.get("accState"));
+////                    sendMessage.putOnce("positionTime", receiveMessage.get("positionTime"));
+////                    sendMessage.putOnce("directionDesc", receiveMessage.get("directionDesc"));
+////                    sendMessage.putOnce("vec", receiveMessage.get("vec"));
+////                    sendMessage.putOnce("online_status", receiveMessage.get("onlineStatus"));
+////                    sendMessage.putOnce("transport_status", receiveMessage.get("transportStatus"));
+////
+////                    JSONArray jsonArray = JSONUtil.createArray();
+////
+////                    jsonArray.add(JSONUtil.parse(sendMessage));
+////
+////                    kafkaTemplate.send("vehicleLocation", jsonArray.toString());
+////                    log.info("send message:{}", jsonArray);
+////
+////                    //记录
+////                    isolatedVehicleLocationMonitorService.saveSendMessage(sendMessage);
+////                }
+////
+////
+////            }
+////
+////
+////        } catch (Exception e) {
+////            try {
+////                String decrypt = RsaBase64Utils.decrypt(message);
+////                log.error("error:{}", decrypt);
+////            } catch (Exception ex) {
+////                log.info("error:{}", e.getMessage());
+////                e.printStackTrace();
+////            }
+////            log.info("error:{}", e.getMessage());
+////            CollectLog error = new CollectLog().setType("error").setContent(e.getMessage());
+////            collectLogService.save(error);
+////            e.printStackTrace();
+////        }
+////    }
+//
+//
+//}

+ 36 - 38
src/main/java/com/lutao/carlocation/kafka/KafkaConsumerConfig.java

@@ -1,15 +1,13 @@
 package com.lutao.carlocation.kafka;
 
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.*;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -18,41 +16,41 @@ import java.util.Map;
 @Configuration
 public class KafkaConsumerConfig {
 
-    @Bean
-    public Map<String, Object> consumerConfigs() {
-        Map<String, Object> props = new HashMap<>();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "datapoc1.66yunlian.cn:30886,datapoc2.66yunlian.cn:30886,datapoc3.66yunlian.cn:30886");
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_push_out_jinta_group");
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
-        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
-        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"customer67\" password=\"dscPEQAnyJa%^ZwK\";");
+//    @Bean
+//    public Map<String, Object> consumerConfigs() {
+//        Map<String, Object> props = new HashMap<>();
+//        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "datapoc1.66yunlian.cn:30886,datapoc2.66yunlian.cn:30886,datapoc3.66yunlian.cn:30886");
+//        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_push_out_jinta_group");
+//        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+//        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+//        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+//        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+//        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+//        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
+//        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"customer67\" password=\"dscPEQAnyJa%^ZwK\";");
+//
+//        return props;
+//    }
+//
+//    @Bean
+//    public ConsumerFactory<String, String> consumerFactory() {
+//        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
+//    }
+//
+//    @Bean
+//    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+//        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+//        factory.setConsumerFactory(consumerFactory());
+//        return factory;
+//    }
 
-        return props;
-    }
-
-    @Bean
-    public ConsumerFactory<String, String> consumerFactory() {
-        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
-    }
 
-    @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
-        factory.setConsumerFactory(consumerFactory());
-        return factory;
-    }
-
-
-    @Bean
-    public KafkaTemplate<String, String> kafkaTemplate1(ProducerFactory<String, String> consumerFactory) {
-        return new KafkaTemplate<>(consumerFactory);
-    }
+//    @Bean
+//    public KafkaTemplate<String, String> kafkaTemplate1(ProducerFactory<String, String> consumerFactory) {
+//        return new KafkaTemplate<>(consumerFactory);
+//    }
 
     @Bean
     public Map<String, Object> producerConfigs2() {