补招逻辑修改,手动补招和前端采用websokcet通讯

This commit is contained in:
hzj
2025-08-28 13:49:45 +08:00
parent 9261adb79a
commit 11ee847cac
2 changed files with 157 additions and 90 deletions

View File

@@ -227,22 +227,47 @@ public class RecallWebSocketServer {
} }
} }
if(recallType ==3){ // 确定补招时间段
RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性大于98%,无需补招"); LocalDateTime toDayBeginTime,toDayEndTime;
if(tempTime.isAfter(param.getReCallEndTime())){
toDayBeginTime =currentDate;
toDayEndTime = param.getReCallEndTime();
}else {
toDayBeginTime =currentDate;
toDayEndTime = tempTime;
}
RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"开始补招");
sendInfo(JSONObject.toJSONString(recallReplyDTO)); sendInfo(JSONObject.toJSONString(recallReplyDTO));
currentDate = tempTime; //暂态补招
continue; RecallMessage.RecallDTO recallDTO2 = new RecallMessage.RecallDTO();
recallDTO2.setDataType("1");
recallDTO2.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
String tempTimeInterval = formatInterval(toDayBeginTime, toDayEndTime);
recallDTO2.setTimeInterval(Stream.of(tempTimeInterval).collect(Collectors.toList()));
recallDTO2.setNodeId(data2.getNodeId());
RecallMessage eventMessage = new RecallMessage();
eventMessage.setNodeId(data2.getNodeId());
eventMessage.setData(Stream.of(recallDTO2).collect(Collectors.toList()));
String guid1 = IdUtil.simpleUUID();
eventMessage.setGuid(guid1);
produceFeignClient.recall(eventMessage);
if(recallType ==3){
}else if(recallType ==2){ }else if(recallType ==2){
Integer timeInterval = data.getTimeInterval(); Integer timeInterval = data.getTimeInterval();
List<LocalDateTime> localDateTimeList = generateTimeIntervals( currentDate.toLocalDate(), timeInterval); List<LocalDateTime> localDateTimeList = generateTimeIntervals( toDayBeginTime,toDayBeginTime, timeInterval);
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN)); List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN));
localDateTimeList.removeAll(data1); localDateTimeList.removeAll(data1);
if(!CollectionUtils.isEmpty(localDateTimeList)){ if(!CollectionUtils.isEmpty(localDateTimeList)){
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval); List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
// //
recallDTO.setDataType(""); recallDTO.setDataType("0");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
recallDTO.setTimeInterval(timePeriod); recallDTO.setTimeInterval(timePeriod);
recallDTO.setNodeId(data2.getNodeId()); recallDTO.setNodeId(data2.getNodeId());
@@ -252,16 +277,114 @@ public class RecallWebSocketServer {
String guid = IdUtil.simpleUUID(); String guid = IdUtil.simpleUUID();
recallMessage.setGuid(guid); recallMessage.setGuid(guid);
produceFeignClient.recall(recallMessage); produceFeignClient.recall(recallMessage);
RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在80~98%之间,进行补招"); // boolean flag =true;
// LocalDateTime beginTaskTime = LocalDateTime.now();
// while (flag){
// if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){
// key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
//
// String jsonString =redisUtil.getStringByKey(key);
// if(Objects.nonNull(jsonString)){
// TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
// redisUtil.delete(key);
// flag =false;
// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult());
//
// sendInfo(JSONObject.toJSONString(recallReplyDTO2));
//
// BaseParam baseParam = new BaseParam();
// baseParam.setFullChain(false);
// baseParam.setRepair(false);
// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
// bsParmList.add(baseParam);
// }
// }else {
// flag =false;
// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时");
//
// sendInfo(JSONObject.toJSONString(recallReplyDTO2));
//
// BaseParam baseParam = new BaseParam();
// baseParam.setFullChain(false);
// baseParam.setRepair(false);
// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
// bsParmList.add(baseParam);
// }
//
// }
sendInfo(JSONObject.toJSONString(recallReplyDTO)); }
}else {
//暂态补招
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
//不设置dataType暂态稳态全部补招
recallDTO.setDataType("0");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
String eventTime = formatInterval(toDayBeginTime,toDayEndTime);
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
recallDTO.setNodeId(data2.getNodeId());
RecallMessage recallMessage = new RecallMessage();
recallMessage.setNodeId(data2.getNodeId());
recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList()));
String guid = IdUtil.simpleUUID();
recallMessage.setGuid(guid);
produceFeignClient.recall(recallMessage);
//
//
// boolean flag =true;
// LocalDateTime beginTaskTime = LocalDateTime.now();
// while (flag){
// if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){
// key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
//
// String jsonString =redisUtil.getStringByKey(key);
// if(Objects.nonNull(jsonString)){
// TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
// redisUtil.delete(key);
//
// flag =false;
// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult());
//
// sendInfo(JSONObject.toJSONString(recallReplyDTO2));
//
// BaseParam baseParam = new BaseParam();
// baseParam.setFullChain(false);
// baseParam.setRepair(false);
// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
// bsParmList.add(baseParam);
// }
// }else {
// flag =false;
// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时");
//
// sendInfo(JSONObject.toJSONString(recallReplyDTO2));
// BaseParam baseParam = new BaseParam();
// baseParam.setFullChain(false);
// baseParam.setRepair(false);
// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
// bsParmList.add(baseParam);
// }
//
// }
}
boolean flag =true; boolean flag =true;
LocalDateTime beginTaskTime = LocalDateTime.now(); LocalDateTime beginTaskTime = LocalDateTime.now();
while (flag){ while (flag){
if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){ if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=10){
key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
String jsonString =redisUtil.getStringByKey(key); String jsonString =redisUtil.getStringByKey(key);
@@ -297,76 +420,6 @@ public class RecallWebSocketServer {
} }
} }
}
}else {
//暂态补招
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
//不设置dataType暂态稳态全部补招
recallDTO.setDataType("");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
if(tempTime.isAfter(param.getReCallEndTime())){
String eventTime = formatInterval(currentDate,param.getReCallEndTime());
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
recallDTO.setNodeId(data2.getNodeId());
}else {
String eventTime = formatInterval(currentDate,tempTime);
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
recallDTO.setNodeId(data2.getNodeId());
}
RecallMessage recallMessage = new RecallMessage();
recallMessage.setNodeId(data2.getNodeId());
recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList()));
String guid = IdUtil.simpleUUID();
recallMessage.setGuid(guid);
produceFeignClient.recall(recallMessage);
RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在小于80%,进行补招");
sendInfo(JSONObject.toJSONString(recallReplyDTO));
boolean flag =true;
LocalDateTime beginTaskTime = LocalDateTime.now();
while (flag){
if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){
key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
String jsonString =redisUtil.getStringByKey(key);
if(Objects.nonNull(jsonString)){
TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
redisUtil.delete(key);
flag =false;
RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult());
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
BaseParam baseParam = new BaseParam();
baseParam.setFullChain(false);
baseParam.setRepair(false);
baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
bsParmList.add(baseParam);
}
}else {
flag =false;
RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时");
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
BaseParam baseParam = new BaseParam();
baseParam.setFullChain(false);
baseParam.setRepair(false);
baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
bsParmList.add(baseParam);
}
}
}
currentDate = tempTime; currentDate = tempTime;
} }
@@ -400,6 +453,20 @@ public class RecallWebSocketServer {
// Create the ending LocalDateTime // Create the ending LocalDateTime
LocalDateTime endDateTime = LocalDateTime.of(date, LocalTime.MAX); LocalDateTime endDateTime = LocalDateTime.of(date, LocalTime.MAX);
// Generate LocalDateTime list with the given interval
LocalDateTime currentDateTime = startDateTime;
while (!currentDateTime.isAfter(endDateTime)) {
dateTimeList.add(currentDateTime);
currentDateTime = currentDateTime.plusMinutes(intervalMinutes);
}
return dateTimeList;
}
public List<LocalDateTime> generateTimeIntervals(LocalDateTime startDateTime,LocalDateTime endDateTime , int intervalMinutes) {
List<LocalDateTime> dateTimeList = new ArrayList<>();
// Generate LocalDateTime list with the given interval // Generate LocalDateTime list with the given interval
LocalDateTime currentDateTime = startDateTime; LocalDateTime currentDateTime = startDateTime;
while (!currentDateTime.isAfter(endDateTime)) { while (!currentDateTime.isAfter(endDateTime)) {

View File

@@ -84,7 +84,7 @@ public class TopicReplyConsumer extends EnhanceConsumerMessageHandler<TopicReply
protected void handleMessage(TopicReplyDTO message) { protected void handleMessage(TopicReplyDTO message) {
//“12345”补招回复 //“12345”补招回复
if(Objects.equals(message.getGuid(),"12345")){ if(Objects.equals(message.getGuid(),"12345")){
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getLineIndex().concat(message.getRecallStartDate().toLocalDate().format(DatePattern.NORM_DATE_FORMATTER))),JSONObject.toJSONString(message),60*60L); redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getLineIndex().concat(message.getRecallStartDate().toLocalDate().format(DatePattern.NORM_DATE_FORMATTER))),JSONObject.toJSONString(message),1*60L);
}else { }else {
//业务处理 //业务处理
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getGuid()),JSONObject.toJSONString(message),60*60L); redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getGuid()),JSONObject.toJSONString(message),60*60L);