补招逻辑修改,手动补招和前端采用websokcet通讯

This commit is contained in:
hzj
2025-08-25 16:38:18 +08:00
parent e0154fb3ef
commit 9261adb79a
2 changed files with 65 additions and 12 deletions

View File

@@ -0,0 +1,23 @@
package com.njcn.dataProcess.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
/**
* Description:
* Date: 2025/08/25 下午 2:42【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
@RequiredArgsConstructor
@AllArgsConstructor
public class RecallReplyDTO {
//code 200 完成500错误300补招进行中
private Integer code;
private String message;
}

View File

@@ -5,11 +5,13 @@ import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.shaded.com.google.gson.JsonObject; import com.alibaba.nacos.shaded.com.google.gson.JsonObject;
import com.njcn.algorithm.pojo.bo.BaseParam; import com.njcn.algorithm.pojo.bo.BaseParam;
import com.njcn.algorithm.pojo.liteflow.LiteFlowAlgorithmFeignClient; import com.njcn.algorithm.pojo.liteflow.LiteFlowAlgorithmFeignClient;
import com.njcn.dataProcess.annotation.InsertBean; import com.njcn.dataProcess.annotation.InsertBean;
import com.njcn.dataProcess.annotation.QueryBean; import com.njcn.dataProcess.annotation.QueryBean;
import com.njcn.dataProcess.dto.RecallReplyDTO;
import com.njcn.dataProcess.param.FullRecallMessage; import com.njcn.dataProcess.param.FullRecallMessage;
import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.DataIntegrityDto; import com.njcn.dataProcess.pojo.dto.DataIntegrityDto;
@@ -136,7 +138,7 @@ public class RecallWebSocketServer {
@OnOpen @OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) { public void onOpen(Session session, @PathParam("userId") String userId) {
//lineId 是 userid+","+lineId+","+Devid //lineId 是 userid+","+lineId+","+Devid
this.session = session; this.session = session;
this.userId = userId; this.userId = userId;
if (webSocketMap.containsKey(userId)) { if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId); webSocketMap.remove(userId);
@@ -177,9 +179,15 @@ public class RecallWebSocketServer {
public void onMessage(String message, Session session) { public void onMessage(String message, Session session) {
//会每30s发送请求1次 //会每30s发送请求1次
log.info("监测点消息:" + userId + ",报文:" + message); log.info("监测点消息:" + userId + ",报文:" + message);
if(Objects.equals(message,"alive")){
sendInfo("connect");
return;
}
FullRecallMessage param = JSONUtil.toBean(message,FullRecallMessage.class,true); FullRecallMessage param = JSONUtil.toBean(message,FullRecallMessage.class,true);
if(Objects.isNull(message)){ if(Objects.isNull(message)){
sendInfo("参数有误"); RecallReplyDTO recallReplyDTO = new RecallReplyDTO(500,"参数有误");
sendInfo(JSONObject.toJSONString(recallReplyDTO));
}else { }else {
List<String> runMonitorIds = param.getMonitorId(); List<String> runMonitorIds = param.getMonitorId();
LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
@@ -220,8 +228,9 @@ public class RecallWebSocketServer {
} }
if(recallType ==3){ if(recallType ==3){
RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性大于98%,无需补招");
sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性大于98%,无需补招"); sendInfo(JSONObject.toJSONString(recallReplyDTO));
currentDate = tempTime; currentDate = tempTime;
continue; continue;
}else if(recallType ==2){ }else if(recallType ==2){
@@ -246,7 +255,9 @@ public class RecallWebSocketServer {
produceFeignClient.recall(recallMessage); produceFeignClient.recall(recallMessage);
sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在80~98%之间,进行补招"); RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在80~98%之间,进行补招");
sendInfo(JSONObject.toJSONString(recallReplyDTO));
boolean flag =true; boolean flag =true;
LocalDateTime beginTaskTime = LocalDateTime.now(); LocalDateTime beginTaskTime = LocalDateTime.now();
while (flag){ while (flag){
@@ -258,7 +269,10 @@ public class RecallWebSocketServer {
TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true); TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
redisUtil.delete(key); redisUtil.delete(key);
flag =false; flag =false;
sendInfo(bean.getResult()); RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult());
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
BaseParam baseParam = new BaseParam(); BaseParam baseParam = new BaseParam();
baseParam.setFullChain(false); baseParam.setFullChain(false);
baseParam.setRepair(false); baseParam.setRepair(false);
@@ -269,7 +283,10 @@ public class RecallWebSocketServer {
} }
}else { }else {
flag =false; flag =false;
sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时"); RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时");
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
BaseParam baseParam = new BaseParam(); BaseParam baseParam = new BaseParam();
baseParam.setFullChain(false); baseParam.setFullChain(false);
baseParam.setRepair(false); baseParam.setRepair(false);
@@ -304,7 +321,9 @@ public class RecallWebSocketServer {
String guid = IdUtil.simpleUUID(); String guid = IdUtil.simpleUUID();
recallMessage.setGuid(guid); recallMessage.setGuid(guid);
produceFeignClient.recall(recallMessage); produceFeignClient.recall(recallMessage);
sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在小于80%,进行补招"); RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在小于80%,进行补招");
sendInfo(JSONObject.toJSONString(recallReplyDTO));
boolean flag =true; boolean flag =true;
LocalDateTime beginTaskTime = LocalDateTime.now(); LocalDateTime beginTaskTime = LocalDateTime.now();
@@ -318,7 +337,10 @@ public class RecallWebSocketServer {
redisUtil.delete(key); redisUtil.delete(key);
flag =false; flag =false;
sendInfo(bean.getResult()); RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult());
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
BaseParam baseParam = new BaseParam(); BaseParam baseParam = new BaseParam();
baseParam.setFullChain(false); baseParam.setFullChain(false);
baseParam.setRepair(false); baseParam.setRepair(false);
@@ -329,7 +351,9 @@ public class RecallWebSocketServer {
} }
}else { }else {
flag =false; flag =false;
sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时"); RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时");
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
BaseParam baseParam = new BaseParam(); BaseParam baseParam = new BaseParam();
baseParam.setFullChain(false); baseParam.setFullChain(false);
baseParam.setRepair(false); baseParam.setRepair(false);
@@ -350,16 +374,22 @@ public class RecallWebSocketServer {
}); });
if(!CollectionUtils.isEmpty(bsParmList)){ if(!CollectionUtils.isEmpty(bsParmList)){
sendInfo("开始执行数据完整性算法"); RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"开始执行数据完整性算法");
sendInfo(JSONObject.toJSONString(recallReplyDTO));
bsParmList.forEach(temp->{ bsParmList.forEach(temp->{
liteFlowAlgorithmFeignClient.measurementPointExecutor(temp); liteFlowAlgorithmFeignClient.measurementPointExecutor(temp);
}); });
sendInfo("执行完成"); RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"执行完成");
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
} }
} }
sendInfo("补招任务结束"); RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(200,"补招任务结束");
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
} }
public List<LocalDateTime> generateTimeIntervals(LocalDate date, int intervalMinutes) { public List<LocalDateTime> generateTimeIntervals(LocalDate date, int intervalMinutes) {
List<LocalDateTime> dateTimeList = new ArrayList<>(); List<LocalDateTime> dateTimeList = new ArrayList<>();