diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/RecallReplyDTO.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/RecallReplyDTO.java new file mode 100644 index 0000000..8f7ce48 --- /dev/null +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/RecallReplyDTO.java @@ -0,0 +1,23 @@ +package com.njcn.dataProcess.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +/** + * Description: + * Date: 2025/08/25 下午 2:42【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +@RequiredArgsConstructor +@AllArgsConstructor +public class RecallReplyDTO { + //code 200 完成,500错误;300补招进行中 + private Integer code; + private String message; + + +} diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java index 5e6b493..b4653bb 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/websocket/RecallWebSocketServer.java @@ -5,11 +5,13 @@ import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.shaded.com.google.gson.JsonObject; import com.njcn.algorithm.pojo.bo.BaseParam; import com.njcn.algorithm.pojo.liteflow.LiteFlowAlgorithmFeignClient; import com.njcn.dataProcess.annotation.InsertBean; import com.njcn.dataProcess.annotation.QueryBean; +import com.njcn.dataProcess.dto.RecallReplyDTO; import com.njcn.dataProcess.param.FullRecallMessage; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.pojo.dto.DataIntegrityDto; @@ -136,7 +138,7 @@ public class RecallWebSocketServer { @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { //lineId 是 userid+","+lineId+","+Devid - this.session = session; + this.session = session; this.userId = userId; if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); @@ -177,9 +179,15 @@ public class RecallWebSocketServer { public void onMessage(String message, Session session) { //会每30s发送请求1次 log.info("监测点消息:" + userId + ",报文:" + message); + if(Objects.equals(message,"alive")){ + sendInfo("connect"); + return; + } FullRecallMessage param = JSONUtil.toBean(message,FullRecallMessage.class,true); + if(Objects.isNull(message)){ - sendInfo("参数有误"); + RecallReplyDTO recallReplyDTO = new RecallReplyDTO(500,"参数有误"); + sendInfo(JSONObject.toJSONString(recallReplyDTO)); }else { List runMonitorIds = param.getMonitorId(); LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -220,8 +228,9 @@ public class RecallWebSocketServer { } if(recallType ==3){ + RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性大于98%,无需补招"); - sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性大于98%,无需补招"); + sendInfo(JSONObject.toJSONString(recallReplyDTO)); currentDate = tempTime; continue; }else if(recallType ==2){ @@ -246,7 +255,9 @@ public class RecallWebSocketServer { produceFeignClient.recall(recallMessage); - sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在80~98%之间,进行补招"); + RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在80~98%之间,进行补招"); + + sendInfo(JSONObject.toJSONString(recallReplyDTO)); boolean flag =true; LocalDateTime beginTaskTime = LocalDateTime.now(); while (flag){ @@ -258,7 +269,10 @@ public class RecallWebSocketServer { TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true); redisUtil.delete(key); flag =false; - sendInfo(bean.getResult()); + RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult()); + + sendInfo(JSONObject.toJSONString(recallReplyDTO2)); + BaseParam baseParam = new BaseParam(); baseParam.setFullChain(false); baseParam.setRepair(false); @@ -269,7 +283,10 @@ public class RecallWebSocketServer { } }else { flag =false; - sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时"); + RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时"); + + sendInfo(JSONObject.toJSONString(recallReplyDTO2)); + BaseParam baseParam = new BaseParam(); baseParam.setFullChain(false); baseParam.setRepair(false); @@ -304,7 +321,9 @@ public class RecallWebSocketServer { String guid = IdUtil.simpleUUID(); recallMessage.setGuid(guid); produceFeignClient.recall(recallMessage); - sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在小于80%,进行补招"); + RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"数据完整性在小于80%,进行补招"); + + sendInfo(JSONObject.toJSONString(recallReplyDTO)); boolean flag =true; LocalDateTime beginTaskTime = LocalDateTime.now(); @@ -318,7 +337,10 @@ public class RecallWebSocketServer { redisUtil.delete(key); flag =false; - sendInfo(bean.getResult()); + RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult()); + + sendInfo(JSONObject.toJSONString(recallReplyDTO2)); + BaseParam baseParam = new BaseParam(); baseParam.setFullChain(false); baseParam.setRepair(false); @@ -329,7 +351,9 @@ public class RecallWebSocketServer { } }else { flag =false; - sendInfo("监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时"); + RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时"); + + sendInfo(JSONObject.toJSONString(recallReplyDTO2)); BaseParam baseParam = new BaseParam(); baseParam.setFullChain(false); baseParam.setRepair(false); @@ -350,16 +374,22 @@ public class RecallWebSocketServer { }); if(!CollectionUtils.isEmpty(bsParmList)){ - sendInfo("开始执行数据完整性算法"); + RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"开始执行数据完整性算法"); + + sendInfo(JSONObject.toJSONString(recallReplyDTO)); bsParmList.forEach(temp->{ liteFlowAlgorithmFeignClient.measurementPointExecutor(temp); }); - sendInfo("执行完成"); + RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"执行完成"); + + sendInfo(JSONObject.toJSONString(recallReplyDTO2)); } } - sendInfo("补招任务结束"); + RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(200,"补招任务结束"); + + sendInfo(JSONObject.toJSONString(recallReplyDTO2)); } public List generateTimeIntervals(LocalDate date, int intervalMinutes) { List dateTimeList = new ArrayList<>();