From 338e42601707e0d83a738b46f8783e41e2908e7d Mon Sep 17 00:00:00 2001 From: chendaofei <857448963@qq.com> Date: Fri, 27 Dec 2024 15:02:49 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/SocketDevResponseService.java | 138 ++++++++++++------ .../gather/detection/pojo/vo/WebSocketVO.java | 16 +- .../socket/cilent/NettyDevClientHandler.java | 10 +- 3 files changed, 108 insertions(+), 56 deletions(-) diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java index e981516b..26acf44c 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java @@ -10,9 +10,11 @@ import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum; import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.pojo.po.DevData; import com.njcn.gather.detection.pojo.po.SourceCompareDev; +import com.njcn.gather.detection.pojo.vo.DevLineTestResult; import com.njcn.gather.detection.pojo.vo.SocketDataMsg; import com.njcn.gather.detection.pojo.vo.SocketMsg; import com.njcn.gather.detection.pojo.vo.WebSocketVO; +import com.njcn.gather.detection.service.impl.DetectionServiceImpl; import com.njcn.gather.detection.util.socket.MsgUtil; import com.njcn.gather.detection.util.socket.SocketManager; import com.njcn.gather.detection.util.socket.web.WebSocketHandler; @@ -22,6 +24,7 @@ import com.njcn.gather.device.script.pojo.param.PqScriptIssueParam; import com.njcn.gather.device.script.pojo.po.SourceIssue; import com.njcn.gather.device.script.service.IPqScriptDtlsService; import com.njcn.gather.storage.pojo.po.AdNonHarmonicResult; +import com.njcn.gather.system.dictionary.pojo.enums.DictDataEnum; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; @@ -43,6 +46,7 @@ public class SocketDevResponseService { private final IPqDevService iPqDevService; private final IPqScriptDtlsService scriptDtlsService; private final IPqScriptDtlsService pqScriptDtlsService; + private final DetectionServiceImpl detectionServiceImpl; /** * 存储的装置相序数据 @@ -60,6 +64,8 @@ public class SocketDevResponseService { */ List monitorIdListComm = new ArrayList<>(); + + private List devList = new ArrayList<>(); /** * 装置名称 */ @@ -82,7 +88,9 @@ public class SocketDevResponseService { public void deal(PreDetectionParam param, String msg) { SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg); - SourceOperateCodeEnum sourceOperateCodeEnum = SourceOperateCodeEnum.getDictDataEnumByCode(socketDataMsg.getRequestId()); + String[] tem = socketDataMsg.getRequestId().split("&&"); + + SourceOperateCodeEnum sourceOperateCodeEnum = SourceOperateCodeEnum.getDictDataEnumByCode(tem[0]); switch (sourceOperateCodeEnum) { //设备通讯校验 case YJC_SBTXJY: @@ -116,7 +124,7 @@ public class SocketDevResponseService { private void devComm(SocketDataMsg socketDataMsg, PreDetectionParam param, String msg) { SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); String s = param.getUserPageId() + handlerStr; - SocketMsg socketMsg = new SocketMsg(); + SocketMsg socketMsg = new SocketMsg<>(); switch (Objects.requireNonNull(dictDataEnumByCode)) { case SUCCESS: //通讯校验成功 @@ -156,22 +164,22 @@ public class SocketDevResponseService { case DEV_ERROR: webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2)); - quitSend(param, socketMsg); - quitSendSource(param, socketMsg); + quitSend(param); + quitSendSource(param); break; case DEV_TARGET: webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2)); - quitSend(param, socketMsg); - quitSendSource(param, socketMsg); + quitSend(param); + quitSendSource(param); break; case RE_OPERATE: //出现已经初始化情况,发送用户用户确认是否继续检测 webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); - quitSend(param, socketMsg); + quitSend(param); break; case NO_INIT_DEV: //发起关闭操作 - quitSend(param, socketMsg); + quitSend(param); break; default: WebSocketVO webSocketVO = new WebSocketVO(); @@ -194,12 +202,9 @@ public class SocketDevResponseService { webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2)); if (socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.DEV_INIT_GATHER_02.getValue())) { successXieyi.add(socketDataMsg.getData()); - - if (successXieyi.size() == monitorIdListComm.size()) { for (String string : successXieyi) { - //模拟统计协议校验,通讯校验已经校验过,模拟直接推送 SocketDataMsg webSend = new SocketDataMsg(); @@ -211,7 +216,7 @@ public class SocketDevResponseService { } //协议3校验 - successXieyi = new ArrayList<>(); + //successXieyi = new ArrayList<>(); List devList = iPqDevService.getDevInfo(param.getDevIds()); Map> map = new HashMap(1); @@ -244,23 +249,28 @@ public class SocketDevResponseService { PqScriptIssueParam issueParam = new PqScriptIssueParam(); issueParam.setPlanId(param.getPlanId()); issueParam.setSourceId(param.getSourceId()); - issueParam.setIsPhaseSequence(true); issueParam.setDevIds(param.getDevIds()); + //issueParam.setScriptId("2973cb938b591b93d0df2547592b8cd8"); + + SocketMsg xuMsg = new SocketMsg(); + xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue()); + if (SourceOperateCodeEnum.FORMAL_TEST.getValue().equals(param.getOperateType())) { + //正式检测 + issueParam.setIsPhaseSequence(true); + xuMsg.setRequestId(SourceOperateCodeEnum.Test_VOL.getValue()); + } else if (SourceOperateCodeEnum.PRE_TEST.getValue().equals(param.getOperateType())) { + //预检测 + issueParam.setIsPhaseSequence(false); + xuMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue()); + } List sourceIssues = pqScriptDtlsService.listSourceIssue(issueParam); - - if (CollUtil.isNotEmpty(sourceIssues)) { - SocketMsg xuMsg = new SocketMsg(); - xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue()); - xuMsg.setData(JSON.toJSONString(sourceIssues.get(0))); - if (SourceOperateCodeEnum.FORMAL_TEST.getValue().equals(param.getOperateType())) { - //正式检测 - xuMsg.setRequestId(SourceOperateCodeEnum.FORMAL_REAL.getValue()); - } else if (SourceOperateCodeEnum.PRE_TEST.getValue().equals(param.getOperateType())) { - //预检测 - xuMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue()); + if(SourceOperateCodeEnum.FORMAL_TEST.getValue().equals(param.getOperateType())){ + SocketManager.addSourceTarget(SourceOperateCodeEnum.Test_VOL.getValue(), sourceIssues.get(0)); } + xuMsg.setData(JSON.toJSONString(sourceIssues.get(0))); SocketManager.sendMsg(param.getUserPageId() + handlerSourceStr, MsgUtil.toJsonWithNewLinePlain(xuMsg)); + successComm.clear(); } } } @@ -269,21 +279,22 @@ public class SocketDevResponseService { break; case UNPROCESSED_BUSINESS: webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); + break; case DEV_ERROR: webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2)); - quitSend(param, socketMsg); - quitSendSource(param, socketMsg); + quitSend(param); + quitSendSource(param); break; case DEV_TARGET: webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2)); - quitSend(param, socketMsg); - quitSendSource(param, socketMsg); + quitSend(param); + quitSendSource(param); break; case RE_OPERATE: //出现已经初始化情况,发送用户用户确认是否继续检测 webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); - quitSend(param, socketMsg); + quitSend(param); break; default: WebSocketVO webSocketVO = new WebSocketVO(); @@ -336,6 +347,7 @@ public class SocketDevResponseService { String userSource = param.getUserPageId() + handlerSourceStr; SocketMsg msg = new SocketMsg(); + msg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue()); msg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue()); Map map = new HashMap<>(1); @@ -345,7 +357,7 @@ public class SocketDevResponseService { //同时关闭设备三个步骤 SocketMsg quitDevMsg = new SocketMsg(); - quitSend(param, quitDevMsg); + quitSend(param); //向前端推送消息 SocketDataMsg temMsg = new SocketDataMsg(); @@ -363,6 +375,7 @@ public class SocketDevResponseService { } break; case UNPROCESSED_BUSINESS: + break; case NORMAL_RESPONSE: @@ -370,13 +383,13 @@ public class SocketDevResponseService { break; case DEV_ERROR: webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2)); - quitSend(param, socketMsg); - quitSendSource(param, socketMsg); + quitSend(param); + quitSendSource(param); break; case DEV_TARGET: webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2)); - quitSend(param, socketMsg); - quitSendSource(param, socketMsg); + quitSend(param); + quitSendSource(param); break; case RE_OPERATE: break; @@ -395,6 +408,8 @@ public class SocketDevResponseService { /** * 实时数据 */ + private final List devDataList = new ArrayList(); + public void realDeal(PreDetectionParam param, SocketDataMsg socketDataMsg) { String data = socketDataMsg.getData(); DevData devData = JSON.parseObject(data, DevData.class); @@ -403,17 +418,51 @@ public class SocketDevResponseService { SocketMsg socketMsg = new SocketMsg(); switch (dictDataEnumByCode) { case SUCCESS: - List sqlDataDTOList = devData.getSqlData(); - for (DevData.SqlDataDTO item : sqlDataDTOList) { + //List sqlDataDTOList = devData.getSqlData(); + devDataList.add(devData); + successComm.add(socketDataMsg.getRequestId()); + System.out.println("devData............."+devDataList.size()); + if (successComm.size() == monitorIdListComm.size()) { + SourceIssue sourceIssue = SocketManager.getSourceTarget(socketDataMsg.getRequestId()); + System.out.println("获取sourceIssue"+sourceIssue); + Map flag = detectionServiceImpl.text(devDataList, sourceIssue, DictDataEnum.AT_WILL_VALUE); + System.out.println(flag); + + WebSocketVO> webSocketVO = new WebSocketVO<>(); + webSocketVO.setRequestId(socketDataMsg.getRequestId().split("&&")[1]+"_End"); + + //组装实体推送给前台 + List devListRes = new ArrayList<>(); + devList.forEach(dev->{ + DevLineTestResult devLineTestResult = new DevLineTestResult(); + devLineTestResult.setDeviceId(dev.getDevId()); + devLineTestResult.setDeviceName(dev.getDevName()); + + List tt = new ArrayList<>(); + List monitorListDTOList = dev.getMonitorList(); + for(PreDetection.MonitorListDTO point : monitorListDTOList){ + Integer resultFlag = flag.get(dev.getDevIP()+"_"+point.getLine()); + tt.add(resultFlag); + } + devLineTestResult.setChnResult(tt.toArray(new Integer[monitorListDTOList.size()])); + devListRes.add(devLineTestResult); + }); + webSocketVO.setData(devListRes); + webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(webSocketVO)); + + quitSend(param); + quitSendSource(param); } - AdNonHarmonicResult adNonHarmonicResult = new AdNonHarmonicResult(); break; + case UNPROCESSED_BUSINESS: break; case NORMAL_RESPONSE: + devDataList.add(devData); + break; case DEV_ERROR: @@ -451,6 +500,7 @@ public class SocketDevResponseService { case QUIT_INIT_01: //关闭所有 SocketManager.removeUser(s); + quitSendSource(param); break; case QUIT_INIT_02: socketMsg.setRequestId("quit"); @@ -469,6 +519,8 @@ public class SocketDevResponseService { case NO_INIT_DEV: switch (operateCodeEnum) { case QUIT_INIT_01: + SocketManager.removeUser(s); + quitSendSource(param); break; case QUIT_INIT_02: socketMsg.setRequestId("quit"); @@ -493,7 +545,8 @@ public class SocketDevResponseService { /** * 退出检测 */ - private void quitSend(PreDetectionParam param, SocketMsg socketMsg) { + private void quitSend(PreDetectionParam param) { + SocketMsg socketMsg = new SocketMsg(); socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue()); SocketManager.sendMsg(param.getUserPageId() + handlerStr, JSON.toJSONString(socketMsg)); @@ -502,7 +555,8 @@ public class SocketDevResponseService { /** * 关闭源连接 */ - private void quitSendSource(PreDetectionParam param, SocketMsg socketMsg) { + private void quitSendSource(PreDetectionParam param) { + SocketMsg socketMsg = new SocketMsg(); socketMsg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue()); SocketManager.sendMsg(param.getUserPageId() + handlerSourceStr, JSON.toJSONString(socketMsg)); @@ -612,10 +666,12 @@ public class SocketDevResponseService { this.success = new ArrayList<>(); this.devDataMap = new HashMap<>(); - adNonHarmonicResultList.clear(); - + this.adNonHarmonicResultList.clear(); + this.devDataList.clear(); + this.devList.clear(); List pqDevList = iPqDevService.getDevInfo(param.getDevIds()); + this.devList = pqDevList; this.monitorIdListComm = pqDevList.stream().flatMap(x -> x.getMonitorList().stream()) .map(PreDetection.MonitorListDTO::getLineId) .collect(Collectors.toList()); diff --git a/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java b/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java index 755c6a86..1aa1c5eb 100644 --- a/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java +++ b/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java @@ -11,20 +11,14 @@ import lombok.NoArgsConstructor; @NoArgsConstructor public class WebSocketVO { - private String type; + private String type = "aaa"; - private Integer code; + private String requestId; - private String message; + private String operateCode; + + private String code; private T data; - - public WebSocketVO(SourceResponseCodeEnum sourceResponseCodeEnum,String type){ - this.type = type; - this.code = sourceResponseCodeEnum.getCode(); - this.message= sourceResponseCodeEnum.getMessage(); - } - - } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java index e2249f13..eb74f653 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java @@ -24,6 +24,8 @@ import java.util.Objects; @RequiredArgsConstructor public class NettyDevClientHandler extends SimpleChannelInboundHandler { + private final String dev = "_Dev"; + private final PreDetectionParam param; private final SocketDevResponseService socketResponseService; @@ -36,12 +38,11 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端通道已建立" + ctx.channel().id()); - Channel channel = SocketManager.getChannelByUserId(param.getUserPageId()+"_Dev"); + Channel channel = SocketManager.getChannelByUserId(param.getUserPageId()+dev); if(Objects.nonNull(channel)){ channel.close().sync(); } - SocketManager.addUser(param.getUserPageId()+"_Dev",ctx.channel()); - + SocketManager.addUser(param.getUserPageId()+dev,ctx.channel()); } /** @@ -61,7 +62,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { */ @Override public void channelInactive(ChannelHandlerContext ctx) { - //System.out.println("客户端断线"); + System.out.println("客户端断线"); //SocketManager.addUser(webUser,ctx.channel()); } @@ -73,6 +74,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + System.out.println("进入超时。。。。。。"); //当连接超过10S和发送消息后10S无响应时候,关闭channel }