diff --git a/data-processing/data-processing-boot/pom.xml b/data-processing/data-processing-boot/pom.xml index 1ca99a0..f59461a 100644 --- a/data-processing/data-processing-boot/pom.xml +++ b/data-processing/data-processing-boot/pom.xml @@ -79,6 +79,17 @@ 1.0.0 compile + + org.springframework.boot + spring-boot-starter-websocket + 2.7.12 + + + com.njcn.platform + algorithm-api + 1.0.0 + compile + diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java index bbd8f70..e1ea177 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java @@ -1,5 +1,6 @@ package com.njcn.dataProcess.controller;//package com.njcn.message.websocket; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.util.IdUtil; @@ -10,8 +11,12 @@ import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; +import com.njcn.dataProcess.annotation.InsertBean; import com.njcn.dataProcess.annotation.QueryBean; import com.njcn.dataProcess.param.FullRecallMessage; +import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.pojo.dto.DataIntegrityDto; +import com.njcn.dataProcess.service.IDataIntegrity; import com.njcn.dataProcess.service.IDataV; import com.njcn.device.biz.commApi.CommTerminalGeneralClient; import com.njcn.device.biz.pojo.dto.LineDevGetDTO; @@ -57,13 +62,14 @@ public class DataRecallController extends BaseController { private final ProduceFeignClient produceFeignClient; private final DeviceFeignClient deviceFeignClient; - + @InsertBean + private IDataIntegrity iDataIntegrityInsert; //页面补招按时间段带小时的全部补招不查datav数据去筛选 @PostMapping("/FullRecall") @OperateInfo(info = LogEnum.BUSINESS_COMMON) @ApiOperation("数据全量补招") @ApiImplicitParam(name = "param", value = "参数", required = true) - public HttpResult> recall(@RequestBody FullRecallMessage param) { + public HttpResult> FullRecall(@RequestBody FullRecallMessage param) { String methodDescribe = getMethodDescribe("recall"); List guidList = new ArrayList<>(); List runMonitorIds = new ArrayList<>(); @@ -80,16 +86,185 @@ public class DataRecallController extends BaseController { }else { runMonitorIds = param.getMonitorId(); } + LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); + lineParam.setLineId(runMonitorIds); + //查看监测点的数据完整性 + lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER)); + lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER)); + List rawData = iDataIntegrityInsert.getRawData(lineParam); + Map>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId))); + runMonitorIds.forEach(temp->{ LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData(); //后续根据不同前置下的测点发送补招密令 DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData(); + + LocalDateTime currentDate = param.getReCallStartTime(); + //循环每一天 + while (!currentDate.isAfter(param.getReCallEndTime())) { + + + LocalDateTime tempTime = currentDate.toLocalDate().plusDays(1).atTime(0,0,0); + //查看数据完整性,根据数据完整性进行补招; + //校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招 + Integer recallType = 1; //1全量补 2:差量补 3:不需要补招 + String curDateString = LocalDateTimeUtil.format(currentDate.toLocalDate().atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER); + if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){ + DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0); + if( dto.getDueTime()!=0){ + int i = (dto.getRealTime()) / dto.getDueTime(); + if( i>=0.98){ + recallType=3; + } else if(i<0.98&&i>0.8){ + recallType=2; + } + } + + } + if(recallType ==3){ + currentDate = tempTime; + continue; + }else if(recallType ==2){ + Integer timeInterval = data.getTimeInterval(); + List localDateTimeList = generateTimeIntervals( currentDate.toLocalDate(), timeInterval); + List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN)); + localDateTimeList.removeAll(data1); + if(!CollectionUtils.isEmpty(localDateTimeList)){ + List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval); + //最大时间段为300 + if(timePeriod.size()<300){ + RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + recallDTO.setDataType(""); + recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + recallDTO.setTimeInterval(timePeriod); + recallDTO.setNodeId(data2.getNodeId()); + recallDTOList.add(recallDTO); + }else { + List> timePeriods = CollUtil.split(timePeriod, 300); + timePeriods.forEach(period->{ + RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + recallDTO.setDataType(""); + recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + recallDTO.setTimeInterval(period); + recallDTO.setNodeId(data2.getNodeId()); + recallDTOList.add(recallDTO); + }); + } + + + } + + }else { + //暂态补招 + RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + //不设置dataType暂态稳态全部补招 + recallDTO.setDataType(""); + recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + if(tempTime.isAfter(param.getReCallEndTime())){ + String eventTime = formatInterval(currentDate,param.getReCallEndTime()); + recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList())); + recallDTO.setNodeId(data2.getNodeId()); + recallDTOList.add(recallDTO); + }else { + String eventTime = formatInterval(currentDate,tempTime); + recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList())); + recallDTO.setNodeId(data2.getNodeId()); + recallDTOList.add(recallDTO); + } + + } + currentDate = tempTime; + + + + } + + + }); + if(!CollectionUtils.isEmpty(recallDTOList)){ + Map> collect = recallDTOList.stream().collect(Collectors.groupingBy(RecallMessage.RecallDTO::getNodeId)); + + collect.forEach((k,v)->{ + RecallMessage message = new RecallMessage(); + message.setNodeId(k); + message.setData(v); + String guid = IdUtil.simpleUUID(); + message.setGuid(guid); + + + + produceFeignClient.recall(message); + guidList.add(guid); + }); + + + } + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, guidList, methodDescribe); + } + + @PostMapping("/fullHourRecall") + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("按小时数据补招") + @ApiImplicitParam(name = "param", value = "参数", required = true) + public HttpResult> fullHourRecall(@RequestBody FullRecallMessage param){ + String methodDescribe = getMethodDescribe("recall"); + List guidList = new ArrayList<>(); + List runMonitorIds = new ArrayList<>(); + List recallDTOList = new ArrayList<>(); +// Duration duration = Duration.between(param.getReCallStartTime(), param.getReCallEndTime()); +// // 获取剩余的分钟数,如果要精确到分钟可以这样做 +// long minutes = duration.toMinutes() ; +// if(minutes>60){ +// throw new BusinessException("全量补招时间区间不能超过一个小时"); +// +// } + if(CollectionUtils.isEmpty(param.getMonitorId())){ + runMonitorIds = commTerminalGeneralClient.getRunMonitorIds().getData(); + }else { + runMonitorIds = param.getMonitorId(); + } + + runMonitorIds.forEach(temp->{ + LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData(); + //后续根据不同前置下的测点发送补招密令 + DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData(); + Integer timeInterval = data.getTimeInterval(); + + List localDateTimeList = generateTimeIntervals(param.getReCallStartTime(),param.getReCallEndTime(), timeInterval); + List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(param.getReCallStartTime().toLocalDate(), DatePattern.NORM_DATE_PATTERN)); + List tempTime = data1.stream().filter(dateTime -> (!dateTime.isAfter(param.getReCallEndTime())) && + (!dateTime.isBefore(param.getReCallStartTime()))).collect(Collectors.toList()); + localDateTimeList.removeAll(tempTime); + if(!CollectionUtils.isEmpty(localDateTimeList)){ + List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval); + //最大时间段为300 + if(timePeriod.size()<300){ + RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + recallDTO.setDataType("0"); + recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + recallDTO.setTimeInterval(timePeriod); + recallDTO.setNodeId(data2.getNodeId()); + recallDTOList.add(recallDTO); + }else { + List> timePeriods = CollUtil.split(timePeriod, 300); + timePeriods.forEach(period->{ + RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + recallDTO.setDataType("0"); + recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + recallDTO.setTimeInterval(period); + recallDTO.setNodeId(data2.getNodeId()); + recallDTOList.add(recallDTO); + }); + } + + + } //暂态补招 RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); //不设置dataType暂态稳态全部补招 - recallDTO.setDataType(""); + recallDTO.setDataType("1"); recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); String eventTime = formatInterval(param.getReCallStartTime(),param.getReCallEndTime()); recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList())); @@ -117,7 +292,6 @@ public class DataRecallController extends BaseController { } - @PostMapping("/recall") @OperateInfo(info = LogEnum.BUSINESS_COMMON) @ApiOperation("数据补招") @@ -134,6 +308,14 @@ public class DataRecallController extends BaseController { }else { runMonitorIds = param.getMonitorId(); } + LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); + lineParam.setLineId(runMonitorIds); + //查看监测点的数据完整性 + lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER)); + lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER)); + List rawData = iDataIntegrityInsert.getRawData(lineParam); + Map>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId))); + LocalDate currentDate = param.getReCallStartTime(); //循环每一天 while (!currentDate.isAfter(param.getReCallEndTime())) { @@ -143,19 +325,72 @@ public class DataRecallController extends BaseController { //后续根据不同前置下的测点发送补招密令 DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData(); - Integer timeInterval = data.getTimeInterval(); - List localDateTimeList = generateTimeIntervals(finalCurrentDate, timeInterval); - List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(finalCurrentDate, DatePattern.NORM_DATE_PATTERN)); - localDateTimeList.removeAll(data1); - if(!CollectionUtils.isEmpty(localDateTimeList)){ - List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval); + //查看数据完整性,根据数据完整性进行补招; + //校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招 + Integer recallType = 1; //1全量补 2:差量补 3:不需要补招 + String curDateString = LocalDateTimeUtil.format(finalCurrentDate.atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER); + if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){ + DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0); + if( dto.getDueTime()!=0){ + int i = (dto.getRealTime()) / dto.getDueTime(); + if( i>=0.98){ + recallType=3; + } else if(i<0.98&&i>0.8){ + recallType=2; + } + } + + } + + if(recallType ==3){ + return; + }else if(recallType ==2){ + Integer timeInterval = data.getTimeInterval(); + List localDateTimeList = generateTimeIntervals(finalCurrentDate, timeInterval); + List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(finalCurrentDate, DatePattern.NORM_DATE_PATTERN)); + localDateTimeList.removeAll(data1); + if(!CollectionUtils.isEmpty(localDateTimeList)){ + List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval); + //最大时间段为300 + if(timePeriod.size()<300){ + RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + recallDTO.setDataType("0"); + recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + recallDTO.setTimeInterval(timePeriod); + recallDTO.setNodeId(data2.getNodeId()); + recallDTOList.add(recallDTO); + }else { + List> timePeriods = CollUtil.split(timePeriod, 300); + timePeriods.forEach(period->{ + RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + recallDTO.setDataType("0"); + recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + recallDTO.setTimeInterval(period); + recallDTO.setNodeId(data2.getNodeId()); + recallDTOList.add(recallDTO); + }); + } + + + } + + }else { + //暂态补招 RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + //不设置dataType暂态稳态全部补招 recallDTO.setDataType("0"); recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); - recallDTO.setTimeInterval(timePeriod); + + String eventTime = formatInterval(finalCurrentDate.atTime(0,0,0),finalCurrentDate.atTime(23,59,0)); + recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList())); recallDTO.setNodeId(data2.getNodeId()); recallDTOList.add(recallDTO); + + } + + + //暂态补招 RecallMessage.RecallDTO recallDTO2 = new RecallMessage.RecallDTO(); recallDTO2.setDataType("1"); @@ -177,10 +412,10 @@ public class DataRecallController extends BaseController { RecallMessage message = new RecallMessage(); message.setNodeId(k); message.setData(v); - String guid = IdUtil.simpleUUID(); - message.setGuid(guid); + String guid = IdUtil.simpleUUID(); + message.setGuid(guid); produceFeignClient.recall(message); - guidList.add(guid); + guidList.add(guid); }); @@ -225,6 +460,22 @@ public class DataRecallController extends BaseController { return dateTimeList; } + public List generateTimeIntervals(LocalDateTime startDateTime,LocalDateTime endDateTime , int intervalMinutes) { + List dateTimeList = new ArrayList<>(); + + + + // Generate LocalDateTime list with the given interval + LocalDateTime currentDateTime = startDateTime; + while (!currentDateTime.isAfter(endDateTime)) { + dateTimeList.add(currentDateTime); + currentDateTime = currentDateTime.plusMinutes(intervalMinutes); + } + + return dateTimeList; + } + + public static List mergeTimeIntervals(List times, int intervalMinutes) { List mergedIntervals = new ArrayList<>(); if (times == null || times.isEmpty()) { @@ -259,10 +510,18 @@ public class DataRecallController extends BaseController { } public static void main(String[] args) { - // 创建两个LocalDateTime对象 - LocalDateTime dateTime1 = LocalDateTime.of(2023, 1, 1, 10, 0); // 例如:2023年1月1日 10:00 - LocalDateTime dateTime2 = LocalDateTime.of(2023, 1, 2, 15, 30); // 例如:2023年1月1日 15:30 + LocalDate localDate1 = LocalDate.now(); + LocalDate localDate2 = LocalDate.now(); + while (!localDate1.isAfter(localDate2)) { + LocalDate finalCurrentDate = localDate1; + System.out.println(finalCurrentDate); + localDate1 = localDate1.plusDays(1); + } + + // 创建两个LocalDateTime对象 + LocalDateTime dateTime1 = LocalDateTime.of(2023, 1, 1, 10, 0,11); // 例如:2023年1月1日 10:00 + LocalDateTime dateTime2 = LocalDateTime.of(2023, 1, 1, 10, 30,22); // 例如:2023年1月1日 15:30 // 使用Duration计算两个时间之间的差异 Duration duration = Duration.between(dateTime1, dateTime2); diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java new file mode 100644 index 0000000..b978dde --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java @@ -0,0 +1,493 @@ +package com.njcn.dataProcess.websocket; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.LocalDateTimeUtil; +import cn.hutool.core.util.IdUtil; +import cn.hutool.json.JSONUtil; +import com.alibaba.nacos.shaded.com.google.gson.JsonObject; +import com.njcn.algorithm.pojo.bo.BaseParam; +import com.njcn.algorithm.pojo.liteflow.LiteFlowAlgorithmFeignClient; +import com.njcn.dataProcess.annotation.InsertBean; +import com.njcn.dataProcess.annotation.QueryBean; +import com.njcn.dataProcess.param.FullRecallMessage; +import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.pojo.dto.DataIntegrityDto; +import com.njcn.dataProcess.service.IDataIntegrity; +import com.njcn.dataProcess.service.IDataV; +import com.njcn.device.biz.commApi.CommTerminalGeneralClient; +import com.njcn.device.biz.pojo.dto.LineDevGetDTO; +import com.njcn.device.pq.api.DeviceFeignClient; +import com.njcn.device.pq.pojo.dto.DeviceDTO; +import com.njcn.message.api.ProduceFeignClient; +import com.njcn.message.constant.RedisKeyPrefix; +import com.njcn.message.message.RecallMessage; +import com.njcn.message.messagedto.TopicReplyDTO; +import com.njcn.redis.utils.RedisUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Description: + * Date: 2024/12/13 15:11【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Slf4j +@Component +@ServerEndpoint(value ="/api/recell/{userId}") +public class RecallWebSocketServer { + @QueryBean + private static IDataV dataVQuery; + + @Autowired + @Qualifier("InfluxdbDataVImpl") + public void setDataVQuery( IDataV dataVQuery) { + RecallWebSocketServer.dataVQuery = dataVQuery; + } + private static CommTerminalGeneralClient commTerminalGeneralClient; + + @Autowired + public void setCommTerminalGeneralClient(CommTerminalGeneralClient commTerminalGeneralClient) { + RecallWebSocketServer.commTerminalGeneralClient = commTerminalGeneralClient; + } + private static ProduceFeignClient produceFeignClient; + + @Autowired + public void setProduceFeignClient(ProduceFeignClient produceFeignClient) { + RecallWebSocketServer.produceFeignClient = produceFeignClient; + } + + @InsertBean + private static IDataIntegrity iDataIntegrityInsert; + + @Autowired + @Qualifier("RelationDataIntegrityImpl") + public void setiDataIntegrityInsert(IDataIntegrity iDataIntegrityInsert) { + RecallWebSocketServer.iDataIntegrityInsert = iDataIntegrityInsert; + } + + private static LiteFlowAlgorithmFeignClient liteFlowAlgorithmFeignClient; + + @Autowired + public void setLiteFlowAlgorithmFeignClient(LiteFlowAlgorithmFeignClient liteFlowAlgorithmFeignClient) { + RecallWebSocketServer.liteFlowAlgorithmFeignClient = liteFlowAlgorithmFeignClient; + } + + private static RedisUtil redisUtil; + + + + @Autowired + public void setRedisUtil( RedisUtil redisUtil) { + RecallWebSocketServer.redisUtil = redisUtil; + } + + + private static DeviceFeignClient deviceFeignClient; + + // 注入的时候,给类的 service 注入 + @Autowired + public void setLineFeignClient(DeviceFeignClient deviceFeignClient) { + RecallWebSocketServer.deviceFeignClient = deviceFeignClient; + } + /** + * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 + */ + private static int onlineCount = 0; + /** + * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。 + */ + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + /** + * 与某个客户端的连接会话,需要通过它来给客户端发送数据 + */ + private Session session; + /** + * 接收userId + */ + private String userId = ""; + + /** + * 连接建立成 + * 功调用的方法 + */ + @OnOpen + public void onOpen(Session session, @PathParam("userId") String userId) { + //lineId 是 userid+","+lineId+","+Devid + this.session = session; + this.userId = userId; + if (webSocketMap.containsKey(userId)) { + webSocketMap.remove(userId); + //加入set中 + webSocketMap.put(userId, this); + } else { + //加入set中 + webSocketMap.put(userId, this); + //在线数加1 + addOnlineCount(); + } + sendMessage("连接成功"); + + + } + + /** + * 连接关闭 + * 调用的方法 + */ + @OnClose + public void onClose() { + if (webSocketMap.containsKey(userId)) { + webSocketMap.remove(userId); + //从set中删除 + subOnlineCount(); + } + log.info("用户退出:" + userId + ",当前在线监测点数为:" + getOnlineCount()); + } + + /** + * 收到客户端消 + * 息后调用的方法 + * + * @param message 客户端发送过来的消息 + **/ + @OnMessage + public void onMessage(String message, Session session) { + //会每30s发送请求1次 + log.info("监测点消息:" + userId + ",报文:" + message); + FullRecallMessage param = JSONUtil.toBean(message,FullRecallMessage.class,true); + if(Objects.isNull(message)){ + sendInfo("参数有误"); + }else { + List runMonitorIds = param.getMonitorId(); + LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); + lineParam.setLineId(runMonitorIds); + //查看监测点的数据完整性 + lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER)); + lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER)); + List rawData = iDataIntegrityInsert.getRawData(lineParam); + Map>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId))); + List bsParmList = new ArrayList<>(); + runMonitorIds.forEach(temp->{ + LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData(); + //后续根据不同前置下的测点发送补招密令 + DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData(); + + + + LocalDateTime currentDate = param.getReCallStartTime(); + //循环每一天 + while (!currentDate.isAfter(param.getReCallEndTime())) { + + String key =""; + LocalDateTime tempTime = currentDate.toLocalDate().plusDays(1).atTime(0,0,0); + //查看数据完整性,根据数据完整性进行补招; + //校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招 + Integer recallType = 1; //1全量补 2:差量补 3:不需要补招 + String curDateString = LocalDateTimeUtil.format(currentDate.toLocalDate().atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER); + if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){ + DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0); + if( dto.getDueTime()!=0){ + int i = (dto.getRealTime()) / dto.getDueTime(); + if( i>=0.98){ + recallType=3; + } else if(i<0.98&&i>0.8){ + recallType=2; + } + } + + } + if(recallType ==3){ + currentDate = tempTime; + sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性大于98%,无需补招"); + continue; + }else if(recallType ==2){ + Integer timeInterval = data.getTimeInterval(); + List localDateTimeList = generateTimeIntervals( currentDate.toLocalDate(), timeInterval); + List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN)); + localDateTimeList.removeAll(data1); + if(!CollectionUtils.isEmpty(localDateTimeList)){ + List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval); + RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + // + recallDTO.setDataType(""); + recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + recallDTO.setTimeInterval(timePeriod); + recallDTO.setNodeId(data2.getNodeId()); + RecallMessage recallMessage = new RecallMessage(); + recallMessage.setNodeId(data2.getNodeId()); + recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList())); + String guid = IdUtil.simpleUUID(); + recallMessage.setGuid(guid); + + + + produceFeignClient.recall(recallMessage); + sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在80~98%之间,进行补招"); + boolean flag =true; + LocalDateTime beginTaskTime = LocalDateTime.now(); + while (flag){ + if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){ + key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); + + String jsonString =redisUtil.getStringByKey(key); + if(Objects.nonNull(jsonString)){ + TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true); + + flag =false; + sendInfo(bean.getResult()); + BaseParam baseParam = new BaseParam(); + baseParam.setFullChain(false); + baseParam.setRepair(false); + baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); + baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); + baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); + bsParmList.add(baseParam); + } + }else { + flag =false; + sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时"); + BaseParam baseParam = new BaseParam(); + baseParam.setFullChain(false); + baseParam.setRepair(false); + baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); + baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); + baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); + bsParmList.add(baseParam); + } + + } + + } + + }else { + //暂态补招 + RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + //不设置dataType暂态稳态全部补招 + recallDTO.setDataType(""); + recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + if(tempTime.isAfter(param.getReCallEndTime())){ + String eventTime = formatInterval(currentDate,param.getReCallEndTime()); + recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList())); + recallDTO.setNodeId(data2.getNodeId()); + }else { + String eventTime = formatInterval(currentDate,tempTime); + recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList())); + recallDTO.setNodeId(data2.getNodeId()); + } + RecallMessage recallMessage = new RecallMessage(); + recallMessage.setNodeId(data2.getNodeId()); + recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList())); + String guid = IdUtil.simpleUUID(); + recallMessage.setGuid(guid); + produceFeignClient.recall(recallMessage); + sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在小于80%,进行补招"); + + boolean flag =true; + LocalDateTime beginTaskTime = LocalDateTime.now(); + while (flag){ + if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){ + key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); + + String jsonString =redisUtil.getStringByKey(key); + if(Objects.nonNull(jsonString)){ + TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true); + + flag =false; + sendInfo(bean.getResult()); + BaseParam baseParam = new BaseParam(); + baseParam.setFullChain(false); + baseParam.setRepair(false); + baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); + baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); + baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); + bsParmList.add(baseParam); + } + }else { + flag =false; + sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时"); + BaseParam baseParam = new BaseParam(); + baseParam.setFullChain(false); + baseParam.setRepair(false); + baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER)); + baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet())); + baseParam.setIdList(Stream.of(temp).collect(Collectors.toList())); + bsParmList.add(baseParam); + } + + } + + + } + currentDate = tempTime; + + } + + + }); + if(!CollectionUtils.isEmpty(bsParmList)){ + sendInfo("开始执行数据完整性算法"); + bsParmList.forEach(temp->{ + liteFlowAlgorithmFeignClient.measurementPointExecutor(temp); + }); + sendInfo("执行完成"); + + } + + } + sendInfo("补招任务结束"); + } + public List generateTimeIntervals(LocalDate date, int intervalMinutes) { + List dateTimeList = new ArrayList<>(); + + // Create the starting LocalDateTime + LocalDateTime startDateTime = LocalDateTime.of(date, LocalTime.MIDNIGHT); + + // Create the ending LocalDateTime + LocalDateTime endDateTime = LocalDateTime.of(date, LocalTime.MAX); + + // Generate LocalDateTime list with the given interval + LocalDateTime currentDateTime = startDateTime; + while (!currentDateTime.isAfter(endDateTime)) { + dateTimeList.add(currentDateTime); + currentDateTime = currentDateTime.plusMinutes(intervalMinutes); + } + + return dateTimeList; + } + + public static List mergeTimeIntervals(List times, int intervalMinutes) { + List mergedIntervals = new ArrayList<>(); + if (times == null || times.isEmpty()) { + return mergedIntervals; + } + + // Sort the list to ensure the times are in order + times.sort(LocalDateTime::compareTo); + + LocalDateTime start = times.get(0); + LocalDateTime end = start; + + for (int i = 1; i < times.size(); i++) { + LocalDateTime current = times.get(i); + if (current.isAfter(end.plusMinutes(intervalMinutes))) { + // If the current time is more than interval minutes after the end, close the current interval + mergedIntervals.add(formatInterval(start, end)); + start = current; // Start a new interval + } + end = current; // Update the end of the current interval + } + + // Add the last interval + mergedIntervals.add(formatInterval(start, end)); + + return mergedIntervals; + } + + private static String formatInterval(LocalDateTime start, LocalDateTime end) { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + return start.format(formatter) + "~" + end.format(formatter); + } + + /** + * @param session + * @param error + */ + @OnError + public void onError(Session session, Throwable error) { + + log.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); + error.printStackTrace(); + } + + /** + * 实现服务 + * 器主动推送 + */ + public void sendMessage(String message) { + try { + this.session.getBasicRemote().sendText(message); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 发送自定 + * 义消息 + **/ + public static void sendInfo(String message) { + + + webSocketMap.forEach((k,v)->{ + webSocketMap.get(k).sendMessage(message); + + }); + + } + + /** + * 获得此时的 + * 在线监测点 + * + * @return + */ + public static synchronized int getOnlineCount() { + return onlineCount; + } + + /** + * 在线监测点 + * 数加1 + */ + public static synchronized void addOnlineCount() { + RecallWebSocketServer.onlineCount++; + } + + /** + * 在线监测点 + * 数减1 + */ + public static synchronized void subOnlineCount() { + RecallWebSocketServer.onlineCount--; + } + +// /** +// * 过滤所有键包含指定字符串的条目 +// * @param map 原始的Map +// * @param substring 要检查的子字符串 +// * @return 过滤的Map +// */ +// public static Map filterMapByKey(ConcurrentHashMap map, String substring) { +// Map result = new HashMap<>(); +// for (Map.Entry entry : map.entrySet()) { +// if (entry.getKey().contains(substring)) { +// result.put(entry.getKey(), entry.getValue().toString()); +// } +// } +// return result; +// } + +} \ No newline at end of file diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/WebSocketConfig.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/WebSocketConfig.java new file mode 100644 index 0000000..ad087e7 --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/WebSocketConfig.java @@ -0,0 +1,41 @@ +package com.njcn.dataProcess.websocket; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; +import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean; + +/** + * Description: + * Date: 2024/12/13 15:09【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Configuration +public class WebSocketConfig { + + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } + + /** + * 通信文本消息和二进制缓存区大小 + * 避免对接 第三方 报文过大时,Websocket 1009 错误 + * + * @return + */ + + @Bean + public ServletServerContainerFactoryBean createWebSocketContainer() { + ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); + // 在此处设置bufferSize + container.setMaxTextMessageBufferSize(10240000); + container.setMaxBinaryMessageBufferSize(10240000); + container.setMaxSessionIdleTimeout(15 * 60000L); + return container; + } + + +} diff --git a/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java b/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java index a6646d5..ff56082 100644 --- a/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java +++ b/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java @@ -1,9 +1,14 @@ package com.njcn.message.messagedto; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; import com.njcn.middle.rocket.domain.BaseMessage; import lombok.Data; import java.io.Serializable; +import java.time.LocalDate; +import java.time.LocalDateTime; /** * Description: @@ -14,11 +19,22 @@ import java.io.Serializable; */ @Data public class TopicReplyDTO extends BaseMessage implements Serializable { - //消息id + //消息id guid="12345"是补招回复结果 private String guid; private String step; - + //guid="12345"是补招回复结果 private String result; + private String lineIndex; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @JsonDeserialize(using = LocalDateTimeDeserializer.class) + private LocalDateTime recallStartDate; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @JsonDeserialize(using = LocalDateTimeDeserializer.class) + private LocalDateTime recallEndDate; + + + + } diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java index e6df7ce..4d9d033 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java @@ -64,19 +64,19 @@ public class DeviceRunFlagDataConsumer extends EnhanceConsumerMessageHandler