补招逻辑修改,手动补招和前端采用websokcet通讯

This commit is contained in:
hzj
2025-08-22 15:55:05 +08:00
parent 536ee24888
commit f519fd3e56
8 changed files with 857 additions and 31 deletions

View File

@@ -79,6 +79,17 @@
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.7.12</version>
</dependency>
<dependency>
<groupId>com.njcn.platform</groupId>
<artifactId>algorithm-api</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

View File

@@ -1,5 +1,6 @@
package com.njcn.dataProcess.controller;//package com.njcn.message.websocket;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil;
@@ -10,8 +11,12 @@ import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.dataProcess.annotation.InsertBean;
import com.njcn.dataProcess.annotation.QueryBean;
import com.njcn.dataProcess.param.FullRecallMessage;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.DataIntegrityDto;
import com.njcn.dataProcess.service.IDataIntegrity;
import com.njcn.dataProcess.service.IDataV;
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
@@ -57,13 +62,14 @@ public class DataRecallController extends BaseController {
private final ProduceFeignClient produceFeignClient;
private final DeviceFeignClient deviceFeignClient;
@InsertBean
private IDataIntegrity iDataIntegrityInsert;
//页面补招按时间段带小时的全部补招不查datav数据去筛选
@PostMapping("/FullRecall")
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("数据全量补招")
@ApiImplicitParam(name = "param", value = "参数", required = true)
public HttpResult<List<String>> recall(@RequestBody FullRecallMessage param) {
public HttpResult<List<String>> FullRecall(@RequestBody FullRecallMessage param) {
String methodDescribe = getMethodDescribe("recall");
List<String> guidList = new ArrayList<>();
List<String> runMonitorIds = new ArrayList<>();
@@ -80,16 +86,185 @@ public class DataRecallController extends BaseController {
}else {
runMonitorIds = param.getMonitorId();
}
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setLineId(runMonitorIds);
//查看监测点的数据完整性
lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER));
lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER));
List<DataIntegrityDto> rawData = iDataIntegrityInsert.getRawData(lineParam);
Map<String, Map<String, List<DataIntegrityDto>>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId)));
runMonitorIds.forEach(temp->{
LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
//后续根据不同前置下的测点发送补招密令
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
LocalDateTime currentDate = param.getReCallStartTime();
//循环每一天
while (!currentDate.isAfter(param.getReCallEndTime())) {
LocalDateTime tempTime = currentDate.toLocalDate().plusDays(1).atTime(0,0,0);
//查看数据完整性,根据数据完整性进行补招;
//校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招
Integer recallType = 1; //1全量补 2:差量补 3不需要补招
String curDateString = LocalDateTimeUtil.format(currentDate.toLocalDate().atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER);
if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){
DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0);
if( dto.getDueTime()!=0){
int i = (dto.getRealTime()) / dto.getDueTime();
if( i>=0.98){
recallType=3;
} else if(i<0.98&&i>0.8){
recallType=2;
}
}
}
if(recallType ==3){
currentDate = tempTime;
continue;
}else if(recallType ==2){
Integer timeInterval = data.getTimeInterval();
List<LocalDateTime> localDateTimeList = generateTimeIntervals( currentDate.toLocalDate(), timeInterval);
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN));
localDateTimeList.removeAll(data1);
if(!CollectionUtils.isEmpty(localDateTimeList)){
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
//最大时间段为300
if(timePeriod.size()<300){
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
recallDTO.setDataType("");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
recallDTO.setTimeInterval(timePeriod);
recallDTO.setNodeId(data2.getNodeId());
recallDTOList.add(recallDTO);
}else {
List<List<String>> timePeriods = CollUtil.split(timePeriod, 300);
timePeriods.forEach(period->{
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
recallDTO.setDataType("");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
recallDTO.setTimeInterval(period);
recallDTO.setNodeId(data2.getNodeId());
recallDTOList.add(recallDTO);
});
}
}
}else {
//暂态补招
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
//不设置dataType暂态稳态全部补招
recallDTO.setDataType("");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
if(tempTime.isAfter(param.getReCallEndTime())){
String eventTime = formatInterval(currentDate,param.getReCallEndTime());
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
recallDTO.setNodeId(data2.getNodeId());
recallDTOList.add(recallDTO);
}else {
String eventTime = formatInterval(currentDate,tempTime);
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
recallDTO.setNodeId(data2.getNodeId());
recallDTOList.add(recallDTO);
}
}
currentDate = tempTime;
}
});
if(!CollectionUtils.isEmpty(recallDTOList)){
Map<String, List<RecallMessage.RecallDTO>> collect = recallDTOList.stream().collect(Collectors.groupingBy(RecallMessage.RecallDTO::getNodeId));
collect.forEach((k,v)->{
RecallMessage message = new RecallMessage();
message.setNodeId(k);
message.setData(v);
String guid = IdUtil.simpleUUID();
message.setGuid(guid);
produceFeignClient.recall(message);
guidList.add(guid);
});
}
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, guidList, methodDescribe);
}
@PostMapping("/fullHourRecall")
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("按小时数据补招")
@ApiImplicitParam(name = "param", value = "参数", required = true)
public HttpResult<List<String>> fullHourRecall(@RequestBody FullRecallMessage param){
String methodDescribe = getMethodDescribe("recall");
List<String> guidList = new ArrayList<>();
List<String> runMonitorIds = new ArrayList<>();
List<RecallMessage.RecallDTO> recallDTOList = new ArrayList<>();
// Duration duration = Duration.between(param.getReCallStartTime(), param.getReCallEndTime());
// // 获取剩余的分钟数,如果要精确到分钟可以这样做
// long minutes = duration.toMinutes() ;
// if(minutes>60){
// throw new BusinessException("全量补招时间区间不能超过一个小时");
//
// }
if(CollectionUtils.isEmpty(param.getMonitorId())){
runMonitorIds = commTerminalGeneralClient.getRunMonitorIds().getData();
}else {
runMonitorIds = param.getMonitorId();
}
runMonitorIds.forEach(temp->{
LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
//后续根据不同前置下的测点发送补招密令
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
Integer timeInterval = data.getTimeInterval();
List<LocalDateTime> localDateTimeList = generateTimeIntervals(param.getReCallStartTime(),param.getReCallEndTime(), timeInterval);
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(param.getReCallStartTime().toLocalDate(), DatePattern.NORM_DATE_PATTERN));
List<LocalDateTime> tempTime = data1.stream().filter(dateTime -> (!dateTime.isAfter(param.getReCallEndTime())) &&
(!dateTime.isBefore(param.getReCallStartTime()))).collect(Collectors.toList());
localDateTimeList.removeAll(tempTime);
if(!CollectionUtils.isEmpty(localDateTimeList)){
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
//最大时间段为300
if(timePeriod.size()<300){
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
recallDTO.setDataType("0");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
recallDTO.setTimeInterval(timePeriod);
recallDTO.setNodeId(data2.getNodeId());
recallDTOList.add(recallDTO);
}else {
List<List<String>> timePeriods = CollUtil.split(timePeriod, 300);
timePeriods.forEach(period->{
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
recallDTO.setDataType("0");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
recallDTO.setTimeInterval(period);
recallDTO.setNodeId(data2.getNodeId());
recallDTOList.add(recallDTO);
});
}
}
//暂态补招
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
//不设置dataType暂态稳态全部补招
recallDTO.setDataType("");
recallDTO.setDataType("1");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
String eventTime = formatInterval(param.getReCallStartTime(),param.getReCallEndTime());
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
@@ -117,7 +292,6 @@ public class DataRecallController extends BaseController {
}
@PostMapping("/recall")
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("数据补招")
@@ -134,6 +308,14 @@ public class DataRecallController extends BaseController {
}else {
runMonitorIds = param.getMonitorId();
}
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setLineId(runMonitorIds);
//查看监测点的数据完整性
lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER));
lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER));
List<DataIntegrityDto> rawData = iDataIntegrityInsert.getRawData(lineParam);
Map<String, Map<String, List<DataIntegrityDto>>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId)));
LocalDate currentDate = param.getReCallStartTime();
//循环每一天
while (!currentDate.isAfter(param.getReCallEndTime())) {
@@ -143,19 +325,72 @@ public class DataRecallController extends BaseController {
//后续根据不同前置下的测点发送补招密令
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
Integer timeInterval = data.getTimeInterval();
List<LocalDateTime> localDateTimeList = generateTimeIntervals(finalCurrentDate, timeInterval);
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(finalCurrentDate, DatePattern.NORM_DATE_PATTERN));
localDateTimeList.removeAll(data1);
if(!CollectionUtils.isEmpty(localDateTimeList)){
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
//查看数据完整性,根据数据完整性进行补招;
//校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招
Integer recallType = 1; //1全量补 2:差量补 3不需要补招
String curDateString = LocalDateTimeUtil.format(finalCurrentDate.atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER);
if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){
DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0);
if( dto.getDueTime()!=0){
int i = (dto.getRealTime()) / dto.getDueTime();
if( i>=0.98){
recallType=3;
} else if(i<0.98&&i>0.8){
recallType=2;
}
}
}
if(recallType ==3){
return;
}else if(recallType ==2){
Integer timeInterval = data.getTimeInterval();
List<LocalDateTime> localDateTimeList = generateTimeIntervals(finalCurrentDate, timeInterval);
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(finalCurrentDate, DatePattern.NORM_DATE_PATTERN));
localDateTimeList.removeAll(data1);
if(!CollectionUtils.isEmpty(localDateTimeList)){
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
//最大时间段为300
if(timePeriod.size()<300){
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
recallDTO.setDataType("0");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
recallDTO.setTimeInterval(timePeriod);
recallDTO.setNodeId(data2.getNodeId());
recallDTOList.add(recallDTO);
}else {
List<List<String>> timePeriods = CollUtil.split(timePeriod, 300);
timePeriods.forEach(period->{
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
recallDTO.setDataType("0");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
recallDTO.setTimeInterval(period);
recallDTO.setNodeId(data2.getNodeId());
recallDTOList.add(recallDTO);
});
}
}
}else {
//暂态补招
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
//不设置dataType暂态稳态全部补招
recallDTO.setDataType("0");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
recallDTO.setTimeInterval(timePeriod);
String eventTime = formatInterval(finalCurrentDate.atTime(0,0,0),finalCurrentDate.atTime(23,59,0));
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
recallDTO.setNodeId(data2.getNodeId());
recallDTOList.add(recallDTO);
}
//暂态补招
RecallMessage.RecallDTO recallDTO2 = new RecallMessage.RecallDTO();
recallDTO2.setDataType("1");
@@ -177,10 +412,10 @@ public class DataRecallController extends BaseController {
RecallMessage message = new RecallMessage();
message.setNodeId(k);
message.setData(v);
String guid = IdUtil.simpleUUID();
message.setGuid(guid);
String guid = IdUtil.simpleUUID();
message.setGuid(guid);
produceFeignClient.recall(message);
guidList.add(guid);
guidList.add(guid);
});
@@ -225,6 +460,22 @@ public class DataRecallController extends BaseController {
return dateTimeList;
}
public List<LocalDateTime> generateTimeIntervals(LocalDateTime startDateTime,LocalDateTime endDateTime , int intervalMinutes) {
List<LocalDateTime> dateTimeList = new ArrayList<>();
// Generate LocalDateTime list with the given interval
LocalDateTime currentDateTime = startDateTime;
while (!currentDateTime.isAfter(endDateTime)) {
dateTimeList.add(currentDateTime);
currentDateTime = currentDateTime.plusMinutes(intervalMinutes);
}
return dateTimeList;
}
public static List<String> mergeTimeIntervals(List<LocalDateTime> times, int intervalMinutes) {
List<String> mergedIntervals = new ArrayList<>();
if (times == null || times.isEmpty()) {
@@ -259,10 +510,18 @@ public class DataRecallController extends BaseController {
}
public static void main(String[] args) {
// 创建两个LocalDateTime对象
LocalDateTime dateTime1 = LocalDateTime.of(2023, 1, 1, 10, 0); // 例如2023年1月1日 10:00
LocalDateTime dateTime2 = LocalDateTime.of(2023, 1, 2, 15, 30); // 例如2023年1月1日 15:30
LocalDate localDate1 = LocalDate.now();
LocalDate localDate2 = LocalDate.now();
while (!localDate1.isAfter(localDate2)) {
LocalDate finalCurrentDate = localDate1;
System.out.println(finalCurrentDate);
localDate1 = localDate1.plusDays(1);
}
// 创建两个LocalDateTime对象
LocalDateTime dateTime1 = LocalDateTime.of(2023, 1, 1, 10, 0,11); // 例如2023年1月1日 10:00
LocalDateTime dateTime2 = LocalDateTime.of(2023, 1, 1, 10, 30,22); // 例如2023年1月1日 15:30
// 使用Duration计算两个时间之间的差异
Duration duration = Duration.between(dateTime1, dateTime2);

View File

@@ -0,0 +1,493 @@
package com.njcn.dataProcess.websocket;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.nacos.shaded.com.google.gson.JsonObject;
import com.njcn.algorithm.pojo.bo.BaseParam;
import com.njcn.algorithm.pojo.liteflow.LiteFlowAlgorithmFeignClient;
import com.njcn.dataProcess.annotation.InsertBean;
import com.njcn.dataProcess.annotation.QueryBean;
import com.njcn.dataProcess.param.FullRecallMessage;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.DataIntegrityDto;
import com.njcn.dataProcess.service.IDataIntegrity;
import com.njcn.dataProcess.service.IDataV;
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
import com.njcn.device.pq.api.DeviceFeignClient;
import com.njcn.device.pq.pojo.dto.DeviceDTO;
import com.njcn.message.api.ProduceFeignClient;
import com.njcn.message.constant.RedisKeyPrefix;
import com.njcn.message.message.RecallMessage;
import com.njcn.message.messagedto.TopicReplyDTO;
import com.njcn.redis.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Description:
* Date: 2024/12/13 15:11【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Slf4j
@Component
@ServerEndpoint(value ="/api/recell/{userId}")
public class RecallWebSocketServer {
@QueryBean
private static IDataV dataVQuery;
@Autowired
@Qualifier("InfluxdbDataVImpl")
public void setDataVQuery( IDataV dataVQuery) {
RecallWebSocketServer.dataVQuery = dataVQuery;
}
private static CommTerminalGeneralClient commTerminalGeneralClient;
@Autowired
public void setCommTerminalGeneralClient(CommTerminalGeneralClient commTerminalGeneralClient) {
RecallWebSocketServer.commTerminalGeneralClient = commTerminalGeneralClient;
}
private static ProduceFeignClient produceFeignClient;
@Autowired
public void setProduceFeignClient(ProduceFeignClient produceFeignClient) {
RecallWebSocketServer.produceFeignClient = produceFeignClient;
}
@InsertBean
private static IDataIntegrity iDataIntegrityInsert;
@Autowired
@Qualifier("RelationDataIntegrityImpl")
public void setiDataIntegrityInsert(IDataIntegrity iDataIntegrityInsert) {
RecallWebSocketServer.iDataIntegrityInsert = iDataIntegrityInsert;
}
private static LiteFlowAlgorithmFeignClient liteFlowAlgorithmFeignClient;
@Autowired
public void setLiteFlowAlgorithmFeignClient(LiteFlowAlgorithmFeignClient liteFlowAlgorithmFeignClient) {
RecallWebSocketServer.liteFlowAlgorithmFeignClient = liteFlowAlgorithmFeignClient;
}
private static RedisUtil redisUtil;
@Autowired
public void setRedisUtil( RedisUtil redisUtil) {
RecallWebSocketServer.redisUtil = redisUtil;
}
private static DeviceFeignClient deviceFeignClient;
// 注入的时候,给类的 service 注入
@Autowired
public void setLineFeignClient(DeviceFeignClient deviceFeignClient) {
RecallWebSocketServer.deviceFeignClient = deviceFeignClient;
}
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set用来存放每个客户端对应的WebSocket对象。
*/
private static ConcurrentHashMap<String, RecallWebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userId = "";
/**
* 连接建立成
* 功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
//lineId 是 userid+","+lineId+","+Devid
this.session = session;
this.userId = userId;
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
//加入set中
webSocketMap.put(userId, this);
} else {
//加入set中
webSocketMap.put(userId, this);
//在线数加1
addOnlineCount();
}
sendMessage("连接成功");
}
/**
* 连接关闭
* 调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
//从set中删除
subOnlineCount();
}
log.info("用户退出:" + userId + ",当前在线监测点数为:" + getOnlineCount());
}
/**
* 收到客户端消
* 息后调用的方法
*
* @param message 客户端发送过来的消息
**/
@OnMessage
public void onMessage(String message, Session session) {
//会每30s发送请求1次
log.info("监测点消息:" + userId + ",报文:" + message);
FullRecallMessage param = JSONUtil.toBean(message,FullRecallMessage.class,true);
if(Objects.isNull(message)){
sendInfo("参数有误");
}else {
List<String> runMonitorIds = param.getMonitorId();
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setLineId(runMonitorIds);
//查看监测点的数据完整性
lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER));
lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER));
List<DataIntegrityDto> rawData = iDataIntegrityInsert.getRawData(lineParam);
Map<String, Map<String, List<DataIntegrityDto>>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId)));
List<BaseParam> bsParmList = new ArrayList<>();
runMonitorIds.forEach(temp->{
LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
//后续根据不同前置下的测点发送补招密令
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
LocalDateTime currentDate = param.getReCallStartTime();
//循环每一天
while (!currentDate.isAfter(param.getReCallEndTime())) {
String key ="";
LocalDateTime tempTime = currentDate.toLocalDate().plusDays(1).atTime(0,0,0);
//查看数据完整性,根据数据完整性进行补招;
//校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招
Integer recallType = 1; //1全量补 2:差量补 3不需要补招
String curDateString = LocalDateTimeUtil.format(currentDate.toLocalDate().atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER);
if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){
DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0);
if( dto.getDueTime()!=0){
int i = (dto.getRealTime()) / dto.getDueTime();
if( i>=0.98){
recallType=3;
} else if(i<0.98&&i>0.8){
recallType=2;
}
}
}
if(recallType ==3){
currentDate = tempTime;
sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性大于98%,无需补招");
continue;
}else if(recallType ==2){
Integer timeInterval = data.getTimeInterval();
List<LocalDateTime> localDateTimeList = generateTimeIntervals( currentDate.toLocalDate(), timeInterval);
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN));
localDateTimeList.removeAll(data1);
if(!CollectionUtils.isEmpty(localDateTimeList)){
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
//
recallDTO.setDataType("");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
recallDTO.setTimeInterval(timePeriod);
recallDTO.setNodeId(data2.getNodeId());
RecallMessage recallMessage = new RecallMessage();
recallMessage.setNodeId(data2.getNodeId());
recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList()));
String guid = IdUtil.simpleUUID();
recallMessage.setGuid(guid);
produceFeignClient.recall(recallMessage);
sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在80~98%之间,进行补招");
boolean flag =true;
LocalDateTime beginTaskTime = LocalDateTime.now();
while (flag){
if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){
key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
String jsonString =redisUtil.getStringByKey(key);
if(Objects.nonNull(jsonString)){
TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
flag =false;
sendInfo(bean.getResult());
BaseParam baseParam = new BaseParam();
baseParam.setFullChain(false);
baseParam.setRepair(false);
baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
bsParmList.add(baseParam);
}
}else {
flag =false;
sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时");
BaseParam baseParam = new BaseParam();
baseParam.setFullChain(false);
baseParam.setRepair(false);
baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
bsParmList.add(baseParam);
}
}
}
}else {
//暂态补招
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
//不设置dataType暂态稳态全部补招
recallDTO.setDataType("");
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
if(tempTime.isAfter(param.getReCallEndTime())){
String eventTime = formatInterval(currentDate,param.getReCallEndTime());
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
recallDTO.setNodeId(data2.getNodeId());
}else {
String eventTime = formatInterval(currentDate,tempTime);
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
recallDTO.setNodeId(data2.getNodeId());
}
RecallMessage recallMessage = new RecallMessage();
recallMessage.setNodeId(data2.getNodeId());
recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList()));
String guid = IdUtil.simpleUUID();
recallMessage.setGuid(guid);
produceFeignClient.recall(recallMessage);
sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在小于80%,进行补招");
boolean flag =true;
LocalDateTime beginTaskTime = LocalDateTime.now();
while (flag){
if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){
key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
String jsonString =redisUtil.getStringByKey(key);
if(Objects.nonNull(jsonString)){
TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
flag =false;
sendInfo(bean.getResult());
BaseParam baseParam = new BaseParam();
baseParam.setFullChain(false);
baseParam.setRepair(false);
baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
bsParmList.add(baseParam);
}
}else {
flag =false;
sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时");
BaseParam baseParam = new BaseParam();
baseParam.setFullChain(false);
baseParam.setRepair(false);
baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
bsParmList.add(baseParam);
}
}
}
currentDate = tempTime;
}
});
if(!CollectionUtils.isEmpty(bsParmList)){
sendInfo("开始执行数据完整性算法");
bsParmList.forEach(temp->{
liteFlowAlgorithmFeignClient.measurementPointExecutor(temp);
});
sendInfo("执行完成");
}
}
sendInfo("补招任务结束");
}
public List<LocalDateTime> generateTimeIntervals(LocalDate date, int intervalMinutes) {
List<LocalDateTime> dateTimeList = new ArrayList<>();
// Create the starting LocalDateTime
LocalDateTime startDateTime = LocalDateTime.of(date, LocalTime.MIDNIGHT);
// Create the ending LocalDateTime
LocalDateTime endDateTime = LocalDateTime.of(date, LocalTime.MAX);
// Generate LocalDateTime list with the given interval
LocalDateTime currentDateTime = startDateTime;
while (!currentDateTime.isAfter(endDateTime)) {
dateTimeList.add(currentDateTime);
currentDateTime = currentDateTime.plusMinutes(intervalMinutes);
}
return dateTimeList;
}
public static List<String> mergeTimeIntervals(List<LocalDateTime> times, int intervalMinutes) {
List<String> mergedIntervals = new ArrayList<>();
if (times == null || times.isEmpty()) {
return mergedIntervals;
}
// Sort the list to ensure the times are in order
times.sort(LocalDateTime::compareTo);
LocalDateTime start = times.get(0);
LocalDateTime end = start;
for (int i = 1; i < times.size(); i++) {
LocalDateTime current = times.get(i);
if (current.isAfter(end.plusMinutes(intervalMinutes))) {
// If the current time is more than interval minutes after the end, close the current interval
mergedIntervals.add(formatInterval(start, end));
start = current; // Start a new interval
}
end = current; // Update the end of the current interval
}
// Add the last interval
mergedIntervals.add(formatInterval(start, end));
return mergedIntervals;
}
private static String formatInterval(LocalDateTime start, LocalDateTime end) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
return start.format(formatter) + "~" + end.format(formatter);
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务
* 器主动推送
*/
public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 发送自定
* 义消息
**/
public static void sendInfo(String message) {
webSocketMap.forEach((k,v)->{
webSocketMap.get(k).sendMessage(message);
});
}
/**
* 获得此时的
* 在线监测点
*
* @return
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* 在线监测点
* 数加1
*/
public static synchronized void addOnlineCount() {
RecallWebSocketServer.onlineCount++;
}
/**
* 在线监测点
* 数减1
*/
public static synchronized void subOnlineCount() {
RecallWebSocketServer.onlineCount--;
}
// /**
// * 过滤所有键包含指定字符串的条目
// * @param map 原始的Map
// * @param substring 要检查的子字符串
// * @return 过滤的Map
// */
// public static Map<String, String> filterMapByKey(ConcurrentHashMap<String, RecallWebSocketServer> map, String substring) {
// Map<String, String> result = new HashMap<>();
// for (Map.Entry<String, RecallWebSocketServer> entry : map.entrySet()) {
// if (entry.getKey().contains(substring)) {
// result.put(entry.getKey(), entry.getValue().toString());
// }
// }
// return result;
// }
}

View File

@@ -0,0 +1,41 @@
package com.njcn.dataProcess.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
/**
* Description:
* Date: 2024/12/13 15:09【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
/**
* 通信文本消息和二进制缓存区大小
* 避免对接 第三方 报文过大时Websocket 1009 错误
*
* @return
*/
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
// 在此处设置bufferSize
container.setMaxTextMessageBufferSize(10240000);
container.setMaxBinaryMessageBufferSize(10240000);
container.setMaxSessionIdleTimeout(15 * 60000L);
return container;
}
}

View File

@@ -1,9 +1,14 @@
package com.njcn.message.messagedto;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.njcn.middle.rocket.domain.BaseMessage;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* Description:
@@ -14,11 +19,22 @@ import java.io.Serializable;
*/
@Data
public class TopicReplyDTO extends BaseMessage implements Serializable {
//消息id
//消息id guid="12345"是补招回复结果
private String guid;
private String step;
//guid="12345"是补招回复结果
private String result;
private String lineIndex;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
private LocalDateTime recallStartDate;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
private LocalDateTime recallEndDate;
}

View File

@@ -64,19 +64,19 @@ public class DeviceRunFlagDataConsumer extends EnhanceConsumerMessageHandler<Dev
*/
@Override
public boolean filter(DevComFlagDTO message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
return false;
}
return true;
// String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
// if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
// return false;
// }
return false;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(DevComFlagDTO message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
}

View File

@@ -74,7 +74,7 @@ public class RealTimeDataConsumer extends EnhanceConsumerMessageHandler<MessageD
*/
@Override
protected void consumeSuccess(MessageDataDTO message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
}

View File

@@ -1,12 +1,12 @@
package com.njcn.message.consumer;
import cn.hutool.core.date.DatePattern;
import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.MessageStatus;
import com.njcn.message.messagedto.TopicReplyDTO;
import com.njcn.message.constant.RedisKeyPrefix;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.api.MessAnalysisFeignClient;
@@ -76,14 +76,20 @@ public class TopicReplyConsumer extends EnhanceConsumerMessageHandler<TopicReply
*/
@Override
protected void consumeSuccess(TopicReplyDTO message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
@Override
protected void handleMessage(TopicReplyDTO message) {
//业务处理
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getGuid()),JSONObject.toJSONString(message),60*60L);
//“12345”补招回复
if(Objects.equals(message.getGuid(),"12345")){
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getLineIndex().concat(message.getRecallStartDate().toLocalDate().format(DatePattern.NORM_DATE_FORMATTER))),JSONObject.toJSONString(message),60*60L);
}else {
//业务处理
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getGuid()),JSONObject.toJSONString(message),60*60L);
}
}