From f519fd3e568b899e2d082e9e3180e13249b2e4c1 Mon Sep 17 00:00:00 2001
From: hzj <826100833@qq.com>
Date: Fri, 22 Aug 2025 15:55:05 +0800
Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E6=8B=9B=E9=80=BB=E8=BE=91=E4=BF=AE?=
=?UTF-8?q?=E6=94=B9=EF=BC=8C=E6=89=8B=E5=8A=A8=E8=A1=A5=E6=8B=9B=E5=92=8C?=
=?UTF-8?q?=E5=89=8D=E7=AB=AF=E9=87=87=E7=94=A8websokcet=E9=80=9A=E8=AE=AF?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
data-processing/data-processing-boot/pom.xml | 11 +
.../controller/DataRecallController.java | 293 ++++++++++-
.../websocket/RecallWebSocketServer.java | 493 ++++++++++++++++++
.../websocket/WebSocketConfig.java | 41 ++
.../message/messagedto/TopicReplyDTO.java | 20 +-
.../consumer/DeviceRunFlagDataConsumer.java | 14 +-
.../consumer/RealTimeDataConsumer.java | 2 +-
.../message/consumer/TopicReplyConsumer.java | 14 +-
8 files changed, 857 insertions(+), 31 deletions(-)
create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java
create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/WebSocketConfig.java
diff --git a/data-processing/data-processing-boot/pom.xml b/data-processing/data-processing-boot/pom.xml
index 1ca99a0..f59461a 100644
--- a/data-processing/data-processing-boot/pom.xml
+++ b/data-processing/data-processing-boot/pom.xml
@@ -79,6 +79,17 @@
1.0.0
compile
+
+ org.springframework.boot
+ spring-boot-starter-websocket
+ 2.7.12
+
+
+ com.njcn.platform
+ algorithm-api
+ 1.0.0
+ compile
+
diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java
index bbd8f70..e1ea177 100644
--- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java
+++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java
@@ -1,5 +1,6 @@
package com.njcn.dataProcess.controller;//package com.njcn.message.websocket;
+import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil;
@@ -10,8 +11,12 @@ import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
+import com.njcn.dataProcess.annotation.InsertBean;
import com.njcn.dataProcess.annotation.QueryBean;
import com.njcn.dataProcess.param.FullRecallMessage;
+import com.njcn.dataProcess.param.LineCountEvaluateParam;
+import com.njcn.dataProcess.pojo.dto.DataIntegrityDto;
+import com.njcn.dataProcess.service.IDataIntegrity;
import com.njcn.dataProcess.service.IDataV;
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
@@ -57,13 +62,14 @@ public class DataRecallController extends BaseController {
private final ProduceFeignClient produceFeignClient;
private final DeviceFeignClient deviceFeignClient;
-
+ @InsertBean
+ private IDataIntegrity iDataIntegrityInsert;
//页面补招按时间段带小时的全部补招不查datav数据去筛选
@PostMapping("/FullRecall")
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("数据全量补招")
@ApiImplicitParam(name = "param", value = "参数", required = true)
- public HttpResult> recall(@RequestBody FullRecallMessage param) {
+ public HttpResult> FullRecall(@RequestBody FullRecallMessage param) {
String methodDescribe = getMethodDescribe("recall");
List guidList = new ArrayList<>();
List runMonitorIds = new ArrayList<>();
@@ -80,16 +86,185 @@ public class DataRecallController extends BaseController {
}else {
runMonitorIds = param.getMonitorId();
}
+ LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
+ lineParam.setLineId(runMonitorIds);
+ //查看监测点的数据完整性
+ lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER));
+ lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER));
+ List rawData = iDataIntegrityInsert.getRawData(lineParam);
+ Map>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId)));
+
runMonitorIds.forEach(temp->{
LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
//后续根据不同前置下的测点发送补招密令
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
+
+ LocalDateTime currentDate = param.getReCallStartTime();
+ //循环每一天
+ while (!currentDate.isAfter(param.getReCallEndTime())) {
+
+
+ LocalDateTime tempTime = currentDate.toLocalDate().plusDays(1).atTime(0,0,0);
+ //查看数据完整性,根据数据完整性进行补招;
+ //校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招
+ Integer recallType = 1; //1全量补 2:差量补 3:不需要补招
+ String curDateString = LocalDateTimeUtil.format(currentDate.toLocalDate().atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER);
+ if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){
+ DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0);
+ if( dto.getDueTime()!=0){
+ int i = (dto.getRealTime()) / dto.getDueTime();
+ if( i>=0.98){
+ recallType=3;
+ } else if(i<0.98&&i>0.8){
+ recallType=2;
+ }
+ }
+
+ }
+ if(recallType ==3){
+ currentDate = tempTime;
+ continue;
+ }else if(recallType ==2){
+ Integer timeInterval = data.getTimeInterval();
+ List localDateTimeList = generateTimeIntervals( currentDate.toLocalDate(), timeInterval);
+ List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN));
+ localDateTimeList.removeAll(data1);
+ if(!CollectionUtils.isEmpty(localDateTimeList)){
+ List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
+ //最大时间段为300
+ if(timePeriod.size()<300){
+ RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
+ recallDTO.setDataType("");
+ recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
+ recallDTO.setTimeInterval(timePeriod);
+ recallDTO.setNodeId(data2.getNodeId());
+ recallDTOList.add(recallDTO);
+ }else {
+ List> timePeriods = CollUtil.split(timePeriod, 300);
+ timePeriods.forEach(period->{
+ RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
+ recallDTO.setDataType("");
+ recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
+ recallDTO.setTimeInterval(period);
+ recallDTO.setNodeId(data2.getNodeId());
+ recallDTOList.add(recallDTO);
+ });
+ }
+
+
+ }
+
+ }else {
+ //暂态补招
+ RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
+ //不设置dataType暂态稳态全部补招
+ recallDTO.setDataType("");
+ recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
+ if(tempTime.isAfter(param.getReCallEndTime())){
+ String eventTime = formatInterval(currentDate,param.getReCallEndTime());
+ recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
+ recallDTO.setNodeId(data2.getNodeId());
+ recallDTOList.add(recallDTO);
+ }else {
+ String eventTime = formatInterval(currentDate,tempTime);
+ recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
+ recallDTO.setNodeId(data2.getNodeId());
+ recallDTOList.add(recallDTO);
+ }
+
+ }
+ currentDate = tempTime;
+
+
+
+ }
+
+
+ });
+ if(!CollectionUtils.isEmpty(recallDTOList)){
+ Map> collect = recallDTOList.stream().collect(Collectors.groupingBy(RecallMessage.RecallDTO::getNodeId));
+
+ collect.forEach((k,v)->{
+ RecallMessage message = new RecallMessage();
+ message.setNodeId(k);
+ message.setData(v);
+ String guid = IdUtil.simpleUUID();
+ message.setGuid(guid);
+
+
+
+ produceFeignClient.recall(message);
+ guidList.add(guid);
+ });
+
+
+ }
+ return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, guidList, methodDescribe);
+ }
+
+ @PostMapping("/fullHourRecall")
+ @OperateInfo(info = LogEnum.BUSINESS_COMMON)
+ @ApiOperation("按小时数据补招")
+ @ApiImplicitParam(name = "param", value = "参数", required = true)
+ public HttpResult> fullHourRecall(@RequestBody FullRecallMessage param){
+ String methodDescribe = getMethodDescribe("recall");
+ List guidList = new ArrayList<>();
+ List runMonitorIds = new ArrayList<>();
+ List recallDTOList = new ArrayList<>();
+// Duration duration = Duration.between(param.getReCallStartTime(), param.getReCallEndTime());
+// // 获取剩余的分钟数,如果要精确到分钟可以这样做
+// long minutes = duration.toMinutes() ;
+// if(minutes>60){
+// throw new BusinessException("全量补招时间区间不能超过一个小时");
+//
+// }
+ if(CollectionUtils.isEmpty(param.getMonitorId())){
+ runMonitorIds = commTerminalGeneralClient.getRunMonitorIds().getData();
+ }else {
+ runMonitorIds = param.getMonitorId();
+ }
+
+ runMonitorIds.forEach(temp->{
+ LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
+ //后续根据不同前置下的测点发送补招密令
+ DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
+ Integer timeInterval = data.getTimeInterval();
+
+ List localDateTimeList = generateTimeIntervals(param.getReCallStartTime(),param.getReCallEndTime(), timeInterval);
+ List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(param.getReCallStartTime().toLocalDate(), DatePattern.NORM_DATE_PATTERN));
+ List tempTime = data1.stream().filter(dateTime -> (!dateTime.isAfter(param.getReCallEndTime())) &&
+ (!dateTime.isBefore(param.getReCallStartTime()))).collect(Collectors.toList());
+ localDateTimeList.removeAll(tempTime);
+ if(!CollectionUtils.isEmpty(localDateTimeList)){
+ List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
+ //最大时间段为300
+ if(timePeriod.size()<300){
+ RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
+ recallDTO.setDataType("0");
+ recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
+ recallDTO.setTimeInterval(timePeriod);
+ recallDTO.setNodeId(data2.getNodeId());
+ recallDTOList.add(recallDTO);
+ }else {
+ List> timePeriods = CollUtil.split(timePeriod, 300);
+ timePeriods.forEach(period->{
+ RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
+ recallDTO.setDataType("0");
+ recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
+ recallDTO.setTimeInterval(period);
+ recallDTO.setNodeId(data2.getNodeId());
+ recallDTOList.add(recallDTO);
+ });
+ }
+
+
+ }
//暂态补招
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
//不设置dataType暂态稳态全部补招
- recallDTO.setDataType("");
+ recallDTO.setDataType("1");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
String eventTime = formatInterval(param.getReCallStartTime(),param.getReCallEndTime());
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
@@ -117,7 +292,6 @@ public class DataRecallController extends BaseController {
}
-
@PostMapping("/recall")
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("数据补招")
@@ -134,6 +308,14 @@ public class DataRecallController extends BaseController {
}else {
runMonitorIds = param.getMonitorId();
}
+ LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
+ lineParam.setLineId(runMonitorIds);
+ //查看监测点的数据完整性
+ lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER));
+ lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER));
+ List rawData = iDataIntegrityInsert.getRawData(lineParam);
+ Map>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId)));
+
LocalDate currentDate = param.getReCallStartTime();
//循环每一天
while (!currentDate.isAfter(param.getReCallEndTime())) {
@@ -143,19 +325,72 @@ public class DataRecallController extends BaseController {
//后续根据不同前置下的测点发送补招密令
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
- Integer timeInterval = data.getTimeInterval();
- List localDateTimeList = generateTimeIntervals(finalCurrentDate, timeInterval);
- List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(finalCurrentDate, DatePattern.NORM_DATE_PATTERN));
- localDateTimeList.removeAll(data1);
- if(!CollectionUtils.isEmpty(localDateTimeList)){
- List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
+ //查看数据完整性,根据数据完整性进行补招;
+ //校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招
+ Integer recallType = 1; //1全量补 2:差量补 3:不需要补招
+ String curDateString = LocalDateTimeUtil.format(finalCurrentDate.atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER);
+ if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){
+ DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0);
+ if( dto.getDueTime()!=0){
+ int i = (dto.getRealTime()) / dto.getDueTime();
+ if( i>=0.98){
+ recallType=3;
+ } else if(i<0.98&&i>0.8){
+ recallType=2;
+ }
+ }
+
+ }
+
+ if(recallType ==3){
+ return;
+ }else if(recallType ==2){
+ Integer timeInterval = data.getTimeInterval();
+ List localDateTimeList = generateTimeIntervals(finalCurrentDate, timeInterval);
+ List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(finalCurrentDate, DatePattern.NORM_DATE_PATTERN));
+ localDateTimeList.removeAll(data1);
+ if(!CollectionUtils.isEmpty(localDateTimeList)){
+ List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
+ //最大时间段为300
+ if(timePeriod.size()<300){
+ RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
+ recallDTO.setDataType("0");
+ recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
+ recallDTO.setTimeInterval(timePeriod);
+ recallDTO.setNodeId(data2.getNodeId());
+ recallDTOList.add(recallDTO);
+ }else {
+ List> timePeriods = CollUtil.split(timePeriod, 300);
+ timePeriods.forEach(period->{
+ RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
+ recallDTO.setDataType("0");
+ recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
+ recallDTO.setTimeInterval(period);
+ recallDTO.setNodeId(data2.getNodeId());
+ recallDTOList.add(recallDTO);
+ });
+ }
+
+
+ }
+
+ }else {
+ //暂态补招
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
+ //不设置dataType暂态稳态全部补招
recallDTO.setDataType("0");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
- recallDTO.setTimeInterval(timePeriod);
+
+ String eventTime = formatInterval(finalCurrentDate.atTime(0,0,0),finalCurrentDate.atTime(23,59,0));
+ recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
recallDTO.setNodeId(data2.getNodeId());
recallDTOList.add(recallDTO);
+
+
}
+
+
+
//暂态补招
RecallMessage.RecallDTO recallDTO2 = new RecallMessage.RecallDTO();
recallDTO2.setDataType("1");
@@ -177,10 +412,10 @@ public class DataRecallController extends BaseController {
RecallMessage message = new RecallMessage();
message.setNodeId(k);
message.setData(v);
- String guid = IdUtil.simpleUUID();
- message.setGuid(guid);
+ String guid = IdUtil.simpleUUID();
+ message.setGuid(guid);
produceFeignClient.recall(message);
- guidList.add(guid);
+ guidList.add(guid);
});
@@ -225,6 +460,22 @@ public class DataRecallController extends BaseController {
return dateTimeList;
}
+ public List generateTimeIntervals(LocalDateTime startDateTime,LocalDateTime endDateTime , int intervalMinutes) {
+ List dateTimeList = new ArrayList<>();
+
+
+
+ // Generate LocalDateTime list with the given interval
+ LocalDateTime currentDateTime = startDateTime;
+ while (!currentDateTime.isAfter(endDateTime)) {
+ dateTimeList.add(currentDateTime);
+ currentDateTime = currentDateTime.plusMinutes(intervalMinutes);
+ }
+
+ return dateTimeList;
+ }
+
+
public static List mergeTimeIntervals(List times, int intervalMinutes) {
List mergedIntervals = new ArrayList<>();
if (times == null || times.isEmpty()) {
@@ -259,10 +510,18 @@ public class DataRecallController extends BaseController {
}
public static void main(String[] args) {
- // 创建两个LocalDateTime对象
- LocalDateTime dateTime1 = LocalDateTime.of(2023, 1, 1, 10, 0); // 例如:2023年1月1日 10:00
- LocalDateTime dateTime2 = LocalDateTime.of(2023, 1, 2, 15, 30); // 例如:2023年1月1日 15:30
+ LocalDate localDate1 = LocalDate.now();
+ LocalDate localDate2 = LocalDate.now();
+ while (!localDate1.isAfter(localDate2)) {
+ LocalDate finalCurrentDate = localDate1;
+ System.out.println(finalCurrentDate);
+ localDate1 = localDate1.plusDays(1);
+ }
+
+ // 创建两个LocalDateTime对象
+ LocalDateTime dateTime1 = LocalDateTime.of(2023, 1, 1, 10, 0,11); // 例如:2023年1月1日 10:00
+ LocalDateTime dateTime2 = LocalDateTime.of(2023, 1, 1, 10, 30,22); // 例如:2023年1月1日 15:30
// 使用Duration计算两个时间之间的差异
Duration duration = Duration.between(dateTime1, dateTime2);
diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java
new file mode 100644
index 0000000..b978dde
--- /dev/null
+++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java
@@ -0,0 +1,493 @@
+package com.njcn.dataProcess.websocket;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.LocalDateTimeUtil;
+import cn.hutool.core.util.IdUtil;
+import cn.hutool.json.JSONUtil;
+import com.alibaba.nacos.shaded.com.google.gson.JsonObject;
+import com.njcn.algorithm.pojo.bo.BaseParam;
+import com.njcn.algorithm.pojo.liteflow.LiteFlowAlgorithmFeignClient;
+import com.njcn.dataProcess.annotation.InsertBean;
+import com.njcn.dataProcess.annotation.QueryBean;
+import com.njcn.dataProcess.param.FullRecallMessage;
+import com.njcn.dataProcess.param.LineCountEvaluateParam;
+import com.njcn.dataProcess.pojo.dto.DataIntegrityDto;
+import com.njcn.dataProcess.service.IDataIntegrity;
+import com.njcn.dataProcess.service.IDataV;
+import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
+import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
+import com.njcn.device.pq.api.DeviceFeignClient;
+import com.njcn.device.pq.pojo.dto.DeviceDTO;
+import com.njcn.message.api.ProduceFeignClient;
+import com.njcn.message.constant.RedisKeyPrefix;
+import com.njcn.message.message.RecallMessage;
+import com.njcn.message.messagedto.TopicReplyDTO;
+import com.njcn.redis.utils.RedisUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Description:
+ * Date: 2024/12/13 15:11【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+@Slf4j
+@Component
+@ServerEndpoint(value ="/api/recell/{userId}")
+public class RecallWebSocketServer {
+ @QueryBean
+ private static IDataV dataVQuery;
+
+ @Autowired
+ @Qualifier("InfluxdbDataVImpl")
+ public void setDataVQuery( IDataV dataVQuery) {
+ RecallWebSocketServer.dataVQuery = dataVQuery;
+ }
+ private static CommTerminalGeneralClient commTerminalGeneralClient;
+
+ @Autowired
+ public void setCommTerminalGeneralClient(CommTerminalGeneralClient commTerminalGeneralClient) {
+ RecallWebSocketServer.commTerminalGeneralClient = commTerminalGeneralClient;
+ }
+ private static ProduceFeignClient produceFeignClient;
+
+ @Autowired
+ public void setProduceFeignClient(ProduceFeignClient produceFeignClient) {
+ RecallWebSocketServer.produceFeignClient = produceFeignClient;
+ }
+
+ @InsertBean
+ private static IDataIntegrity iDataIntegrityInsert;
+
+ @Autowired
+ @Qualifier("RelationDataIntegrityImpl")
+ public void setiDataIntegrityInsert(IDataIntegrity iDataIntegrityInsert) {
+ RecallWebSocketServer.iDataIntegrityInsert = iDataIntegrityInsert;
+ }
+
+ private static LiteFlowAlgorithmFeignClient liteFlowAlgorithmFeignClient;
+
+ @Autowired
+ public void setLiteFlowAlgorithmFeignClient(LiteFlowAlgorithmFeignClient liteFlowAlgorithmFeignClient) {
+ RecallWebSocketServer.liteFlowAlgorithmFeignClient = liteFlowAlgorithmFeignClient;
+ }
+
+ private static RedisUtil redisUtil;
+
+
+
+ @Autowired
+ public void setRedisUtil( RedisUtil redisUtil) {
+ RecallWebSocketServer.redisUtil = redisUtil;
+ }
+
+
+ private static DeviceFeignClient deviceFeignClient;
+
+ // 注入的时候,给类的 service 注入
+ @Autowired
+ public void setLineFeignClient(DeviceFeignClient deviceFeignClient) {
+ RecallWebSocketServer.deviceFeignClient = deviceFeignClient;
+ }
+ /**
+ * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
+ */
+ private static int onlineCount = 0;
+ /**
+ * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
+ */
+ private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>();
+ /**
+ * 与某个客户端的连接会话,需要通过它来给客户端发送数据
+ */
+ private Session session;
+ /**
+ * 接收userId
+ */
+ private String userId = "";
+
+ /**
+ * 连接建立成
+ * 功调用的方法
+ */
+ @OnOpen
+ public void onOpen(Session session, @PathParam("userId") String userId) {
+ //lineId 是 userid+","+lineId+","+Devid
+ this.session = session;
+ this.userId = userId;
+ if (webSocketMap.containsKey(userId)) {
+ webSocketMap.remove(userId);
+ //加入set中
+ webSocketMap.put(userId, this);
+ } else {
+ //加入set中
+ webSocketMap.put(userId, this);
+ //在线数加1
+ addOnlineCount();
+ }
+ sendMessage("连接成功");
+
+
+ }
+
+ /**
+ * 连接关闭
+ * 调用的方法
+ */
+ @OnClose
+ public void onClose() {
+ if (webSocketMap.containsKey(userId)) {
+ webSocketMap.remove(userId);
+ //从set中删除
+ subOnlineCount();
+ }
+ log.info("用户退出:" + userId + ",当前在线监测点数为:" + getOnlineCount());
+ }
+
+ /**
+ * 收到客户端消
+ * 息后调用的方法
+ *
+ * @param message 客户端发送过来的消息
+ **/
+ @OnMessage
+ public void onMessage(String message, Session session) {
+ //会每30s发送请求1次
+ log.info("监测点消息:" + userId + ",报文:" + message);
+ FullRecallMessage param = JSONUtil.toBean(message,FullRecallMessage.class,true);
+ if(Objects.isNull(message)){
+ sendInfo("参数有误");
+ }else {
+ List runMonitorIds = param.getMonitorId();
+ LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
+ lineParam.setLineId(runMonitorIds);
+ //查看监测点的数据完整性
+ lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER));
+ lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER));
+ List rawData = iDataIntegrityInsert.getRawData(lineParam);
+ Map>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId)));
+ List bsParmList = new ArrayList<>();
+ runMonitorIds.forEach(temp->{
+ LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
+ //后续根据不同前置下的测点发送补招密令
+ DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
+
+
+
+ LocalDateTime currentDate = param.getReCallStartTime();
+ //循环每一天
+ while (!currentDate.isAfter(param.getReCallEndTime())) {
+
+ String key ="";
+ LocalDateTime tempTime = currentDate.toLocalDate().plusDays(1).atTime(0,0,0);
+ //查看数据完整性,根据数据完整性进行补招;
+ //校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招
+ Integer recallType = 1; //1全量补 2:差量补 3:不需要补招
+ String curDateString = LocalDateTimeUtil.format(currentDate.toLocalDate().atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER);
+ if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){
+ DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0);
+ if( dto.getDueTime()!=0){
+ int i = (dto.getRealTime()) / dto.getDueTime();
+ if( i>=0.98){
+ recallType=3;
+ } else if(i<0.98&&i>0.8){
+ recallType=2;
+ }
+ }
+
+ }
+ if(recallType ==3){
+ currentDate = tempTime;
+ sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性大于98%,无需补招");
+ continue;
+ }else if(recallType ==2){
+ Integer timeInterval = data.getTimeInterval();
+ List localDateTimeList = generateTimeIntervals( currentDate.toLocalDate(), timeInterval);
+ List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN));
+ localDateTimeList.removeAll(data1);
+ if(!CollectionUtils.isEmpty(localDateTimeList)){
+ List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
+ RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
+ //
+ recallDTO.setDataType("");
+ recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
+ recallDTO.setTimeInterval(timePeriod);
+ recallDTO.setNodeId(data2.getNodeId());
+ RecallMessage recallMessage = new RecallMessage();
+ recallMessage.setNodeId(data2.getNodeId());
+ recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList()));
+ String guid = IdUtil.simpleUUID();
+ recallMessage.setGuid(guid);
+
+
+
+ produceFeignClient.recall(recallMessage);
+ sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在80~98%之间,进行补招");
+ boolean flag =true;
+ LocalDateTime beginTaskTime = LocalDateTime.now();
+ while (flag){
+ if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){
+ key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
+
+ String jsonString =redisUtil.getStringByKey(key);
+ if(Objects.nonNull(jsonString)){
+ TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
+
+ flag =false;
+ sendInfo(bean.getResult());
+ BaseParam baseParam = new BaseParam();
+ baseParam.setFullChain(false);
+ baseParam.setRepair(false);
+ baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
+ baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
+ baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
+ bsParmList.add(baseParam);
+ }
+ }else {
+ flag =false;
+ sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时");
+ BaseParam baseParam = new BaseParam();
+ baseParam.setFullChain(false);
+ baseParam.setRepair(false);
+ baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
+ baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
+ baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
+ bsParmList.add(baseParam);
+ }
+
+ }
+
+ }
+
+ }else {
+ //暂态补招
+ RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
+ //不设置dataType暂态稳态全部补招
+ recallDTO.setDataType("");
+ recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
+ if(tempTime.isAfter(param.getReCallEndTime())){
+ String eventTime = formatInterval(currentDate,param.getReCallEndTime());
+ recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
+ recallDTO.setNodeId(data2.getNodeId());
+ }else {
+ String eventTime = formatInterval(currentDate,tempTime);
+ recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
+ recallDTO.setNodeId(data2.getNodeId());
+ }
+ RecallMessage recallMessage = new RecallMessage();
+ recallMessage.setNodeId(data2.getNodeId());
+ recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList()));
+ String guid = IdUtil.simpleUUID();
+ recallMessage.setGuid(guid);
+ produceFeignClient.recall(recallMessage);
+ sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在小于80%,进行补招");
+
+ boolean flag =true;
+ LocalDateTime beginTaskTime = LocalDateTime.now();
+ while (flag){
+ if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){
+ key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
+
+ String jsonString =redisUtil.getStringByKey(key);
+ if(Objects.nonNull(jsonString)){
+ TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
+
+ flag =false;
+ sendInfo(bean.getResult());
+ BaseParam baseParam = new BaseParam();
+ baseParam.setFullChain(false);
+ baseParam.setRepair(false);
+ baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
+ baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
+ baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
+ bsParmList.add(baseParam);
+ }
+ }else {
+ flag =false;
+ sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时");
+ BaseParam baseParam = new BaseParam();
+ baseParam.setFullChain(false);
+ baseParam.setRepair(false);
+ baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
+ baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
+ baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
+ bsParmList.add(baseParam);
+ }
+
+ }
+
+
+ }
+ currentDate = tempTime;
+
+ }
+
+
+ });
+ if(!CollectionUtils.isEmpty(bsParmList)){
+ sendInfo("开始执行数据完整性算法");
+ bsParmList.forEach(temp->{
+ liteFlowAlgorithmFeignClient.measurementPointExecutor(temp);
+ });
+ sendInfo("执行完成");
+
+ }
+
+ }
+ sendInfo("补招任务结束");
+ }
+ public List generateTimeIntervals(LocalDate date, int intervalMinutes) {
+ List dateTimeList = new ArrayList<>();
+
+ // Create the starting LocalDateTime
+ LocalDateTime startDateTime = LocalDateTime.of(date, LocalTime.MIDNIGHT);
+
+ // Create the ending LocalDateTime
+ LocalDateTime endDateTime = LocalDateTime.of(date, LocalTime.MAX);
+
+ // Generate LocalDateTime list with the given interval
+ LocalDateTime currentDateTime = startDateTime;
+ while (!currentDateTime.isAfter(endDateTime)) {
+ dateTimeList.add(currentDateTime);
+ currentDateTime = currentDateTime.plusMinutes(intervalMinutes);
+ }
+
+ return dateTimeList;
+ }
+
+ public static List mergeTimeIntervals(List times, int intervalMinutes) {
+ List mergedIntervals = new ArrayList<>();
+ if (times == null || times.isEmpty()) {
+ return mergedIntervals;
+ }
+
+ // Sort the list to ensure the times are in order
+ times.sort(LocalDateTime::compareTo);
+
+ LocalDateTime start = times.get(0);
+ LocalDateTime end = start;
+
+ for (int i = 1; i < times.size(); i++) {
+ LocalDateTime current = times.get(i);
+ if (current.isAfter(end.plusMinutes(intervalMinutes))) {
+ // If the current time is more than interval minutes after the end, close the current interval
+ mergedIntervals.add(formatInterval(start, end));
+ start = current; // Start a new interval
+ }
+ end = current; // Update the end of the current interval
+ }
+
+ // Add the last interval
+ mergedIntervals.add(formatInterval(start, end));
+
+ return mergedIntervals;
+ }
+
+ private static String formatInterval(LocalDateTime start, LocalDateTime end) {
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ return start.format(formatter) + "~" + end.format(formatter);
+ }
+
+ /**
+ * @param session
+ * @param error
+ */
+ @OnError
+ public void onError(Session session, Throwable error) {
+
+ log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
+ error.printStackTrace();
+ }
+
+ /**
+ * 实现服务
+ * 器主动推送
+ */
+ public void sendMessage(String message) {
+ try {
+ this.session.getBasicRemote().sendText(message);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 发送自定
+ * 义消息
+ **/
+ public static void sendInfo(String message) {
+
+
+ webSocketMap.forEach((k,v)->{
+ webSocketMap.get(k).sendMessage(message);
+
+ });
+
+ }
+
+ /**
+ * 获得此时的
+ * 在线监测点
+ *
+ * @return
+ */
+ public static synchronized int getOnlineCount() {
+ return onlineCount;
+ }
+
+ /**
+ * 在线监测点
+ * 数加1
+ */
+ public static synchronized void addOnlineCount() {
+ RecallWebSocketServer.onlineCount++;
+ }
+
+ /**
+ * 在线监测点
+ * 数减1
+ */
+ public static synchronized void subOnlineCount() {
+ RecallWebSocketServer.onlineCount--;
+ }
+
+// /**
+// * 过滤所有键包含指定字符串的条目
+// * @param map 原始的Map
+// * @param substring 要检查的子字符串
+// * @return 过滤的Map
+// */
+// public static Map filterMapByKey(ConcurrentHashMap map, String substring) {
+// Map result = new HashMap<>();
+// for (Map.Entry entry : map.entrySet()) {
+// if (entry.getKey().contains(substring)) {
+// result.put(entry.getKey(), entry.getValue().toString());
+// }
+// }
+// return result;
+// }
+
+}
\ No newline at end of file
diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/WebSocketConfig.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/WebSocketConfig.java
new file mode 100644
index 0000000..ad087e7
--- /dev/null
+++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/WebSocketConfig.java
@@ -0,0 +1,41 @@
+package com.njcn.dataProcess.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
+
+/**
+ * Description:
+ * Date: 2024/12/13 15:09【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+@Configuration
+public class WebSocketConfig {
+
+ @Bean
+ public ServerEndpointExporter serverEndpointExporter() {
+ return new ServerEndpointExporter();
+ }
+
+ /**
+ * 通信文本消息和二进制缓存区大小
+ * 避免对接 第三方 报文过大时,Websocket 1009 错误
+ *
+ * @return
+ */
+
+ @Bean
+ public ServletServerContainerFactoryBean createWebSocketContainer() {
+ ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
+ // 在此处设置bufferSize
+ container.setMaxTextMessageBufferSize(10240000);
+ container.setMaxBinaryMessageBufferSize(10240000);
+ container.setMaxSessionIdleTimeout(15 * 60000L);
+ return container;
+ }
+
+
+}
diff --git a/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java b/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java
index a6646d5..ff56082 100644
--- a/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java
+++ b/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java
@@ -1,9 +1,14 @@
package com.njcn.message.messagedto;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.njcn.middle.rocket.domain.BaseMessage;
import lombok.Data;
import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
/**
* Description:
@@ -14,11 +19,22 @@ import java.io.Serializable;
*/
@Data
public class TopicReplyDTO extends BaseMessage implements Serializable {
- //消息id
+ //消息id guid="12345"是补招回复结果
private String guid;
private String step;
-
+ //guid="12345"是补招回复结果
private String result;
+ private String lineIndex;
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ @JsonDeserialize(using = LocalDateTimeDeserializer.class)
+ private LocalDateTime recallStartDate;
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ @JsonDeserialize(using = LocalDateTimeDeserializer.class)
+ private LocalDateTime recallEndDate;
+
+
+
+
}
diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java
index e6df7ce..4d9d033 100644
--- a/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java
+++ b/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java
@@ -64,19 +64,19 @@ public class DeviceRunFlagDataConsumer extends EnhanceConsumerMessageHandler