diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java index b4653bb..86c1c38 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java @@ -227,22 +227,47 @@ public class RecallWebSocketServer { } } - if(recallType ==3){ - RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性大于98%,无需补招"); + // 确定补招时间段 + LocalDateTime toDayBeginTime,toDayEndTime; + if(tempTime.isAfter(param.getReCallEndTime())){ + toDayBeginTime =currentDate; + toDayEndTime = param.getReCallEndTime(); + + }else { + toDayBeginTime =currentDate; + toDayEndTime = tempTime; + + } + RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"开始补招"); + + sendInfo(JSONObject.toJSONString(recallReplyDTO)); + //暂态补招 + RecallMessage.RecallDTO recallDTO2 = new RecallMessage.RecallDTO(); + recallDTO2.setDataType("1"); + recallDTO2.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + String tempTimeInterval = formatInterval(toDayBeginTime, toDayEndTime); + recallDTO2.setTimeInterval(Stream.of(tempTimeInterval).collect(Collectors.toList())); + recallDTO2.setNodeId(data2.getNodeId()); + + RecallMessage eventMessage = new RecallMessage(); + eventMessage.setNodeId(data2.getNodeId()); + eventMessage.setData(Stream.of(recallDTO2).collect(Collectors.toList())); + String guid1 = IdUtil.simpleUUID(); + eventMessage.setGuid(guid1); + produceFeignClient.recall(eventMessage); + + if(recallType ==3){ - sendInfo(JSONObject.toJSONString(recallReplyDTO)); - currentDate = tempTime; - continue; }else if(recallType ==2){ Integer timeInterval = data.getTimeInterval(); - List localDateTimeList = generateTimeIntervals( currentDate.toLocalDate(), timeInterval); + List localDateTimeList = generateTimeIntervals( toDayBeginTime,toDayBeginTime, timeInterval); List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN)); localDateTimeList.removeAll(data1); if(!CollectionUtils.isEmpty(localDateTimeList)){ List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval); RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); // - recallDTO.setDataType(""); + recallDTO.setDataType("0"); recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); recallDTO.setTimeInterval(timePeriod); recallDTO.setNodeId(data2.getNodeId()); @@ -252,51 +277,46 @@ public class RecallWebSocketServer { String guid = IdUtil.simpleUUID(); recallMessage.setGuid(guid); - - produceFeignClient.recall(recallMessage); - RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在80~98%之间,进行补招"); - - sendInfo(JSONObject.toJSONString(recallReplyDTO)); - boolean flag =true; - LocalDateTime beginTaskTime = LocalDateTime.now(); - while (flag){ - if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){ - key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); - - String jsonString =redisUtil.getStringByKey(key); - if(Objects.nonNull(jsonString)){ - TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true); - redisUtil.delete(key); - flag =false; - RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult()); - - sendInfo(JSONObject.toJSONString(recallReplyDTO2)); - - BaseParam baseParam = new BaseParam(); - baseParam.setFullChain(false); - baseParam.setRepair(false); - baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); - baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); - baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); - bsParmList.add(baseParam); - } - }else { - flag =false; - RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时"); - - sendInfo(JSONObject.toJSONString(recallReplyDTO2)); - - BaseParam baseParam = new BaseParam(); - baseParam.setFullChain(false); - baseParam.setRepair(false); - baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); - baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); - baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); - bsParmList.add(baseParam); - } - - } +// boolean flag =true; +// LocalDateTime beginTaskTime = LocalDateTime.now(); +// while (flag){ +// if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){ +// key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); +// +// String jsonString =redisUtil.getStringByKey(key); +// if(Objects.nonNull(jsonString)){ +// TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true); +// redisUtil.delete(key); +// flag =false; +// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult()); +// +// sendInfo(JSONObject.toJSONString(recallReplyDTO2)); +// +// BaseParam baseParam = new BaseParam(); +// baseParam.setFullChain(false); +// baseParam.setRepair(false); +// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); +// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); +// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); +// bsParmList.add(baseParam); +// } +// }else { +// flag =false; +// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时"); +// +// sendInfo(JSONObject.toJSONString(recallReplyDTO2)); +// +// BaseParam baseParam = new BaseParam(); +// baseParam.setFullChain(false); +// baseParam.setRepair(false); +// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); +// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); +// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); +// bsParmList.add(baseParam); +// } +// +// } } @@ -304,56 +324,78 @@ public class RecallWebSocketServer { //暂态补招 RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); //不设置dataType暂态稳态全部补招 - recallDTO.setDataType(""); + recallDTO.setDataType("0"); recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); - if(tempTime.isAfter(param.getReCallEndTime())){ - String eventTime = formatInterval(currentDate,param.getReCallEndTime()); - recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList())); - recallDTO.setNodeId(data2.getNodeId()); - }else { - String eventTime = formatInterval(currentDate,tempTime); - recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList())); - recallDTO.setNodeId(data2.getNodeId()); - } + + String eventTime = formatInterval(toDayBeginTime,toDayEndTime); + recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList())); + recallDTO.setNodeId(data2.getNodeId()); + RecallMessage recallMessage = new RecallMessage(); recallMessage.setNodeId(data2.getNodeId()); recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList())); String guid = IdUtil.simpleUUID(); recallMessage.setGuid(guid); produceFeignClient.recall(recallMessage); - RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在小于80%,进行补招"); +// +// +// boolean flag =true; +// LocalDateTime beginTaskTime = LocalDateTime.now(); +// while (flag){ +// if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){ +// key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); +// +// String jsonString =redisUtil.getStringByKey(key); +// if(Objects.nonNull(jsonString)){ +// TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true); +// redisUtil.delete(key); +// +// flag =false; +// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult()); +// +// sendInfo(JSONObject.toJSONString(recallReplyDTO2)); +// +// BaseParam baseParam = new BaseParam(); +// baseParam.setFullChain(false); +// baseParam.setRepair(false); +// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); +// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); +// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); +// bsParmList.add(baseParam); +// } +// }else { +// flag =false; +// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时"); +// +// sendInfo(JSONObject.toJSONString(recallReplyDTO2)); +// BaseParam baseParam = new BaseParam(); +// baseParam.setFullChain(false); +// baseParam.setRepair(false); +// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); +// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); +// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); +// bsParmList.add(baseParam); +// } +// +// } - sendInfo(JSONObject.toJSONString(recallReplyDTO)); - boolean flag =true; - LocalDateTime beginTaskTime = LocalDateTime.now(); - while (flag){ - if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){ - key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); + } + boolean flag =true; + LocalDateTime beginTaskTime = LocalDateTime.now(); + while (flag){ + if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=10){ + key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); - String jsonString =redisUtil.getStringByKey(key); - if(Objects.nonNull(jsonString)){ - TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true); - redisUtil.delete(key); - - flag =false; - RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult()); - - sendInfo(JSONObject.toJSONString(recallReplyDTO2)); - - BaseParam baseParam = new BaseParam(); - baseParam.setFullChain(false); - baseParam.setRepair(false); - baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); - baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); - baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); - bsParmList.add(baseParam); - } - }else { + String jsonString =redisUtil.getStringByKey(key); + if(Objects.nonNull(jsonString)){ + TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true); + redisUtil.delete(key); flag =false; - RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时"); + RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult()); sendInfo(JSONObject.toJSONString(recallReplyDTO2)); + BaseParam baseParam = new BaseParam(); baseParam.setFullChain(false); baseParam.setRepair(false); @@ -362,10 +404,21 @@ public class RecallWebSocketServer { baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); bsParmList.add(baseParam); } + }else { + flag =false; + RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时"); + sendInfo(JSONObject.toJSONString(recallReplyDTO2)); + + BaseParam baseParam = new BaseParam(); + baseParam.setFullChain(false); + baseParam.setRepair(false); + baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); + baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); + baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); + bsParmList.add(baseParam); } - } currentDate = tempTime; @@ -400,6 +453,20 @@ public class RecallWebSocketServer { // Create the ending LocalDateTime LocalDateTime endDateTime = LocalDateTime.of(date, LocalTime.MAX); + // Generate LocalDateTime list with the given interval + LocalDateTime currentDateTime = startDateTime; + while (!currentDateTime.isAfter(endDateTime)) { + dateTimeList.add(currentDateTime); + currentDateTime = currentDateTime.plusMinutes(intervalMinutes); + } + + return dateTimeList; + } + public List generateTimeIntervals(LocalDateTime startDateTime,LocalDateTime endDateTime , int intervalMinutes) { + List dateTimeList = new ArrayList<>(); + + + // Generate LocalDateTime list with the given interval LocalDateTime currentDateTime = startDateTime; while (!currentDateTime.isAfter(endDateTime)) { diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java index 2eaecf2..7e3715b 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java @@ -84,7 +84,7 @@ public class TopicReplyConsumer extends EnhanceConsumerMessageHandler