فهرست منبع

fix 车辆定位kafka

liuyongxin 9 ماه پیش
والد
کامیت
51c1271624
1فایلهای تغییر یافته به همراه202 افزوده شده و 119 حذف شده
  1. 202 119
      src/main/java/com/lutao/carlocation/kafka/ConsumerCarLoaction.java

+ 202 - 119
src/main/java/com/lutao/carlocation/kafka/ConsumerCarLoaction.java

@@ -7,6 +7,7 @@ import com.lutao.carlocation.entity.CollectLog;
 import com.lutao.carlocation.service.CollectLogService;
 import com.lutao.carlocation.service.IsolatedVehicleLocationMonitorService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
@@ -16,6 +17,8 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Optional;
 
 @Component
 @Slf4j
@@ -32,130 +35,210 @@ public class ConsumerCarLoaction {
 
 
     @KafkaListener(topics = "topic_push_out_jinta", groupId = "topic_push_out_jinta_group", containerFactory = "kafkaListenerContainerFactory")
-    public void listen(String message) {
+    public void listen(ConsumerRecord<String, String> t) {
+        Optional<Object> kafkaMassages = Optional.ofNullable(t);
         try {
-            String decrypt = RsaBase64Utils.decrypt(message);
-
-            if (JSONUtil.isJson(decrypt)){
-                log.info("received message:{}", decrypt);
-                CollectLog carLocation = new CollectLog().setType("carLocation").setContent(decrypt);
-                collectLogService.save(carLocation);
-                JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
-
-                if (receiveMessage.containsKey("alarmType")){
-                    //车辆报警
-                    kafkaTemplate.send("alarmLog", receiveMessage.toString());
-                    CollectLog alarm = new CollectLog().setType("alarm").setContent(decrypt);
-                    collectLogService.save(alarm);
-                    log.info("send-message-alarmLog:{}", receiveMessage);
-                }else {
-                    /**
-                     * {
-                     *     "accState": 1, ACC状态 0:关 1:开
-                     *     "alititude": 1183, 海拔高度
-                     *     "direction": 307, 方向
-                     *     "directionDesc": "", 车辆方向文字描述
-                     *     "latitude": "40.31111", 维度
-                     *     "longitude": "99.087273", 经度
-                     *     "mileage": 297681, 行驶里程
-                     *     "onlineStatus": "", 车辆在线状态 0:离线,1:在线
-                     *     "plateColor": 2, 车牌颜色
-                     *     "positionTime": 1720585609, 产生位置时间
-                     *     "transportStatus": "", 运输状态 0:空闲;1:在运输
-                     *     "vec": 0, 速度
-                     *     "vehicleNO": "甘F36477"
-                     * }
-                     */
-
-                    JSONObject sendMessage = new JSONObject();
-
-
-                    /**
-                     * [
-                     * {
-                     *  "locaterCode": "1",
-                     * "longitude": "87.520656",
-                     * "latitude": "44.274526 ",
-                     *  "positioningTime": "2024-06-26 15:01:18",
-                     *  "address": "车辆门口",
-                     *  "speed": "10",
-                     *  "vehicleNO": "豫BDH225",
-                     * "plateColor": "2",
-                     * "altitude": "397.0",
-                     * "mileage": "374631.0"
-                     * "direction": "108"
-                     * "accState": "1"
-                     * "positionTime": "1719385278"
-                     * "directionDesc": "偏东"
-                     * "vec": "10.0"
-                     * "online_status": "1"
-                     * "transport_status": "1"
-                     * },
-                     * ]
-                     */
-                    sendMessage.putOnce("locaterCode", receiveMessage.get("vehicleNO"));
-                    sendMessage.putOnce("longitude", receiveMessage.get("longitude"));
-                    sendMessage.putOnce("latitude", receiveMessage.get("latitude"));
-                    Integer positionTime = (Integer) receiveMessage.get("positionTime");
-
-                    long time = positionTime*1000L;
-
-                    // 使用Instant.ofEpochMilli()方法将时间戳转换为Instant对象
-                    Instant instant = Instant.ofEpochMilli(time);
-
-                    // 指定时区,例如使用UTC时区
-                    ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault());
-
-                    // 创建DateTimeFormatter对象,并指定输出的日期格式
-                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-
-                    // 使用DateTimeFormatter对象将ZonedDateTime对象转换为字符串
-                    String formattedDate = zonedDateTime.format(formatter);
-
-                    sendMessage.putOnce("positioningTime", formattedDate);
-                    sendMessage.putOnce("address", "-");
-                    sendMessage.putOnce("speed", receiveMessage.get("vec"));
-                    sendMessage.putOnce("vehicleNO", receiveMessage.get("vehicleNO"));
-                    sendMessage.putOnce("plateColor", receiveMessage.get("plateColor"));
-                    sendMessage.putOnce("altitude", receiveMessage.get("alititude"));
-                    sendMessage.putOnce("mileage", receiveMessage.get("mileage"));
-                    sendMessage.putOnce("direction", receiveMessage.get("direction"));
-                    sendMessage.putOnce("accState", receiveMessage.get("accState"));
-                    sendMessage.putOnce("positionTime", receiveMessage.get("positionTime"));
-                    sendMessage.putOnce("directionDesc", receiveMessage.get("directionDesc"));
-                    sendMessage.putOnce("vec", receiveMessage.get("vec"));
-                    sendMessage.putOnce("online_status", receiveMessage.get("onlineStatus"));
-                    sendMessage.putOnce("transport_status", receiveMessage.get("transportStatus"));
-
-                    JSONArray jsonArray = JSONUtil.createArray();
-
-                    jsonArray.add(JSONUtil.parse(sendMessage));
-
-                    kafkaTemplate.send("vehicleLocation", jsonArray.toString());
-                    log.info("send message:{}", jsonArray);
-
-                    //记录
-                    isolatedVehicleLocationMonitorService.saveSendMessage(sendMessage);
-                }
-
-
+            if (kafkaMassages.isPresent()) {
+//                consumerRecords
+//                        .forEach(
+//                                t -> {
+                                    log.info(t.key() + ":::::" + t.value());
+                                    switch (t.key()) {
+                                        case "gps_vehicle_location":
+                                            try {
+                                                String decrypt = RsaBase64Utils.decrypt(t.value());
+                                                if (JSONUtil.isJson(decrypt)) {
+                                                    JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
+                                                    log.info("received gps_vehicle_location message:{}", receiveMessage);
+                                                    JSONObject sendMessage = JSONUtil.createObj();
+                                                    sendMessage.putOnce("locaterCode", receiveMessage.get("vehicleNO"));
+                                                    sendMessage.putOnce("longitude", receiveMessage.get("longitude"));
+                                                    sendMessage.putOnce("latitude", receiveMessage.get("latitude"));
+                                                    Integer positionTime = (Integer) receiveMessage.get("positionTime");
+                                                    long time = positionTime * 1000L;
+                                                    // 使用Instant.ofEpochMilli()方法将时间戳转换为Instant对象
+                                                    Instant instant = Instant.ofEpochMilli(time);
+                                                    // 指定时区,例如使用UTC时区
+                                                    ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault());
+                                                    // 创建DateTimeFormatter对象,并指定输出的日期格式
+                                                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+                                                    // 使用DateTimeFormatter对象将ZonedDateTime对象转换为字符串
+                                                    String formattedDate = zonedDateTime.format(formatter);
+                                                    sendMessage.putOnce("positioningTime", formattedDate);
+                                                    sendMessage.putOnce("address", "-");
+                                                    sendMessage.putOnce("speed", receiveMessage.get("vec"));
+                                                    sendMessage.putOnce("vehicleNO", receiveMessage.get("vehicleNO"));
+                                                    sendMessage.putOnce("plateColor", receiveMessage.get("plateColor"));
+                                                    sendMessage.putOnce("altitude", receiveMessage.get("alititude"));
+                                                    sendMessage.putOnce("mileage", receiveMessage.get("mileage"));
+                                                    sendMessage.putOnce("direction", receiveMessage.get("direction"));
+                                                    sendMessage.putOnce("accState", receiveMessage.get("accState"));
+                                                    sendMessage.putOnce("positionTime", receiveMessage.get("positionTime"));
+                                                    sendMessage.putOnce("directionDesc", receiveMessage.get("directionDesc"));
+                                                    sendMessage.putOnce("vec", receiveMessage.get("vec"));
+                                                    sendMessage.putOnce("online_status", receiveMessage.get("onlineStatus"));
+                                                    sendMessage.putOnce("transport_status", receiveMessage.get("transportStatus"));
+                                                    JSONArray jsonArray = JSONUtil.createArray();
+                                                    jsonArray.add(JSONUtil.parse(sendMessage));
+                                                    kafkaTemplate.send("vehicleLocation", jsonArray.toString());
+                                                    collectLogService.save(new CollectLog().setType("gps_vehicle_location").setContent(jsonArray.toString()));
+                                                    log.info("send gps_vehicle_location message:{}", jsonArray);
+                                                    //记录
+                                                    isolatedVehicleLocationMonitorService.saveSendMessage(sendMessage);
+                                                }
+                                            } catch (Exception e) {
+                                                log.error("解密异常车辆定位异常,exception:{}", e);
+                                                collectLogService.save(new CollectLog().setType("error-车辆定位").setContent(t.value()));
+                                            }
+                                            break;
+                                        case "alarm_log":
+                                            try {
+                                                String decrypt = RsaBase64Utils.decrypt(t.value());
+                                                if (JSONUtil.isJson(decrypt)) {
+                                                    JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
+                                                    log.info("received alarm_log message:{}", receiveMessage);
+                                                    kafkaTemplate.send("alarmLog", receiveMessage.toString());
+                                                    log.info("send alarm_log message:{}", receiveMessage);
+                                                    collectLogService.save(new CollectLog().setType("alarmLog").setContent(receiveMessage.toString()));
+                                                }
+                                            } catch (Exception e) {
+                                                log.error("解析异常车辆报警异常,exception:{}", e);
+                                                collectLogService.save(new CollectLog().setType("error-车辆报警").setContent(t.value()));
+                                            }
+                                    }
+                         //       });
             }
-
-
         } catch (Exception e) {
-            try {
-                String decrypt = RsaBase64Utils.decrypt(message);
-                log.error("error:{}", decrypt);
-            } catch (Exception ex) {
-                log.info("error:{}", e.getMessage());
-                e.printStackTrace();
-            }
-            log.info("error:{}", e.getMessage());
-            CollectLog error = new CollectLog().setType("error").setContent(e.getMessage());
-            collectLogService.save(error);
-            e.printStackTrace();
+            log.error("事件kafka消费者异常,exception:{}", e);
         }
     }
 
 
+//    @KafkaListener(topics = "topic_push_out_jinta", groupId = "topic_push_out_jinta_group", containerFactory = "kafkaListenerContainerFactory")
+//    public void listen(String message) {
+//        try {
+//            String decrypt = RsaBase64Utils.decrypt(message);
+//
+//            if (JSONUtil.isJson(decrypt)){
+//                log.info("received message:{}", decrypt);
+//                CollectLog carLocation = new CollectLog().setType("carLocation").setContent(decrypt);
+//                collectLogService.save(carLocation);
+//                JSONObject receiveMessage = JSONUtil.parseObj(decrypt);
+//
+//                if (receiveMessage.containsKey("alarmType")){
+//                    //车辆报警
+//                    kafkaTemplate.send("alarmLog", receiveMessage.toString());
+//                    CollectLog alarm = new CollectLog().setType("alarm").setContent(decrypt);
+//                    collectLogService.save(alarm);
+//                    log.info("send-message-alarmLog:{}", receiveMessage);
+//                }else {
+//                    /**
+//                     * {
+//                     *     "accState": 1, ACC状态 0:关 1:开
+//                     *     "alititude": 1183, 海拔高度
+//                     *     "direction": 307, 方向
+//                     *     "directionDesc": "", 车辆方向文字描述
+//                     *     "latitude": "40.31111", 维度
+//                     *     "longitude": "99.087273", 经度
+//                     *     "mileage": 297681, 行驶里程
+//                     *     "onlineStatus": "", 车辆在线状态 0:离线,1:在线
+//                     *     "plateColor": 2, 车牌颜色
+//                     *     "positionTime": 1720585609, 产生位置时间
+//                     *     "transportStatus": "", 运输状态 0:空闲;1:在运输
+//                     *     "vec": 0, 速度
+//                     *     "vehicleNO": "甘F36477"
+//                     * }
+//                     */
+//
+//                    JSONObject sendMessage = new JSONObject();
+//
+//
+//                    /**
+//                     * [
+//                     * {
+//                     *  "locaterCode": "1",
+//                     * "longitude": "87.520656",
+//                     * "latitude": "44.274526 ",
+//                     *  "positioningTime": "2024-06-26 15:01:18",
+//                     *  "address": "车辆门口",
+//                     *  "speed": "10",
+//                     *  "vehicleNO": "豫BDH225",
+//                     * "plateColor": "2",
+//                     * "altitude": "397.0",
+//                     * "mileage": "374631.0"
+//                     * "direction": "108"
+//                     * "accState": "1"
+//                     * "positionTime": "1719385278"
+//                     * "directionDesc": "偏东"
+//                     * "vec": "10.0"
+//                     * "online_status": "1"
+//                     * "transport_status": "1"
+//                     * },
+//                     * ]
+//                     */
+//                    sendMessage.putOnce("locaterCode", receiveMessage.get("vehicleNO"));
+//                    sendMessage.putOnce("longitude", receiveMessage.get("longitude"));
+//                    sendMessage.putOnce("latitude", receiveMessage.get("latitude"));
+//                    Integer positionTime = (Integer) receiveMessage.get("positionTime");
+//
+//                    long time = positionTime*1000L;
+//
+//                    // 使用Instant.ofEpochMilli()方法将时间戳转换为Instant对象
+//                    Instant instant = Instant.ofEpochMilli(time);
+//
+//                    // 指定时区,例如使用UTC时区
+//                    ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault());
+//
+//                    // 创建DateTimeFormatter对象,并指定输出的日期格式
+//                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+//
+//                    // 使用DateTimeFormatter对象将ZonedDateTime对象转换为字符串
+//                    String formattedDate = zonedDateTime.format(formatter);
+//
+//                    sendMessage.putOnce("positioningTime", formattedDate);
+//                    sendMessage.putOnce("address", "-");
+//                    sendMessage.putOnce("speed", receiveMessage.get("vec"));
+//                    sendMessage.putOnce("vehicleNO", receiveMessage.get("vehicleNO"));
+//                    sendMessage.putOnce("plateColor", receiveMessage.get("plateColor"));
+//                    sendMessage.putOnce("altitude", receiveMessage.get("alititude"));
+//                    sendMessage.putOnce("mileage", receiveMessage.get("mileage"));
+//                    sendMessage.putOnce("direction", receiveMessage.get("direction"));
+//                    sendMessage.putOnce("accState", receiveMessage.get("accState"));
+//                    sendMessage.putOnce("positionTime", receiveMessage.get("positionTime"));
+//                    sendMessage.putOnce("directionDesc", receiveMessage.get("directionDesc"));
+//                    sendMessage.putOnce("vec", receiveMessage.get("vec"));
+//                    sendMessage.putOnce("online_status", receiveMessage.get("onlineStatus"));
+//                    sendMessage.putOnce("transport_status", receiveMessage.get("transportStatus"));
+//
+//                    JSONArray jsonArray = JSONUtil.createArray();
+//
+//                    jsonArray.add(JSONUtil.parse(sendMessage));
+//
+//                    kafkaTemplate.send("vehicleLocation", jsonArray.toString());
+//                    log.info("send message:{}", jsonArray);
+//
+//                    //记录
+//                    isolatedVehicleLocationMonitorService.saveSendMessage(sendMessage);
+//                }
+//
+//
+//            }
+//
+//
+//        } catch (Exception e) {
+//            try {
+//                String decrypt = RsaBase64Utils.decrypt(message);
+//                log.error("error:{}", decrypt);
+//            } catch (Exception ex) {
+//                log.info("error:{}", e.getMessage());
+//                e.printStackTrace();
+//            }
+//            log.info("error:{}", e.getMessage());
+//            CollectLog error = new CollectLog().setType("error").setContent(e.getMessage());
+//            collectLogService.save(error);
+//            e.printStackTrace();
+//        }
+//    }
+
+
 }