|
@@ -7,23 +7,20 @@ import com.lutao.cloud.collect.vo.DataAddVO;
|
|
|
import com.lutao.cloud.common.core.kafka.MessageCommandEnum;
|
|
|
import com.lutao.cloud.common.core.util.CollectionUtil;
|
|
|
import com.lutao.cloud.common.core.util.MessageInfo;
|
|
|
-import com.lutao.cloud.common.core.util.ObjectUtil;
|
|
|
-import com.lutao.cloud.common.core.util.StringUtil;
|
|
|
import com.lutao.cloud.consume.service.FireParametersTagService;
|
|
|
import com.lutao.cloud.consume.util.TimeUtil;
|
|
|
import com.lutao.entity.FireParametersTag;
|
|
|
import com.lutao.entity.PerceptionMonitorFireInfo;
|
|
|
+import com.lutao.entity.PerceptionMonitorGasInfo;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.data.redis.core.BoundHashOperations;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
-import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import javax.annotation.Resource;
|
|
|
import java.time.LocalDate;
|
|
|
import java.time.LocalDateTime;
|
|
|
-import java.time.LocalTime;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
@@ -152,41 +149,55 @@ public class FireParametersTagStrategy implements CommandStrategy<FireParameters
|
|
|
* @Author: vincent.zhao
|
|
|
* @Date: 2024/6/20 16:01
|
|
|
*/
|
|
|
- private void stopTag(String appId, String platformCode, String tagCode) {
|
|
|
+ private void stopTagCache(String appId, String platformCode, String tagCode) {
|
|
|
log.info("fireParametersTagStrategy 进行stopTag操作 ......");
|
|
|
//停用 接口收到指标停用的变更后, 在线率停止计算,应收到次数以变更时间为节点,计算在线率
|
|
|
//启用 接口收到指标启用的变更后,如果初始化中没有,则新增,开始时间以变更时间为准,计算在线率。 如果初始化中有(一天内多次停用启用,在线率分段计算,只计算启用段内的。
|
|
|
BoundHashOperations parametersTagCache = fireParametersTagStrategy.redisTemplate.boundHashOps(LocalDate.now() + ":" + PerceptionMonitorFireInfo.TABLE_NAME + ":" + appId + ":" + platformCode + ":" + tagCode);
|
|
|
- Integer count = (Integer) parametersTagCache.get("received_count");
|
|
|
|
|
|
- if (Objects.nonNull(count)) {
|
|
|
- parametersTagCache.put("end_time", LocalDateTime.now().toString());
|
|
|
- // 计算每分钟应该接受的数据量
|
|
|
- LocalDateTime beginTime = LocalDateTime.parse(parametersTagCache.get("begin_time").toString(), DateTimeFormatter.ISO_LOCAL_DATE_TIME);
|
|
|
- Long shouldAcceptCount = TimeUtil.getTimeDifference(beginTime, LocalDateTime.now());
|
|
|
- parametersTagCache.put("should_accept_count", shouldAcceptCount);
|
|
|
- parametersTagCache.put("enable", 1);
|
|
|
+ // 2024/11/14 删除时候-判断缓存是否存在
|
|
|
+ Boolean hasKey = fireParametersTagStrategy.redisTemplate.hasKey(LocalDate.now() + ":" + PerceptionMonitorFireInfo.TABLE_NAME + ":" + appId + ":" + platformCode + ":" + tagCode);
|
|
|
+ if (Boolean.FALSE.equals(hasKey)){
|
|
|
+ return;
|
|
|
}
|
|
|
+
|
|
|
+ //这个指标信息已经存在 且收到信息的情况
|
|
|
+ parametersTagCache.put("end_time", LocalDateTime.now().toString());
|
|
|
+ // 计算每分钟应该接受的数据量
|
|
|
+ LocalDateTime beginTime = LocalDateTime.parse(parametersTagCache.get("begin_time").toString(), DateTimeFormatter.ISO_LOCAL_DATE_TIME);
|
|
|
+ Long shouldAcceptCount = TimeUtil.getTimeDifference(beginTime, LocalDateTime.now());
|
|
|
+ parametersTagCache.put("should_accept_count", shouldAcceptCount);
|
|
|
+ parametersTagCache.put("enable", 1);
|
|
|
}
|
|
|
|
|
|
- private void startTag(String appId, String platformCode, String tagCode) {
|
|
|
+ private void startTagCache(FireParametersTag fireParametersTag) {
|
|
|
log.info("fireParametersTagStrategy 进行startTag操作 ......");
|
|
|
//停用 接口收到指标停用的变更后, 在线率停止计算,应收到次数以变更时间为节点,计算在线率
|
|
|
//启用 接口收到指标启用的变更后,如果初始化中没有,则新增,开始时间以变更时间为准,计算在线率。 如果初始化中有(一天内多次停用启用,在线率分段计算,只计算启用段内的。
|
|
|
- BoundHashOperations parametersTagCache = fireParametersTagStrategy.redisTemplate.boundHashOps(LocalDate.now() + ":" + PerceptionMonitorFireInfo.TABLE_NAME + ":" + appId + ":" + platformCode + ":" + tagCode);
|
|
|
+ BoundHashOperations parametersTagCache = fireParametersTagStrategy.redisTemplate.boundHashOps(LocalDate.now() + ":" +
|
|
|
+ PerceptionMonitorFireInfo.TABLE_NAME + ":" +
|
|
|
+ fireParametersTag.getAppId() + ":" +
|
|
|
+ fireParametersTag.getPlatformCode() + ":" +
|
|
|
+ fireParametersTag.getTagCode());
|
|
|
Integer count = (Integer) parametersTagCache.get("received_count");
|
|
|
|
|
|
- if (Objects.nonNull(count) && count < 1) {
|
|
|
+ if (!Objects.nonNull(count)) {
|
|
|
+ // 初始化
|
|
|
parametersTagCache.put("should_accept_count", 0);
|
|
|
parametersTagCache.put("received_count", 0);
|
|
|
parametersTagCache.put("begin_time", LocalDateTime.now().toString());
|
|
|
parametersTagCache.put("enable", 0);
|
|
|
-
|
|
|
+ parametersTagCache.put("resource", PerceptionMonitorFireInfo.TABLE_NAME);
|
|
|
+ parametersTagCache.put("tagInfo", JSONUtil.toJsonStr(fireParametersTag));
|
|
|
+ parametersTagCache.expire(48, TimeUnit.HOURS);
|
|
|
} else {
|
|
|
- //有的情况下
|
|
|
+ // 恢复缓存
|
|
|
parametersTagCache.put("begin_time", LocalDateTime.now().toString());
|
|
|
parametersTagCache.delete("end_time");
|
|
|
parametersTagCache.put("enable", 0);
|
|
|
+ parametersTagCache.put("resource", PerceptionMonitorFireInfo.TABLE_NAME);
|
|
|
+ parametersTagCache.put("tagInfo", JSONUtil.toJsonStr(fireParametersTag));
|
|
|
+ parametersTagCache.expire(48, TimeUnit.HOURS);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -197,19 +208,17 @@ public class FireParametersTagStrategy implements CommandStrategy<FireParameters
|
|
|
* @Author: vincent.zhao
|
|
|
* @Date: 2024/6/20 16:01
|
|
|
*/
|
|
|
- private void addTag(String appId, String platformCode, String tagCode) {
|
|
|
+ private void addTagCache(String appId, String platformCode, String tagCode) {
|
|
|
log.info("fireParametersTagStrategy 进行addTag操作 ......");
|
|
|
BoundHashOperations parametersTagCache = fireParametersTagStrategy.redisTemplate.boundHashOps(LocalDate.now() + ":" + PerceptionMonitorFireInfo.TABLE_NAME + ":" + appId + ":" + platformCode + ":" + tagCode);
|
|
|
- Integer count = (Integer) parametersTagCache.get("received_count");
|
|
|
|
|
|
- if (Objects.nonNull(count)) {
|
|
|
- parametersTagCache.put("begin_time", LocalDateTime.now().toString());
|
|
|
- parametersTagCache.put("resource", PerceptionMonitorFireInfo.TABLE_NAME);
|
|
|
- parametersTagCache.put("received_count", 0);
|
|
|
- parametersTagCache.put("should_accept_count", 0);
|
|
|
- parametersTagCache.put("enable", 0);
|
|
|
- parametersTagCache.expire(48, TimeUnit.HOURS);
|
|
|
- }
|
|
|
+ parametersTagCache.put("begin_time", LocalDateTime.now().toString());
|
|
|
+ parametersTagCache.put("resource", PerceptionMonitorFireInfo.TABLE_NAME);
|
|
|
+ parametersTagCache.put("received_count", 0);
|
|
|
+ parametersTagCache.put("should_accept_count", 0);
|
|
|
+ parametersTagCache.put("enable", 0);
|
|
|
+ parametersTagCache.expire(48, TimeUnit.HOURS);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -222,10 +231,9 @@ public class FireParametersTagStrategy implements CommandStrategy<FireParameters
|
|
|
public void add(FireParametersTag fireParametersTag) {
|
|
|
log.info("fireParametersTagStrategy 进行add操作 ......");
|
|
|
fireParametersTagStrategy.fireParametersTagService.save(fireParametersTag);
|
|
|
- addTag(fireParametersTag.getAppId(), fireParametersTag.getPlatformCode(), fireParametersTag.getTagCode());
|
|
|
//指标停用
|
|
|
- if ("1".equals(fireParametersTag.getStatus())) {
|
|
|
- stopTag(fireParametersTag.getAppId(), fireParametersTag.getPlatformCode(), fireParametersTag.getTagCode());
|
|
|
+ if (!"1".equals(fireParametersTag.getStatus())) {
|
|
|
+ addTagCache(fireParametersTag.getAppId(), fireParametersTag.getPlatformCode(), fireParametersTag.getTagCode());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -245,13 +253,19 @@ public class FireParametersTagStrategy implements CommandStrategy<FireParameters
|
|
|
fireParametersTagStrategy.fireParametersTagService.update(fireParametersTag, Wrappers.lambdaQuery(FireParametersTag.class)
|
|
|
.eq(FireParametersTag::getAppId, fireParametersTag.getAppId())
|
|
|
.eq(FireParametersTag::getOriginId, fireParametersTag.getOriginId()));
|
|
|
- // 指标停用
|
|
|
+ /**
|
|
|
+ * 1.当前状态有效0,新指标状态有效0,不操作缓存;
|
|
|
+ * 2.当前状态有效0,新指标状态无效1,停止缓存;->stopTagCache()
|
|
|
+ * 3.当前状态无效1,新指标状态有效0,恢复缓存;->startTagCache()
|
|
|
+ * 4.当前状态无效1,新指标状态无效1,不操作缓存;
|
|
|
+ */
|
|
|
+ // 指标缓存停用
|
|
|
if ("1".equals(fireParametersTag.getStatus()) && !"1".equals(fireParametersTagTemp.getStatus())) {
|
|
|
- stopTag(fireParametersTag.getAppId(), fireParametersTag.getPlatformCode(), fireParametersTag.getTagCode());
|
|
|
+ stopTagCache(fireParametersTag.getAppId(), fireParametersTag.getPlatformCode(), fireParametersTag.getTagCode());
|
|
|
}
|
|
|
- // 启用
|
|
|
+ // 指标缓存启用
|
|
|
if ("0".equals(fireParametersTag.getStatus()) && ("1".equals(fireParametersTagTemp.getStatus()))) {
|
|
|
- startTag(fireParametersTag.getAppId(), fireParametersTag.getPlatformCode(), fireParametersTag.getTagCode());
|
|
|
+ startTagCache(fireParametersTag);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -269,7 +283,7 @@ public class FireParametersTagStrategy implements CommandStrategy<FireParameters
|
|
|
.eq(FireParametersTag::getOriginId, fireParametersTag.getOriginId()));
|
|
|
|
|
|
//指标停用
|
|
|
- stopTag(fireParametersTag.getAppId(), fireParametersTag.getPlatformCode(), fireParametersTag.getTagCode());
|
|
|
+ stopTagCache(fireParametersTag.getAppId(), fireParametersTag.getPlatformCode(), fireParametersTag.getTagCode());
|
|
|
}
|
|
|
|
|
|
/**
|