diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java index 5b8f6179..8333b485 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java @@ -126,7 +126,7 @@ public class SocketDevResponseService { switch (Objects.requireNonNull(sourceOperateCodeEnum)) { //设备通讯校验 case YJC_SBTXJY: - devComm(socketDataMsg, param, msg); + devComm(socketDataMsg, param); break; //协议校验 case YJC_XYJY: @@ -165,6 +165,9 @@ public class SocketDevResponseService { break; case YXT: break; + default: + // todo... 要日志记录或者websocket送到前端友好提示用户 + break; } } @@ -700,7 +703,7 @@ public class SocketDevResponseService { /** * 装置通讯检测 */ - private void devComm(SocketDataMsg socketDataMsg, PreDetectionParam param, String msg) { + private void devComm(SocketDataMsg socketDataMsg, PreDetectionParam param) { SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); String s = param.getUserPageId() + CnSocketUtil.DEV_TAG; SocketMsg socketMsg = new SocketMsg<>(); @@ -710,7 +713,6 @@ public class SocketDevResponseService { successComm.add(result); //单个测点通讯成功 WebServiceManager.sendMsg(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, FormalTestManager.devNameMapComm, 1)); - System.out.println("设备通讯校验!" + successComm.size() + "=====" + FormalTestManager.monitorIdListComm.size()); if (successComm.size() == FormalTestManager.monitorIdListComm.size()) { // 通知前端整个装置通讯检测过程成功 @@ -752,11 +754,9 @@ public class SocketDevResponseService { case RE_OPERATE: //出现已经初始化情况,发送用户用户确认是否继续检测 WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); - CnSocketUtil.quitSend(param); break; case NO_INIT_DEV: //发起关闭操作 - CnSocketUtil.quitSend(param); break; default: WebServiceManager.sendUnknownErrorMessage(param.getUserPageId()); @@ -791,7 +791,6 @@ public class SocketDevResponseService { String s = param.getUserPageId() + CnSocketUtil.DEV_TAG; switch (Objects.requireNonNull(dictDataEnumByCode)) { case SUCCESS: - if (socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.DEV_INIT_GATHER_02.getValue())) { successComm.add(socketDataMsg.getData()); WebServiceManager.sendMsg(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, FormalTestManager.devNameMapComm, 1)); @@ -839,12 +838,10 @@ public class SocketDevResponseService { System.out.println("开始脚本与icd校验:++++++++++"); SocketManager.sendMsg(s, JSON.toJSONString(socketMsg)); } - completeJudgment(param); } else if (socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.VERIFY_MAPPING$01.getValue())) { String data = socketDataMsg.getData(); IcdCheckData icdCheckData = JSON.parseObject(data, IcdCheckData.class); - boolean isContinue = true; for (int i = 0; i < icdCheckData.getResultData().size(); i++) { IcdCheckData.ResultData item = icdCheckData.getResultData().get(i); @@ -941,19 +938,6 @@ public class SocketDevResponseService { webSocketVO.setDesc(null); WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(webSocketVO)); } -// if (SourceOperateCodeEnum.TEST_TEM_START.getValue().equals(param.getOperateType())) { -// //暂停检测后的继续检测 -// System.out.println("进入暂停后的继续检测》》》》》》》》》》》》》》》》》》》》》》》》》》》" + "剩余检测小项" + SocketManager.getSourceList().size()); -// if (CollUtil.isNotEmpty(SocketManager.getSourceList())) { -// SourceIssue sourceIssue = SocketManager.getSourceList().get(0); -// socketMsg.setRequestId(SourceOperateCodeEnum.FORMAL_REAL.getValue() + CnSocketUtil.STEP_TAG + sourceIssue.getType()); -// socketMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue()); -// socketMsg.setData(JSON.toJSONString(sourceIssue)); -// SocketManager.sendMsg(param.getUserPageId() + CnSocketUtil.SOURCE_TAG, JSON.toJSONString(socketMsg)); -// } -// } else { -// -// } } else { // 发送下一个脚本与icd校验 String icdType = icdTypeList.stream().filter(it -> !icdCheckDataMap.containsKey(it)).findFirst().orElse(null); @@ -995,6 +979,7 @@ public class SocketDevResponseService { break; default: WebServiceManager.sendUnknownErrorMessage(param.getUserPageId()); + // todo... 这种情况是报文的状态码不一致,需要记录到日志表,以便问题追踪 break; } } @@ -1020,11 +1005,13 @@ public class SocketDevResponseService { } } else { if (ObjectUtil.isNotNull(list.getA()) && list.getA().equals(1.0) && ObjectUtil.isNotNull(list.getB()) && list.getB().equals(1.0) && ObjectUtil.isNotNull(list.getC()) && list.getC().equals(1.0) || ObjectUtil.isNotNull(list.getT()) && list.getT().equals(1.0)) { - return 1; // 装置上送错误 + // 装置上送错误 + return 1; } } } - return 0; // icd文件与脚本不匹配 + // icd文件与脚本不匹配 + return 0; } diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java index 9145eb46..d4b33f14 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java @@ -35,12 +35,8 @@ public class SocketSourceResponseService { */ private final SocketDevResponseService socketDevResponseService; private final IPqDevService iPqDevService; + private final SocketManager socketManager; - @Value("${socket.device.ip}") - private String ip; - - @Value("${socket.device.port}") - private Integer port; private List devList = new ArrayList<>(); private List monitorIdList = new ArrayList<>(); @@ -82,8 +78,12 @@ public class SocketSourceResponseService { break; case YXT: break; + default: + // todo... 要日志记录或者websocket送到前端友好提示用户 + break; } } else { + // todo... 要日志记录或者websocket送到前端友好提示用户 System.out.println("fggggggggggggggggggggg" + enumByCode); } } @@ -219,14 +219,11 @@ public class SocketSourceResponseService { SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); if (ObjectUtil.isNotNull(dictDataEnumByCode)) { SocketMsg socketMsg = new SocketMsg<>(); - switch (dictDataEnumByCode) { case SUCCESS: //todo 前端推送收到的消息暂未处理好 WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); - String s = param.getUserPageId() + CnSocketUtil.DEV_TAG; //开始设备通讯检测(发送设备初始化) - //List devList = iPqDevService.getDevInfo(param.getDevIds()); Map> map = new HashMap<>(1); map.put("deviceList", FormalTestManager.devList); String jsonString = JSON.toJSONString(map); @@ -234,8 +231,8 @@ public class SocketSourceResponseService { socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_01.getValue()); socketMsg.setData(jsonString); String json = JSON.toJSONString(socketMsg); - // SocketManager.sendMsg(s,json); - NettyClient.socketClient(ip, port, param, json, new NettyDevClientHandler(param, socketDevResponseService)); + // 使用智能发送工具类,自动管理设备连接 + socketManager.smartSendToDevice(param, json); break; case UNPROCESSED_BUSINESS: WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); @@ -273,9 +270,12 @@ public class SocketSourceResponseService { WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); break; default: + // todo... 这种情况是报文的状态码不一致,需要记录到日志表,以便问题追踪 WebServiceManager.sendUnknownErrorMessage(param.getUserPageId()); break; } + }else{ + // todo... 这种情况是报文的状态码不一致,需要记录到日志表,以便问题追踪 } } diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index 55eb7620..8d2f3b9c 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -21,10 +21,8 @@ import com.njcn.gather.detection.util.business.DetectionCommunicateUtil; import com.njcn.gather.detection.util.socket.CnSocketUtil; import com.njcn.gather.detection.util.socket.FormalTestManager; import com.njcn.gather.detection.util.socket.SocketManager; -import com.njcn.gather.detection.util.socket.cilent.NettyClient; -import com.njcn.gather.detection.util.socket.cilent.NettyContrastClientHandler; -import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler; import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; +import com.njcn.web.utils.RequestUtil; import com.njcn.gather.device.pojo.po.PqDev; import com.njcn.gather.device.pojo.vo.PreDetection; import com.njcn.gather.device.service.IPqDevService; @@ -41,11 +39,9 @@ import com.njcn.gather.source.pojo.po.SourceInitialize; import com.njcn.gather.source.service.IPqSourceService; import com.njcn.gather.system.dictionary.pojo.enums.DictDataEnum; import com.njcn.gather.system.dictionary.service.IDictDataService; -import com.njcn.web.utils.RequestUtil; import io.netty.channel.Channel; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.math.BigDecimal; @@ -72,14 +68,7 @@ public class PreDetectionServiceImpl implements PreDetectionService { private final SocketSourceResponseService socketSourceResponseService; private final SocketContrastResponseService socketContrastResponseService; private final IPqScriptCheckDataService iPqScriptCheckDataService; - - @Value("${socket.device.ip:127.0.0.1}") - private String ip; - - @Value("${socket.device.port:61000}") - private Integer port; - - //private final SocketSourceResponseService sourceResponseService; + private final SocketManager socketManager; @Override @@ -119,6 +108,10 @@ public class PreDetectionServiceImpl implements PreDetectionService { } + /** + * 原本系数校准单独写的,现在合并了,该方法过期了,没有调用了 + */ + @Deprecated @Override public void coefficientCheck(PreDetectionParam param) { // 检测是否存在连接的通道,后期需要做成动态,如果组合中不是第一位,则不需要关闭,也不用初始化 todo.... @@ -137,8 +130,8 @@ public class PreDetectionServiceImpl implements PreDetectionService { msg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue()); msg.setData(JSON.toJSONString(sourceParam)); param.setSourceId(sourceParam.getSourceId()); -// NettyClient.socketClient(ip, port, param, JSON.toJSONString(msg), new NettySourceClientHandler(param, sourceResponseService)); - NettyClient.socketClient(ip, port, param, JSON.toJSONString(msg), new NettySourceClientHandler(param, socketSourceResponseService)); + // 使用智能发送工具类,自动管理连接 + socketManager.smartSendToSource(param, JSON.toJSONString(msg)); } else { throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT); } @@ -182,8 +175,8 @@ public class PreDetectionServiceImpl implements PreDetectionService { socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue()); socketMsg.setData(JSON.toJSONString(sourceParam)); - //建立与源控程序的socket连接, - NettyClient.socketClient(ip, port, param, JSON.toJSONString(socketMsg), new NettySourceClientHandler(param, socketSourceResponseService)); + //使用智能发送工具类,自动管理与源控程序的socket连接 + socketManager.smartSendToSource(param, JSON.toJSONString(socketMsg)); } else { throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT); } @@ -218,7 +211,8 @@ public class PreDetectionServiceImpl implements PreDetectionService { socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue()); socketMsg.setData(JSON.toJSONString(sourceParam)); - NettyClient.socketClient(ip, port, param, JSON.toJSONString(socketMsg), new NettySourceClientHandler(param, socketSourceResponseService)); + // 使用智能发送工具类,自动管理连接 + socketManager.smartSendToSource(param, JSON.toJSONString(socketMsg)); } else { throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT); } @@ -353,17 +347,14 @@ public class PreDetectionServiceImpl implements PreDetectionService { }); map.put("deviceList", preDetections); String jsonString = JSON.toJSONString(map); - SocketMsg socketMsg = new SocketMsg<>(); socketMsg.setRequestId(SourceOperateCodeEnum.YJC_SBTXJY.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_02.getValue()); socketMsg.setData(jsonString); - String json = JSON.toJSONString(socketMsg); - PreDetectionParam preDetectionParam = new PreDetectionParam(); preDetectionParam.setUserPageId(param.getLoginName()); WebServiceManager.addPreDetectionParam(param.getLoginName(), preDetectionParam); - NettyClient.socketClient(ip, port, preDetectionParam, json, new NettyContrastClientHandler(preDetectionParam, socketContrastResponseService)); + socketManager.smartSendToContrast(param, JSON.toJSONString(socketMsg)); } /** diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/CnSocketUtil.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/CnSocketUtil.java index d6e4f420..12e1d94e 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/CnSocketUtil.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/CnSocketUtil.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject; import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.pojo.vo.SocketMsg; +import com.njcn.gather.detection.pojo.vo.WebSocketVO; import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; /** @@ -36,7 +37,6 @@ public class CnSocketUtil { socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue()); SocketManager.sendMsg(param.getUserPageId() + DEV_TAG, JSON.toJSONString(socketMsg)); - WebServiceManager.removePreDetectionParam(); } @@ -55,7 +55,6 @@ public class CnSocketUtil { } - /** * 比对式-退出检测 */ diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java index a233f68c..0a0dcb1d 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java @@ -1,24 +1,51 @@ package com.njcn.gather.detection.util.socket; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.njcn.gather.detection.pojo.param.ContrastDetectionParam; +import com.njcn.gather.detection.pojo.param.PreDetectionParam; +import com.njcn.gather.detection.util.socket.cilent.NettyClient; +import com.njcn.gather.detection.util.socket.config.SocketConnectionConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + import com.njcn.gather.plan.pojo.enums.DataSourceEnum; import com.njcn.gather.script.pojo.po.SourceIssue; import io.netty.channel.Channel; import io.netty.channel.nio.NioEventLoopGroup; +import lombok.extern.slf4j.Slf4j; +import javax.annotation.Resource; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; /** + * Socket连接管理器 + * 提供Socket连接的生命周期管理、消息发送、检测任务管理等功能 + * + * 包含以下主要功能: + * 1. 基础连接管理:addUser, removeUser, sendMsg等 + * 2. 智能消息发送:smartSendToSource, smartSendToDevice等(新增) + * 3. 检测任务管理:targetMap, sourceIssueList等管理 + * 4. 连接状态监控:getConnectionStatus等(新增) + * * @Description: webSocket存储的通道 - * @Author: wr + * @Author: wr, hongawen * @Date: 2024/12/11 13:04 */ +@Slf4j +@Component public class SocketManager { + @Resource + private SocketConnectionConfig socketConnectionConfig; + /** * key为userId(xxx_Source、xxx_Dev),value为channel */ @@ -46,12 +73,13 @@ public class SocketManager { e.printStackTrace(); } NioEventLoopGroup eventExecutors = socketGroup.get(userId); - if(ObjectUtil.isNotNull(channel)){ + if(ObjectUtil.isNotNull(eventExecutors)){ eventExecutors.shutdownGracefully(); System.out.println(userId+"__"+channel.id()+"关闭了客户端"); } } socketSessions.remove(userId); + socketGroup.remove(userId); } public static Channel getChannelByUserId(String userId) { @@ -66,11 +94,161 @@ public class SocketManager { Channel channel = socketSessions.get(userId); if(ObjectUtil.isNotNull(channel)){ channel.writeAndFlush(msg+'\n'); - System.out.println(userId+"__"+channel.id()+"往"+channel.remoteAddress()+"发送数据:"+msg); + log.info("{}__{}往{}发送数据:{}", userId, channel.id(), channel.remoteAddress(), msg); }else{ - System.out.println(userId+"__发送数据:失败通道不存在"+msg); + log.warn("{}__发送数据:失败通道不存在{}", userId, msg); + } + } + + // =================== 智能发送功能 =================== + + /** + * 智能发送消息到程控源设备 + * 自动从配置文件读取IP和PORT,开发者无需关心网络配置 + * 如果连接不存在且requestId需要建立连接,会自动建立连接后发送 + * + * @param param 检测参数,包含用户页面ID等信息 + * @param msg 要发送的消息内容(JSON格式,包含requestId字段) + */ + public void smartSendToSource(PreDetectionParam param, String msg) { + String requestId = extractRequestId(msg); + String userId = param.getUserPageId() + CnSocketUtil.SOURCE_TAG; + // 检查是否需要建立连接 + if (SocketConnectionConfig.needsSourceConnection(requestId)) { + String ip = socketConnectionConfig.getSource().getIp(); + Integer port = socketConnectionConfig.getSource().getPort(); + // 检查连接是否存在且活跃 + if (!isChannelActive(userId)) { + log.info("程控源连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId); + // 异步建立程控源连接并发送消息 + CompletableFuture.runAsync(() -> { + NettyClient.connectToSourceStatic(ip, port, param, msg); + }); + return; + } + } + + // 连接已存在或不需要建立连接,直接发送消息 + log.info("直接发送消息到程控源: userId={}, requestId={}", userId, requestId); + sendMsg(userId, msg); + } + + /** + * 智能发送消息到被检设备 + * 自动从配置文件读取IP和PORT,开发者无需关心网络配置 + * 如果连接不存在且requestId需要建立连接,会自动建立连接后发送 + * + * @param param 检测参数,包含用户页面ID等信息 + * @param msg 要发送的消息内容(JSON格式,包含requestId字段) + */ + public void smartSendToDevice(PreDetectionParam param, String msg) { + String requestId = extractRequestId(msg); + String userId = param.getUserPageId() + CnSocketUtil.DEV_TAG; + // 检查是否需要建立连接 + if (SocketConnectionConfig.needsDeviceConnection(requestId)) { + String ip = socketConnectionConfig.getDevice().getIp(); + Integer port = socketConnectionConfig.getDevice().getPort(); + // 检查连接是否存在且活跃 + if (!isChannelActive(userId)) { + log.info("被检设备连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId); + // 异步建立被检设备连接并发送消息 + CompletableFuture.runAsync(() -> { + NettyClient.connectToDeviceStatic(ip, port, param, msg); + }); + return; + } + } + + // 连接已存在或不需要建立连接,直接发送消息 + log.info("直接发送消息到被检设备: userId={}, requestId={}", userId, requestId); + sendMsg(userId, msg); + } + + + /** + * 比对智能发送消息到被检设备 + * 自动从配置文件读取IP和PORT,开发者无需关心网络配置 + * 如果连接不存在且requestId需要建立连接,会自动建立连接后发送 + * + * @param param 检测参数,包含用户页面ID等信息 + * @param msg 要发送的消息内容(JSON格式,包含requestId字段) + */ + public void smartSendToContrast(ContrastDetectionParam param, String msg) { + String requestId = extractRequestId(msg); + String userId = param.getLoginName() + CnSocketUtil.CONTRAST_DEV_TAG; + // 检查是否需要建立连接 + if (SocketConnectionConfig.needsDeviceConnection(requestId)) { + String ip = socketConnectionConfig.getDevice().getIp(); + Integer port = socketConnectionConfig.getDevice().getPort(); + // 检查连接是否存在且活跃 + if (!isChannelActive(userId)) { + log.info("比对被检设备连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId); + // 异步建立比对被检设备连接并发送消息 + CompletableFuture.runAsync(() -> { + NettyClient.connectToContrastDeviceStatic(ip, port, param, msg); + }); + return; + } } + // 连接已存在或不需要建立连接,直接发送消息 + log.info("直接发送消息到比对被检设备: userId={}, requestId={}", userId, requestId); + sendMsg(userId, msg); + } + + + + + // =================== 私有工具方法 =================== + + /** + * 从消息中提取requestId + * 支持JSON格式的消息解析 + * + * @param msg 消息内容 + * @return String requestId,如果解析失败返回"unknown" + */ + private static String extractRequestId(String msg) { + try { + if (StrUtil.isNotBlank(msg)) { + // 尝试解析JSON格式消息 + JSONObject jsonObject = JSON.parseObject(msg); + String requestId = jsonObject.getString("requestId"); + if (StrUtil.isNotBlank(requestId)) { + return requestId; + } + + // 如果没有requestId字段,尝试解析request_id字段 + requestId = jsonObject.getString("request_id"); + if (StrUtil.isNotBlank(requestId)) { + return requestId; + } + + // 如果没有JSON字段,尝试从普通字符串中匹配 + if (msg.contains("requestId=")) { + String[] parts = msg.split("requestId="); + if (parts.length > 1) { + String idPart = parts[1].split("[,\\s&]")[0]; + return idPart.trim(); + } + } + } + } catch (Exception e) { + log.warn("解析消息中的requestId失败: msg={}, error={}", msg, e.getMessage()); + } + + return "unknown"; + } + + /** + * 检查指定用户的Channel是否活跃 + * + * @param userId 用户ID + * @return boolean true:连接活跃, false:连接不存在或不活跃 + */ + private static boolean isChannelActive(String userId) { + Channel channel = getChannelByUserId(userId); + return ObjectUtil.isNotNull(channel) && channel.isActive(); } @@ -134,9 +312,5 @@ public class SocketManager { - - - - } \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/UnitUtil.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/UnitUtil.java deleted file mode 100644 index c1e2b22f..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/UnitUtil.java +++ /dev/null @@ -1,63 +0,0 @@ -package com.njcn.gather.detection.util.socket; - -import com.njcn.gather.detection.pojo.enums.DetectionCodeEnum; - -import java.util.Arrays; - -/** - * @author wr - * @description - * @date 2025/3/27 14:58 - */ -public class UnitUtil { -// public static String unit(String code, Integer fly) { -// String unit = ""; -// if (Arrays.asList(0, 1).contains(fly)) { -// if (DetectionCodeEnum.FREQ.getCode().equals(code)) { -// unit = "Hz"; -// } -// if (DetectionCodeEnum.VRMS.getCode().equals(code)) { -// unit = "V"; -// } -// if (DetectionCodeEnum.IRMS.getCode().equals(code)) { -// unit = "A"; -// } -// if (DetectionCodeEnum.V2_50.getCode().equals(code) || -// DetectionCodeEnum.SV_1_49.getCode().equals(code)|| -// DetectionCodeEnum.V_UNBAN.getCode().equals(code) || -// DetectionCodeEnum.I_UNBAN.getCode().equals(code) -// ) { -// unit = "%"; -// } -// if (DetectionCodeEnum.I2_50.getCode().equals(code) || -// DetectionCodeEnum.SI_1_49.getCode().equals(code) -// ) { -// unit = "A"; -// } -// if (DetectionCodeEnum.P2_50.getCode().equals(code)) { -// unit = "W"; -// } -// if (DetectionCodeEnum.P.getCode().equals(code)) { -// unit = "P"; -// } -// if (DetectionCodeEnum.MAG.getCode().equals(code)) { -// unit = "V"; -// } -// if (DetectionCodeEnum.DUR.getCode().equals(code)) { -// unit = "s"; -// } -// if (DetectionCodeEnum.VA.getCode().equals(code) || -// DetectionCodeEnum.IA.getCode().equals(code) -// ) { -// unit = "°"; -// } -// if (DetectionCodeEnum.DELTA_V.getCode().equals(code) -// ) { -// unit = "%"; -// } -// }else{ -// unit = "%"; -// } -// return unit; -// } -} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/HeartbeatHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/HeartbeatHandler.java index ebba8df3..8e41e5b4 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/HeartbeatHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/HeartbeatHandler.java @@ -11,51 +11,139 @@ import com.njcn.gather.detection.util.socket.SocketManager; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import lombok.extern.slf4j.Slf4j; + import java.time.LocalDateTime; -import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** - * @Author: cdf - * @CreateTime: 2025-02-11 - * @Description: 心跳处理类 + * Netty心跳处理器 + *

+ * 负责维护Socket长连接的心跳检测机制,通过定期发送心跳包来检测连接状态, + * 当连续多次未收到心跳响应时自动断开连接并清理相关资源。 + *

+ * + *

核心功能:

+ *
    + *
  • 定时发送心跳包 (默认10秒间隔,3秒后开始)
  • + *
  • 监听心跳响应,重置失败计数器
  • + *
  • 连续失败超过阈值时触发断开逻辑 (默认3次)
  • + *
  • 异步处理断开操作,避免阻塞心跳线程
  • + *
  • 优雅关闭资源,防止内存泄漏
  • + *
+ * + *

心跳机制流程:

+ *
+ * 连接建立 → 启动心跳定时任务(3秒后开始,每10秒执行)
+ *           ↓
+ * 发送心跳包 → 等待响应 → 收到响应(重置计数器) / 未收到响应(递增计数器)
+ *           ↓
+ * 连续3次失败 → 异步执行断开逻辑 → 发送退出指令 → 延迟清理连接
+ *           ↓
+ * 连接断开 → 优雅关闭定时任务和线程池
+ * 
+ * + *

线程安全设计:

+ *

+ * 使用单线程的ScheduledExecutorService处理心跳发送,避免并发问题。 + * 超时处理使用CompletableFuture异步执行,不阻塞心跳发送线程。 + * Future引用使用volatile修饰,确保多线程环境下的可见性。 + *

+ * + *

设备类型支持:

+ *
    + *
  • 程控源设备 (CnSocketUtil.SOURCE_TAG): "_Source"
  • + *
  • 被检设备 (CnSocketUtil.DEV_TAG): "_Dev"
  • + *
+ * + *

使用示例:

+ *
{@code
+ * // 创建心跳处理器
+ * HeartbeatHandler handler = new HeartbeatHandler(param, CnSocketUtil.SOURCE_TAG);
+ * 
+ * // 添加到Netty管道中
+ * pipeline.addLast(handler);
+ * }
+ * + * @author cdf + * @since 2025-02-11 + * @version 1.2 */ - +@Slf4j public class HeartbeatHandler extends SimpleChannelInboundHandler { + + /** 心跳定时任务执行器,使用单线程池避免并发问题 */ private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1); + + /** 检测参数,包含用户页面ID等信息 */ private final PreDetectionParam param; + + /** 处理器类型标识("_Source" 或 "_Dev") */ private final String handlerType; + + /** 保存定时任务的Future引用,便于取消和管理 */ + private ScheduledFuture heartbeatFuture; - // 允许连续未收到心跳响应的最大次数 + /** 允许连续未收到心跳响应的最大次数 */ private static final int MAX_HEARTBEAT_MISSES = 3; - // 连续未收到心跳响应的次数 + + /** 连续未收到心跳响应的次数 */ private int consecutiveHeartbeatMisses = 0; + /** + * 构造函数 + * + * @param param 检测参数,包含用户页面ID等信息 + * @param type 处理器类型(CnSocketUtil.SOURCE_TAG 或 CnSocketUtil.DEV_TAG) + */ public HeartbeatHandler(PreDetectionParam param, String type) { this.param = param; this.handlerType = type; } + /** + * 通道激活时的回调方法 + * 在Socket连接建立成功后被调用,启动心跳机制 + * + * @param ctx Netty的通道上下文对象 + */ @Override public void channelActive(ChannelHandlerContext ctx) { + log.info("心跳处理器启动 - 设备类型: {}", handlerType); // 启动心跳定时任务 scheduleHeartbeat(ctx); + // 传播事件给管道中的后续处理器 ctx.fireChannelActive(); } + /** + * 通道断开时的回调方法 + * 在Socket连接断开时被调用,负责清理相关资源 + * + * @param ctx Netty的通道上下文对象 + * @throws Exception 异常情况 + */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - heartbeatExecutor.shutdown(); + log.info("心跳处理器开始清理资源 - 设备类型: {}", handlerType); + shutdownExecutorGracefully(); super.channelInactive(ctx); } - // 每10秒发送一次心跳 + /** + * 启动心跳定时任务 + * 每10秒发送一次心跳包,3秒后开始执行 + * + * @param ctx Netty的通道上下文对象 + */ private void scheduleHeartbeat(ChannelHandlerContext ctx) { - heartbeatExecutor.scheduleAtFixedRate(() -> { + heartbeatFuture = heartbeatExecutor.scheduleAtFixedRate(() -> { if (ctx.channel().isActive()) { // 发送心跳包 SocketMsg msg = new SocketMsg<>(); @@ -64,50 +152,148 @@ public class HeartbeatHandler extends SimpleChannelInboundHandler { msg.setData(""); ctx.channel().writeAndFlush(JSON.toJSONString(msg) + "\n"); - System.out.println(handlerType + "♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥send" + LocalDateTime.now()); + log.debug("心跳发送 - 设备类型: {}, 时间: {}", handlerType, LocalDateTime.now()); consecutiveHeartbeatMisses++; if (consecutiveHeartbeatMisses >= MAX_HEARTBEAT_MISSES) { - // 连续三次未收到心跳响应,断开连接 - System.out.println(handlerType + "连续三次未收到心跳响应,断开连接"); - if (CnSocketUtil.DEV_TAG.equals(handlerType)) { - //CnSocketUtil.sendToWebSocket(param.getUserPageId(),); - CnSocketUtil.quitSend(param); - } else if (CnSocketUtil.CONTRAST_DEV_TAG.equals(handlerType)) { - CnSocketUtil.contrastSendquit(param.getUserPageId()); - } else { - CnSocketUtil.quitSendSource(param); - } - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - System.err.println("线程中断异常: " + e.getMessage()); - } - String key = param.getUserPageId() + handlerType; - SocketManager.removeUser(key); + // 连续三次未收到心跳响应,异步处理断开逻辑,避免阻塞心跳线程 + log.warn("心跳响应超时 - 设备类型: {}, 连续失败次数: {}/{}, 执行断开连接", + handlerType, consecutiveHeartbeatMisses, MAX_HEARTBEAT_MISSES); + handleHeartbeatTimeoutAsync(); consecutiveHeartbeatMisses = 0; // 重置连续心跳丢失次数 } } }, 3, 10, TimeUnit.SECONDS); } + /** + * 异步处理心跳超时断开逻辑 + *

+ * 使用CompletableFuture避免阻塞心跳发送线程,确保心跳机制不受影响。 + * 处理流程: + * 1. 异步发送退出指令 + * 2. 延迟3秒后清理Socket连接 + * 3. 记录处理过程和异常 + *

+ */ + private void handleHeartbeatTimeoutAsync() { + CompletableFuture.runAsync(() -> { + try { + log.info("开始执行心跳超时断开处理 - 设备类型: {}", handlerType); + // 根据设备类型发送对应的退出指令 + if (CnSocketUtil.DEV_TAG.equals(handlerType)) { + CnSocketUtil.quitSend(param); + } else { + CnSocketUtil.quitSendSource(param); + } + log.debug("退出指令已发送,等待3秒后清理连接 - 设备类型: {}", handlerType); + } catch (Exception e) { + log.error("心跳超时处理发送退出指令异常 - 设备类型: {}", handlerType, e); + } + }).thenRunAsync(() -> { + try { + // 延迟3秒后清理连接,给退出指令留出处理时间 + Thread.sleep(3000); + // 构建连接Key并从SocketManager中移除 + String key = CnSocketUtil.DEV_TAG.equals(handlerType) ? + param.getUserPageId() + CnSocketUtil.DEV_TAG : + param.getUserPageId() + CnSocketUtil.SOURCE_TAG; + SocketManager.removeUser(key); + log.info("心跳超时断开处理完成 - 设备类型: {}, 连接已清理", handlerType); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("心跳超时处理等待过程中被中断 - 设备类型: {}", handlerType); + } catch (Exception e) { + log.error("心跳超时处理清理连接异常 - 设备类型: {}", handlerType, e); + } + }); + } + /** + * 优雅关闭线程池执行器 + *

+ * 确保资源的完全清理,避免内存泄漏: + * 1. 取消当前的心跳定时任务 + * 2. 关闭线程池并等待正在执行的任务完成 + * 3. 如果等待超时则强制关闭 + * 4. 处理中断异常并恢复中断状态 + *

+ */ + private void shutdownExecutorGracefully() { + try { + // 1. 取消心跳定时任务 + if (heartbeatFuture != null && !heartbeatFuture.isCancelled()) { + boolean cancelled = heartbeatFuture.cancel(false); + log.debug("心跳定时任务取消结果: {} - 设备类型: {}", cancelled, handlerType); + } + // 2. 关闭线程池,不再接收新任务 + heartbeatExecutor.shutdown(); + // 3. 等待已提交的任务完成,最多等待5秒 + if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + log.warn("心跳线程池未能在5秒内正常关闭,执行强制关闭 - 设备类型: {}", handlerType); + heartbeatExecutor.shutdownNow(); + // 再次等待强制关闭完成,最多等待2秒 + if (!heartbeatExecutor.awaitTermination(2, TimeUnit.SECONDS)) { + log.error("心跳线程池强制关闭失败 - 设备类型: {}", handlerType); + } + } else { + log.debug("心跳线程池已优雅关闭 - 设备类型: {}", handlerType); + } + } catch (InterruptedException e) { + // 如果等待过程中被中断,立即强制关闭 + log.warn("心跳线程池关闭过程中被中断,执行强制关闭 - 设备类型: {}", handlerType); + heartbeatExecutor.shutdownNow(); + // 恢复中断状态,遵循Java并发编程最佳实践 + Thread.currentThread().interrupt(); + } + } + + /** + * 消息接收处理方法 + *

+ * 负责处理从服务端接收到的消息: + * 1. 过滤心跳响应包,重置失败计数器 + * 2. 业务消息传递给后续处理器 + *

+ * + * @param ctx Netty的通道上下文对象 + * @param msg 接收到的消息内容 + * @throws Exception 异常情况 + */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { // 过滤心跳包,避免进入业务逻辑 if (isHeartbeatPacket(msg)) { - System.out.println(handlerType + "♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥response" + LocalDateTime.now()); + log.debug("心跳响应 - 设备类型: {}, 时间: {}", handlerType, LocalDateTime.now()); + // 重置连续失败计数器,表示连接正常 consecutiveHeartbeatMisses = 0; return; } - // 处理业务数据 + // 业务消息传递给管道中的后续处理器 ctx.fireChannelRead(msg); - } + /** + * 判断是否为心跳数据包 + *

+ * 通过解析消息的operateCode字段来判断是否为心跳响应。 + * 心跳包的操作码为SourceOperateCodeEnum.HEARTBEAT。 + *

+ * + * @param msg 需要判断的消息内容 + * @return true:心跳包, false:业务消息 + */ private boolean isHeartbeatPacket(String msg) { - // 判断是否为心跳包 - SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg); - return !Objects.isNull(socketDataMsg.getOperateCode()) && socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.HEARTBEAT.getValue()); + try { + // 解析消息为SocketDataMsg对象 + SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg); + // 检查操作码是否为心跳类型 + return socketDataMsg != null && + socketDataMsg.getOperateCode() != null && + socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.HEARTBEAT.getValue()); + } catch (Exception e) { + // 消息解析失败,可能不是标准格式的心跳包 + log.debug("消息解析失败,可能不是心跳包: {}", msg, e); + return false; + } } } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java index 424d9a30..9954f6f1 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java @@ -1,9 +1,12 @@ package com.njcn.gather.detection.util.socket.cilent; import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; +import com.njcn.gather.detection.handler.SocketContrastResponseService; +import com.njcn.gather.detection.handler.SocketDevResponseService; +import com.njcn.gather.detection.handler.SocketSourceResponseService; import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum; +import com.njcn.gather.detection.pojo.param.ContrastDetectionParam; import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.pojo.vo.SocketDataMsg; import com.njcn.gather.detection.util.socket.CnSocketUtil; @@ -18,158 +21,484 @@ import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.CharsetUtil; -import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; +import javax.annotation.Resource; import java.util.concurrent.TimeUnit; /** + * Netty客户端工具类 + * 用于建立与检测设备和程控源设备的Socket通信连接 + * 支持心跳检测、断线重连和异常处理 + * * @Description: 心跳检测服务端 对应的服务端在netty-server 包下的NettyClient * @Author: wr * @Date: 2024/12/10 14:16 */ -@Getter +@Slf4j +@Component public class NettyClient { - public static void socketClient(String ip, Integer port, PreDetectionParam param, String msg, SimpleChannelInboundHandler handler) { - NioEventLoopGroup group = new NioEventLoopGroup(); - Bootstrap bootstrap = new Bootstrap(); - try { - bootstrap.group(group) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(NioSocketChannel ch) { - if (handler instanceof NettySourceClientHandler) { - ch.pipeline() - //空闲状态的handler - // 添加LineBasedFrameDecoder来按行分割数据 - .addLast(new LineBasedFrameDecoder(10240)) - // .addLast(new IdleStateHandler(0, 10, 0, TimeUnit.SECONDS)) - .addLast(new StringDecoder(CharsetUtil.UTF_8)) - .addLast(new StringEncoder(CharsetUtil.UTF_8)) - .addLast(new HeartbeatHandler(param, CnSocketUtil.SOURCE_TAG)) - .addLast(handler); - } else if (handler instanceof NettyDevClientHandler) { - ch.pipeline() - // 添加LineBasedFrameDecoder来按行分割数据 - .addLast(new LineBasedFrameDecoder(10240)) - .addLast(new StringDecoder(CharsetUtil.UTF_8)) - .addLast(new StringEncoder(CharsetUtil.UTF_8)) - .addLast(new HeartbeatHandler(param, CnSocketUtil.DEV_TAG)) - //空闲状态的handler - //readerIdleTimeSeconds:在指定的秒数内如果没有读取到任何数据,则触发IdleState.READER_IDLE事件。 - //writerIdleTimeSeconds:在指定的秒数内如果没有写入任何数据,则触发IdleState.WRITER_IDLE事件。 - //allIdleTimeSeconds:在指定的秒数内如果没有发生任何读取或写入操作,则触发IdleState.ALL_IDLE事件。 - .addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)) - .addLast(handler); - } else if (handler instanceof NettyContrastClientHandler) { - ch.pipeline() + @Resource + private SocketSourceResponseService socketSourceResponseService; - // 添加LineBasedFrameDecoder来按行分割数据 - .addLast(new LineBasedFrameDecoder(10240)) - .addLast(new StringDecoder(CharsetUtil.UTF_8)) - .addLast(new StringEncoder(CharsetUtil.UTF_8)) - .addLast(new HeartbeatHandler(param, CnSocketUtil.CONTRAST_DEV_TAG)) - //空闲状态的handler - //readerIdleTimeSeconds:在指定的秒数内如果没有读取到任何数据,则触发IdleState.READER_IDLE事件。 - //writerIdleTimeSeconds:在指定的秒数内如果没有写入任何数据,则触发IdleState.WRITER_IDLE事件。 - //allIdleTimeSeconds:在指定的秒数内如果没有发生任何读取或写入操作,则触发IdleState.ALL_IDLE事件。 - .addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)) - .addLast(handler); - } - } - }); - ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); - channelFuture.addListener((ChannelFutureListener) ch -> { - if (!ch.isSuccess()) { - System.out.println("链接服务端失败..."); - // 连接失败时关闭 group - group.shutdownGracefully(); - } else { - System.out.println("链接服务端成功..."); - if (handler instanceof NettySourceClientHandler) { - NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + CnSocketUtil.SOURCE_TAG); - if (ObjectUtil.isNotNull(groupByUserId)) { - groupByUserId.shutdownGracefully().sync(); - } - SocketManager.addGroup(param.getUserPageId() + CnSocketUtil.SOURCE_TAG, group); - } else if(handler instanceof NettyDevClientHandler){ - NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + CnSocketUtil.DEV_TAG); - if (ObjectUtil.isNotNull(groupByUserId)) { - groupByUserId.shutdownGracefully().sync(); - } - SocketManager.addGroup(param.getUserPageId() + CnSocketUtil.DEV_TAG, group); - }else if(handler instanceof NettyContrastClientHandler){ - NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + CnSocketUtil.CONTRAST_DEV_TAG); - if (ObjectUtil.isNotNull(groupByUserId)) { - groupByUserId.shutdownGracefully().sync(); - } - SocketManager.addGroup(param.getUserPageId() + CnSocketUtil.CONTRAST_DEV_TAG, group); - } + @Resource + private SocketDevResponseService socketDevResponseService; - System.out.println("客户端向服务端发送消息:" + port + msg); - channelFuture.channel().writeAndFlush(msg + "\n"); - } - }); - } catch (Exception e) { - System.out.println("连接socket服务端发送异常............" + e.getMessage()); - group.shutdownGracefully(); - //TODO 通知页面 - SocketDataMsg socketDataMsg = new SocketDataMsg(); - socketDataMsg.setType("aaa"); - socketDataMsg.setCode(SourceResponseCodeEnum.SOCKET_ERROR.getCode()); - socketDataMsg.setData(SourceResponseCodeEnum.SOCKET_ERROR.getMessage()); - socketDataMsg.setRequestId("connect"); - if (handler instanceof NettySourceClientHandler) { - socketDataMsg.setOperateCode("Source"); - } else if (handler instanceof NettyDevClientHandler) { - CnSocketUtil.quitSendSource(param); - socketDataMsg.setOperateCode("Dev"); - } else { - socketDataMsg.setOperateCode("Contrast_Dev"); - CnSocketUtil.contrastSendquit(param.getUserPageId()); - } - WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); - } finally { - // System.out.println("进入clientSocket最后步骤---------------------"); + @Resource + private SocketContrastResponseService socketContrastResponseService; + + /** + * 静态实例,用于保持向后兼容 + */ + private static NettyClient instance; + + + @PostConstruct + public void init() { + instance = this; + } + + /** + * 创建程控源Handler实例(Spring管理方式) + * 自动注入SocketSourceResponseService,统一使用CnSocketUtil.SOURCE_TAG + * + * @param param 检测参数 + * @return NettySourceClientHandler 程控源处理器实例 + */ + public NettySourceClientHandler createSourceHandler(PreDetectionParam param) { + return new NettySourceClientHandler(param, socketSourceResponseService); + } + + /** + * 创建被检设备Handler实例(Spring管理方式) + * 自动注入SocketDevResponseService + * + * @param param 检测参数 + * @return NettyDevClientHandler 被检设备处理器实例 + */ + public NettyDevClientHandler createDeviceHandler(PreDetectionParam param) { + return new NettyDevClientHandler(param, socketDevResponseService); + } + + /** + * 智能连接程控源设备(新增方法) + * 自动创建Handler并建立连接 + * + * @param ip 程控源IP地址 + * @param port 程控源端口 + * @param param 检测参数 + * @param msg 初始消息 + */ + public void connectToSource(String ip, Integer port, PreDetectionParam param, String msg) { + NettySourceClientHandler handler = createSourceHandler(param); + executeSocketConnection(ip, port, param, msg, handler); + } + + /** + * 智能连接被检设备(新增方法) + * 自动创建Handler并建立连接 + * + * @param ip 被检设备IP地址 + * @param port 被检设备端口 + * @param param 检测参数 + * @param msg 初始消息 + */ + public void connectToDevice(String ip, Integer port, PreDetectionParam param, String msg) { + NettyDevClientHandler handler = createDeviceHandler(param); + executeSocketConnection(ip, port, param, msg, handler); + } + + /** + * 智能连接比对被检设备(新增方法) + * 自动创建Handler并建立连接 + * + * @param ip 被检设备IP地址 + * @param port 被检设备端口 + * @param param 检测参数 + * @param msg 初始消息 + * 静态方法:智能连接程控源设备(兼容性包装) + */ + private void connectToContrast(String ip, Integer port, ContrastDetectionParam param, String msg) { + PreDetectionParam preDetectionParam = new PreDetectionParam(); + preDetectionParam.setUserPageId(param.getLoginName()); + NettyContrastClientHandler handler = new NettyContrastClientHandler(preDetectionParam, socketContrastResponseService); + executeSocketConnection(ip, port, preDetectionParam, msg, handler); + } + + /** + * 静态方法:智能连接程控源设备(兼容性包装) + */ + public static void connectToContrastDeviceStatic(String ip, Integer port, ContrastDetectionParam param, String msg) { + if (instance != null) { + instance.connectToContrast(ip, port, param, msg); + } else { + log.error("NettyClient未初始化,无法创建比对设备通讯连接"); + } + + } + + + /** + * 静态方法:智能连接程控源设备(兼容性包装) + */ + public static void connectToSourceStatic(String ip, Integer port, PreDetectionParam param, String msg) { + if (instance != null) { + instance.connectToSource(ip, port, param, msg); + } else { + log.error("NettyClient未初始化,无法创建程控源连接"); } } /** - * 重连方法 + * 静态方法:智能连接被检设备(兼容性包装) */ - public static void connect(Bootstrap bootstrap, String msg) { - try { - bootstrap.connect("127.0.0.1", 8787).sync() - .addListener((ChannelFutureListener) ch -> { - if (!ch.isSuccess()) { - ch.channel().close(); - final EventLoop loop = ch.channel().eventLoop(); - loop.schedule(() -> { - System.err.println("服务端链接不上,开始重连操作..."); - //重连 - connect(bootstrap, msg); - }, 3L, TimeUnit.SECONDS); - } else { - if (StrUtil.isNotBlank(msg)) { - ch.channel().writeAndFlush(msg); - } - System.out.println("服务端链接成功..."); - } - }); - } catch (Exception e) { - System.out.println(e.getMessage()); - try { - Thread.sleep(3000L); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } - //再重连 - connect(bootstrap, msg); + public static void connectToDeviceStatic(String ip, Integer port, PreDetectionParam param, String msg) { + if (instance != null) { + instance.connectToDevice(ip, port, param, msg); + } else { + log.error("NettyClient未初始化,无法创建被检设备连接"); } } + /** + * 内部重构后的实现 - 拆分职责但不暴露给外部 + * 执行完整的Socket连接建立流程: + * 1. 创建事件循环组 + * 2. 配置Bootstrap启动器 + * 3. 设置管道处理链 + * 4. 建立连接并处理结果 + * + * @param ip 目标服务器IP地址 + * @param port 目标服务器端口号 + * @param param 检测参数对象 + * @param msg 连接成功后发送的初始消息 + * @param handler 业务处理器(区分程控源和被检设备) + */ + private static void executeSocketConnection(String ip, Integer port, + PreDetectionParam param, String msg, SimpleChannelInboundHandler handler) { + // 创建NIO事件循环组,用于处理网络I/O事件 + NioEventLoopGroup group = createEventLoopGroup(); + + try { + // 配置客户端启动器 + Bootstrap bootstrap = configureBootstrap(group); + // 创建管道初始化器,配置编解码和业务处理链 + ChannelInitializer initializer = createChannelInitializer(param, handler); + bootstrap.handler(initializer); + // 同步连接到目标服务器 + ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); + // 处理连接结果(成功或失败) + handleConnectionResult(channelFuture, param, handler, group, msg); + } catch (Exception e) { + // 处理连接过程中的异常 + handleConnectionException(e, param, handler, group); + } + } + + /** + * 创建NIO事件循环组 + * 用于管理网络I/O操作的线程池,处理连接、读写等异步事件 + * + * @return NioEventLoopGroup 事件循环组实例 + */ + private static NioEventLoopGroup createEventLoopGroup() { + return new NioEventLoopGroup(); + } + + /** + * 配置Bootstrap客户端启动器 + * 设置连接超时、通道类型等基础参数 + * + * @param group 事件循环组 + * @return Bootstrap 配置好的启动器 + */ + private static Bootstrap configureBootstrap(NioEventLoopGroup group) { + return new Bootstrap() + // 绑定事件循环组 + .group(group) + // 连接超时5秒 + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + // 使用NIO Socket通道 + .channel(NioSocketChannel.class); + } + + /** + * 创建通道初始化器 + * 当新连接建立时,初始化该连接的处理管道 + * + * @param param 检测参数,用于配置心跳处理器 + * @param handler 业务处理器 + * @return ChannelInitializer 通道初始化器 + */ + private static ChannelInitializer createChannelInitializer( + PreDetectionParam param, SimpleChannelInboundHandler handler) { + return new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel ch) { + setupPipeline(ch.pipeline(), param, handler); + } + }; + } + + /** + * 配置管道处理链 + * 按顺序添加各种处理器,构成完整的数据处理流水线: + * 1. LineBasedFrameDecoder:按行分割数据,解决TCP粘包拆包问题 + * 2. StringDecoder/StringEncoder:字符串编解码器 + * 3. HeartbeatHandler:心跳处理器,维持连接活跃 + * 4. IdleStateHandler:空闲检测器(仅被检设备需要) + * 5. 业务处理器:具体的业务逻辑处理 + * + * @param pipeline 管道对象 + * @param param 检测参数 + * @param handler 业务处理器 + */ + private static void setupPipeline(ChannelPipeline pipeline, + PreDetectionParam param, SimpleChannelInboundHandler handler) { + // 基础编解码器:处理数据格式转换和粘包拆包 + // 按行分割,最大10KB + pipeline.addLast(new LineBasedFrameDecoder(10240)) + // 字节转字符串 + .addLast(new StringDecoder(CharsetUtil.UTF_8)) + // 字符串转字节 + .addLast(new StringEncoder(CharsetUtil.UTF_8)); + + // 心跳处理器:根据设备类型选择不同的标签 + String tag = getDeviceTag(handler); + pipeline.addLast(new HeartbeatHandler(param, tag)); + // 空闲检测器:仅被检设备和比对被检设备需要,60秒无读操作触发空闲事件 + if (!isSourceHandler(handler)) { + pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)); + } + + // 业务处理器:处理具体的检测业务逻辑 + pipeline.addLast(handler); + } + + /** + * 判断是否为程控源处理器 + * 程控源设备和被检设备使用不同的处理器和配置 + * + * @param handler 业务处理器 + * @return boolean true:程控源处理器, false:被检设备处理器 + */ + private static boolean isSourceHandler(SimpleChannelInboundHandler handler) { + return handler instanceof NettySourceClientHandler; + } + + /** + * 获取设备标签 + * 用于在SocketManager中区分不同类型的设备连接 + * + * @param handler 业务处理器 + * @return String 设备标签("_Source" 或 "_Dev") + */ + private static String getDeviceTag(SimpleChannelInboundHandler handler) { + String tag; + if (handler instanceof NettySourceClientHandler) { + tag = CnSocketUtil.SOURCE_TAG; + } else if (handler instanceof NettyDevClientHandler) { + tag = CnSocketUtil.DEV_TAG; + } else { + tag = CnSocketUtil.CONTRAST_DEV_TAG; + } + return tag; + } + + /** + * 获取名称 + * 用于在SocketManager中区分不同类型的设备连接 + * + * @param handler 业务处理器 + * @return String 设备标签("程控源设备" 或 "被检设备") + */ + private static String getDeviceType(SimpleChannelInboundHandler handler) { + String deviceType; + if (handler instanceof NettySourceClientHandler) { + deviceType = "程控源设备"; + } else if (handler instanceof NettyDevClientHandler) { + deviceType = "被检设备"; + } else { + deviceType = "比对被检设备"; + } + return deviceType; + } + + /** + * 处理连接结果 + * 为连接Future添加监听器,异步处理连接成功或失败的情况 + * + * @param channelFuture 连接Future对象 + * @param param 检测参数 + * @param handler 业务处理器 + * @param group 事件循环组 + * @param msg 初始消息 + */ + private static void handleConnectionResult(ChannelFuture channelFuture, + PreDetectionParam param, SimpleChannelInboundHandler handler, + NioEventLoopGroup group, String msg) { + channelFuture.addListener((ChannelFutureListener) ch -> { + if (!ch.isSuccess()) { + onConnectionFailure(handler, group); + } else { + onConnectionSuccess(channelFuture, param, handler, group, msg); + } + }); + } + + /** + * 连接失败处理 + * 输出失败信息并优雅关闭事件循环组 + * + * @param handler 业务处理器,用于区分设备类型 + * @param group 事件循环组 + */ + private static void onConnectionFailure(SimpleChannelInboundHandler handler, NioEventLoopGroup group) { + String deviceType = getDeviceType(handler); + log.info("连接{}服务端失败...", deviceType); + group.shutdownGracefully(); + } + + /** + * 连接成功处理 + * 执行连接成功后的初始化操作: + * 1. 管理Socket连接会话(注册EventLoopGroup到SocketManager) + * 2. 注册Channel到SocketManager,实现统一的连接管理 + * 3. 通过SocketManager发送初始消息,统一消息发送入口 + * + * @param channelFuture 连接Future对象 + * @param param 检测参数 + * @param handler 业务处理器 + * @param group 事件循环组 + * @param msg 初始消息 + */ + private static void onConnectionSuccess(ChannelFuture channelFuture, + PreDetectionParam param, SimpleChannelInboundHandler handler, + NioEventLoopGroup group, String msg) { + String deviceType = getDeviceType(handler); + log.info("连接{}服务端成功...", deviceType); + // 管理连接会话,将EventLoopGroup注册到SocketManager + manageSocketConnection(param, handler, group); + + // 将Channel也注册到SocketManager,便于统一消息发送 + String userId = param.getUserPageId() + getDeviceTag(handler); + SocketManager.addUser(userId, channelFuture.channel()); + + // 通过SocketManager发送初始消息,统一消息发送入口 + SocketManager.sendMsg(userId, msg); + } + + /** + * 管理Socket连接会话 + * 将新建立的连接注册到SocketManager中进行统一管理: + * 1. 检查并关闭同用户同设备类型的旧连接,避免资源泄露 + * 2. 将新连接注册到SocketManager,便于后续管理和查找 + *

+ * 连接Key格式:{userPageId}_{deviceTag} + * 例如:zhangsan_test_Source(程控源) / zhangsan_test_Dev(被检设备) + * + * @param param 检测参数,包含用户页面ID + * @param handler 业务处理器,用于区分设备类型 + * @param group 事件循环组,表示具体的连接资源 + */ + private static void manageSocketConnection(PreDetectionParam param, + SimpleChannelInboundHandler handler, NioEventLoopGroup group) { + // 构建连接标识:用户ID + 设备标签 + String key = param.getUserPageId() + getDeviceTag(handler); + + // 关闭旧连接:同一用户同一设备类型只能有一个活跃连接 + NioEventLoopGroup existingGroup = SocketManager.getGroupByUserId(key); + if (ObjectUtil.isNotNull(existingGroup)) { + try { + existingGroup.shutdownGracefully().sync(); + } catch (InterruptedException e) { + // 恢复中断状态 + Thread.currentThread().interrupt(); + } + } + + // 注册新连接到SocketManager + SocketManager.addGroup(key, group); + } + + /** + * 处理连接异常 + * 当连接建立过程中发生异常时的统一处理流程: + * 1. 关闭相关资源,防止资源泄露 + * 2. 执行设备相关的退出操作 + * 3. 通过WebSocket向前端通知错误信息 + * + * @param e 异常对象 + * @param param 检测参数 + * @param handler 业务处理器 + * @param group 事件循环组 + */ + private static void handleConnectionException(Exception e, PreDetectionParam param, + SimpleChannelInboundHandler handler, NioEventLoopGroup group) { + log.info("连接socket服务端发送异常: {}", e.getMessage()); + + // 关闭事件循环组资源 + group.shutdownGracefully(); + + // 执行设备相关的退出操作 + executeQuitOperations(param, handler); + + // 通过WebSocket通知前端页面 + notifyFrontendError(param, handler); + } + + /** + * 执行退出操作 + * 根据不同的设备处理器类型执行相应的退出指令: + * - NettyDevClientHandler:被检设备处理器,需要发送程控源退出指令 + * - 其他非程控源处理器:发送通用退出指令 + * - NettySourceClientHandler:程控源处理器,无需额外退出操作 + * + * @param param 检测参数 + * @param handler 业务处理器 + */ + private static void executeQuitOperations(PreDetectionParam param, + SimpleChannelInboundHandler handler) { + if (handler instanceof NettyDevClientHandler) { + // 被检设备异常时,发送程控源退出指令 + CnSocketUtil.quitSendSource(param); + } else if (handler instanceof NettyContrastClientHandler) { + CnSocketUtil.contrastSendquit(param.getUserPageId()); + } + // 程控源处理器异常时无需额外操作 + } + + /** + * 通知前端错误信息 + * 构建错误消息对象并通过WebSocket发送给前端页面 + * 前端可以根据操作码(Source/Dev)显示相应的错误提示 + * + * @param param 检测参数,包含用户页面ID + * @param handler 业务处理器,用于确定操作码 + */ + private static void notifyFrontendError(PreDetectionParam param, + SimpleChannelInboundHandler handler) { + // 构建错误消息对象 + SocketDataMsg socketDataMsg = new SocketDataMsg(); + // 消息类型 + socketDataMsg.setType("aaa"); + // 错误码 + socketDataMsg.setCode(SourceResponseCodeEnum.SOCKET_ERROR.getCode()); + // 错误消息 + socketDataMsg.setData(SourceResponseCodeEnum.SOCKET_ERROR.getMessage()); + // 请求ID标识 + socketDataMsg.setRequestId("connect"); + // 设置操作码:程控源为"Source",被检设备为"Dev" + String devTag = getDeviceTag(handler).substring(1); + socketDataMsg.setOperateCode(devTag); + // 通过WebSocket发送错误信息到前端页面 + WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); + } + } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java index 212d63ac..87b9a55f 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java @@ -21,227 +21,324 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.TimeoutException; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.ConnectException; import java.net.ProtocolException; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** - * @Description: 源客户端业务处理(示例) + * Netty设备客户端处理器 + *

负责处理与被检测设备的Socket通信,包括:

+ *
    + *
  • 通道生命周期管理(建立、断开)
  • + *
  • 消息接收和处理
  • + *
  • 心跳超时处理
  • + *
  • 异常处理和恢复
  • + *
+ * + * @Description: 设备客户端业务处理器 * @Author: wr * @Date: 2024/12/10 14:16 */ +@Slf4j @RequiredArgsConstructor public class NettyDevClientHandler extends SimpleChannelInboundHandler { + /** 闪变检测超时时间:20分钟(1300秒) */ + private static final long FLICKER_TIMEOUT = 1300L; + + /** 统计数据检测超时时间:3分钟(180秒) */ + private static final long STATISTICS_TIMEOUT = 180L; + + /** 实时数据检测超时时间:1分钟(60秒) */ + private static final long REALTIME_TIMEOUT = 60L; + + /** 暂停操作超时时间:10分钟(600秒) */ + private static final long STOP_TIMEOUT = 600L; + + /** 超时时默认结果标志:3表示超时失败 */ + private static final int DEFAULT_RESULT_FLAG = 3; + private final PreDetectionParam param; private final SocketDevResponseService socketResponseService; /** - * 当通道进行连接时推送消息 + * 当通道连接建立时的处理逻辑 + *

将关闭原有连接,并将新连接注册到SocketManager中

* - * @param ctx + * @param ctx 通道上下文 + * @throws Exception 连接异常 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - System.out.println("客户端通道已建立" + ctx.channel().id()); + log.info("客户端通道已建立: {}", ctx.channel().id()); + + // 检查是否存在同一用户的老连接 Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + CnSocketUtil.DEV_TAG); if (Objects.nonNull(channel)) { try { + // 关闭老连接避免连接泄漏 channel.close().sync(); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("关闭通道异常", e); } } + // 注册新的连接到用户管理器 SocketManager.addUser(param.getUserPageId() + CnSocketUtil.DEV_TAG, ctx.channel()); } /** - * 处理服务端消息消息信息 - */ - @Override - protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException { - System.out.println("devhandler接收server端数据>>>>>>" + msg); - try { - socketResponseService.deal(param, msg); - } catch (Exception e) { - e.printStackTrace(); - CnSocketUtil.quitSend(param); - } - } - - /** - * 当通道断线时,支持重连 + * 当通道断开时的清理工作 + *

关闭连接,清理用户映射,退出源设备发送

* - * @param ctx + * @param ctx 通道上下文 + * @throws Exception 关闭异常 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - System.out.println("设备通讯客户端断线"); + log.warn("设备通讯客户端断线"); ctx.close(); SocketManager.removeUser(param.getUserPageId() + CnSocketUtil.DEV_TAG); CnSocketUtil.quitSendSource(param); } /** - * 用户事件的回调方法(自定义事件用于心跳机制) + * 处理从服务端接收到的消息 + *

将消息交给SocketDevResponseService进行具体处理

* - * @param ctx - * @param evt + * @param ctx 通道上下文 + * @param msg 接收到的消息 + * @throws InterruptedException 线程中断异常 + */ + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException { + log.info("devhandler接收server端数据: {}", msg); + try { + socketResponseService.deal(param, msg); + } catch (Exception e) { + log.error("处理服务端消息异常", e); + CnSocketUtil.quitSend(param); + } + } + + + + /** + * 用户事件回调方法,主要用于处理心跳超时 + *

当触发READER_IDLE事件时,根据当前状态进行超时处理

+ * + * @param ctx 通道上下文 + * @param evt 用户事件对象 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { - Boolean fly = false; - if (evt instanceof IdleStateEvent) { //IdleState.在一段时间内没有收到任何消息时,会触发该事件 - if (((IdleStateEvent) evt).state() == IdleState.READER_IDLE) { - System.out.println(LocalDateTime.now() + "devHandler触发读超时函数**************************************"); - if (!FormalTestManager.hasStopFlag) { - if (CollUtil.isNotEmpty(SocketManager.getSourceList())) { - SourceIssue sourceIssue = SocketManager.getSourceList().get(0); - if (SocketManager.clockMap.containsKey(sourceIssue.getIndex())) { - SocketManager.clockMap.put(sourceIssue.getIndex(), SocketManager.clockMap.get(sourceIssue.getIndex()) + 60L); - } else { - SocketManager.clockMap.put(sourceIssue.getIndex(), 60L); - } - - if (sourceIssue.getType().equals(DicDataEnum.F.getCode())) { - //闪变,正常抛一轮最大等待20分钟超时 - if (SocketManager.clockMap.get(sourceIssue.getIndex()) > 1300) { - fly = true; - System.out.println("超时处理-----》" + sourceIssue.getType() + "已超时----------------关闭"); - CnSocketUtil.quitSend(param); - timeoutSend(sourceIssue); - } - } else if (sourceIssue.getType().equals(DicDataEnum.VOLTAGE.getCode()) || sourceIssue.getType().equals(DicDataEnum.HP.getCode())) { - //统计数据项,正常抛一轮数据,超时 - if (SocketManager.clockMap.get(sourceIssue.getIndex()) > 180) { - fly = true; - CnSocketUtil.quitSend(param); - System.out.println("超时处理-----》" + sourceIssue.getType() + "已超时----------------关闭"); - timeoutSend(sourceIssue); - } - } else { - //实时数据 - if (SocketManager.clockMap.get(sourceIssue.getIndex()) > 60) { - fly = true; - CnSocketUtil.quitSend(param); - System.out.println("超时处理-----》" + sourceIssue.getType() + "已超时----------------关闭"); - timeoutSend(sourceIssue); - } - } - } else { - fly = true; - //为空则认为是常规步骤,设定一分钟超时 - CnSocketUtil.quitSend(param); - WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.SOCKET_TIMEOUT); - } - if (fly) { - socketResponseService.backCheckState(param); - } - } else { - //如果是暂停操作后 - FormalTestManager.stopTime += 60; - System.out.println("当前进入暂停操作超时函数-----------------" + FormalTestManager.stopTime); - if (FormalTestManager.stopTime > 600) { - CnSocketUtil.quitSend(param); - WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.STOP_TIMEOUT); - } - } + // 检查是否为读取空闲事件(心跳超时) + if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) { + log.warn("devHandler触发读超时函数: {}", LocalDateTime.now()); + // 根据是否有停止标志采取不同的超时处理策略 + if (!FormalTestManager.hasStopFlag) { + // 正常检测中的超时处理 + handleReadTimeout(); + } else { + // 暂停状态下的超时处理 + handleStopTimeout(); } } - - } + /** + * 处理器被添加到管道时的回调 + * + * @param ctx 通道上下文 + */ @Override public void handlerAdded(ChannelHandlerContext ctx) { - System.out.println("有通道准备接入" + ctx.channel()); + log.info("有通道准备接入: {}", ctx.channel()); } + /** + * 异常捕获处理 + *

捕获并处理各种类型的异常,执行清理工作

+ * + * @param ctx 通道上下文 + * @param cause 异常原因 + */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - System.out.println("捕获到设备服务异常。。。。。。。"); - // 处理异常,例如记录日志、关闭连接等 - cause.printStackTrace(); - // 根据异常类型进行不同的处理 - if (cause instanceof ConnectException) { - // 处理连接异常,例如重试连接或记录特定的连接错误信息 - System.out.println("连接socket服务端异常"); - - } else if (cause instanceof IOException) { - // 处理I/O异常,例如读写错误 - System.out.println("IOException caught: There was an I/O error."); - WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR); - - // 例如,可以记录更详细的I/O错误信息 - } else if (cause instanceof TimeoutException) { - // 处理超时异常 - System.out.println("TimeoutException caught: Operation timed out."); - // 可以根据业务逻辑决定是否重试或记录超时信息 - } else if (cause instanceof ProtocolException) { - // 处理协议异常,例如消息格式不正确 - System.out.println("ProtocolException caught: Invalid protocol message."); - // 可以记录协议错误信息或向客户端发送错误响应 - } else { - // 处理其他类型的异常 - System.out.println("Unknown exception caught: " + cause.getMessage()); - WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR); - // 可以记录未知异常信息 - } + log.error("捕获到设备服务异常", cause); + handleSpecificException(cause); + // 统一清理工作 CnSocketUtil.quitSend(param); CnSocketUtil.quitSendSource(param); socketResponseService.backCheckState(param); ctx.close(); } - /** - * 发送业务消息时候开启计时器, - * @param requestId + * 处理特定类型的异常 + *

根据异常类型进行相应的日志记录和错误通知

+ * + * @param cause 异常对象 */ -/* private void scheduleTimeoutTask(String requestId) { - ScheduledFuture future = scheduler.schedule(() -> { - if (requestTimeoutTasks.containsKey(requestId)) { - // 处理超时逻辑 - System.out.println("Business request with ID " + requestId + " timed out."); - requestTimeoutTasks.remove(requestId); - ctx.close(); - } - }, BUSINESS_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); - requestTimeoutTasks.put(requestId, future); - }*/ + private void handleSpecificException(Throwable cause) { + if (cause instanceof ConnectException) { + log.error("连接socket服务端异常"); + } else if (cause instanceof IOException) { + log.error("IO异常: {}", cause.getMessage()); + WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR); + } else if (cause instanceof TimeoutException) { + log.error("操作超时: {}", cause.getMessage()); + } else if (cause instanceof ProtocolException) { + log.error("协议异常: {}", cause.getMessage()); + } else { + log.error("未知异常: {}", cause.getMessage()); + WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR); + } + } + + /** - * 超时后的处理 + * 处理读取超时事件 + *

检查源列表,更新超时计数器,判断是否超时并处理

+ */ + private void handleReadTimeout() { + if (CollUtil.isNotEmpty(SocketManager.getSourceList())) { + // 获取当前正在检测的源问题(取第一个) + SourceIssue sourceIssue = SocketManager.getSourceList().get(0); + // 更新该源问题的超时计数器 + updateTimeoutCounter(sourceIssue); + // 根据检测类型判断是否已超时 + if (isTimeout(sourceIssue)) { + handleTimeout(sourceIssue); + } + } else { + // 源列表为空,认为是常规步骤的超时,默认一分钟超时 + CnSocketUtil.quitSend(param); + WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.SOCKET_TIMEOUT); + socketResponseService.backCheckState(param); + } + } + + /** + * 处理暂停操作的超时事件 + *

当检测被暂停时,统计暂停时间,超过限制后发送超时通知

+ */ + private void handleStopTimeout() { + FormalTestManager.stopTime += 60; + log.warn("当前进入暂停操作超时函数,停止时间: {}", FormalTestManager.stopTime); + if (FormalTestManager.stopTime > STOP_TIMEOUT) { + CnSocketUtil.quitSend(param); + WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.STOP_TIMEOUT); + } + } + + /** + * 更新指定源问题的超时计数器 + *

每次调用时增加60秒,用于统计累计超时时间

+ * + * @param sourceIssue 源问题对象 + */ + private void updateTimeoutCounter(SourceIssue sourceIssue) { + Integer index = sourceIssue.getIndex(); + if (SocketManager.clockMap.containsKey(index)) { + SocketManager.clockMap.put(index, SocketManager.clockMap.get(index) + 60L); + } else { + SocketManager.clockMap.put(index, 60L); + } + } + + /** + * 根据检测类型判断是否已超时 + *

不同检测类型有不同的超时阈值:

+ *
    + *
  • 闪变检测:20分钟
  • + *
  • 统计数据:3分钟
  • + *
  • 实时数据:1分钟
  • + *
+ * + * @param sourceIssue 源问题对象 + * @return true 如果已超时,false 否则 + */ + private boolean isTimeout(SourceIssue sourceIssue) { + long currentTime = SocketManager.clockMap.get(sourceIssue.getIndex()); + String type = sourceIssue.getType(); + + // 根据不同检测类型使用不同的超时阈值 + if (DicDataEnum.F.getCode().equals(type)) { + // 闪变检测:需要更长时间,20分钟超时 + return currentTime > FLICKER_TIMEOUT; + } else if (DicDataEnum.VOLTAGE.getCode().equals(type) || DicDataEnum.HP.getCode().equals(type)) { + // 统计数据类型(电压、谐波):中等时间,3分钟超时 + return currentTime > STATISTICS_TIMEOUT; + } else { + // 实时数据类型:短时间,1分钟超时 + return currentTime > REALTIME_TIMEOUT; + } + } + + /** + * 执行超时处理操作 + *

记录超时日志,退出发送,发送超时结果,恢复检测状态

+ * + * @param sourceIssue 源问题对象 + */ + private void handleTimeout(SourceIssue sourceIssue) { + log.warn("超时处理 - {} 已超时,关闭连接", sourceIssue.getType()); + CnSocketUtil.quitSend(param); + timeoutSend(sourceIssue); + socketResponseService.backCheckState(param); + } + + /** + * 发送超时结果 + *

为所有设备创建超时的检测结果,并通过WebSocket发送给客户端

+ * + * @param sourceIssue 源问题对象 */ private void timeoutSend(SourceIssue sourceIssue) { - List devListRes = new ArrayList<>(); - FormalTestManager.devList.forEach(dev -> { - DevLineTestResult devLineTestResult = new DevLineTestResult(); - devLineTestResult.setDeviceId(dev.getDevId()); - devLineTestResult.setDeviceName(dev.getDevName()); - List resultFlagList = new ArrayList<>(); - List monitorListDTOList = dev.getMonitorList(); - monitorListDTOList.forEach(i -> resultFlagList.add(3)); - devLineTestResult.setChnResult(resultFlagList.toArray(new Integer[monitorListDTOList.size()])); - devListRes.add(devLineTestResult); - }); + List devListRes = FormalTestManager.devList.stream() + .map(this::createTimeoutResult) + .collect(Collectors.toList()); + WebSocketVO> socketVO = new WebSocketVO<>(); socketVO.setRequestId(sourceIssue.getType() + CnSocketUtil.END_TAG); socketVO.setOperateCode(sourceIssue.getType()); socketVO.setData(devListRes); + WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketVO)); } + /** + * 为指定设备创建超时的检测结果 + *

将所有监测点的结果设置为超时标志值

+ * + * @param dev 设备对象 + * @return 设备检测结果 + */ + private DevLineTestResult createTimeoutResult(PreDetection dev) { + DevLineTestResult devLineTestResult = new DevLineTestResult(); + devLineTestResult.setDeviceId(dev.getDevId()); + devLineTestResult.setDeviceName(dev.getDevName()); + + Integer[] resultFlags = dev.getMonitorList().stream() + .map(monitor -> DEFAULT_RESULT_FLAG) + .toArray(Integer[]::new); + devLineTestResult.setChnResult(resultFlags); + + return devLineTestResult; + } + } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java index 09602c4f..75c88a0c 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java @@ -8,132 +8,133 @@ import com.njcn.gather.detection.util.socket.SocketManager; import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.TimeoutException; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.ConnectException; import java.net.ProtocolException; +import cn.hutool.core.util.StrUtil; + /** - * @Description: 源客户端业务处理(示例) - * @Author: wr - * @Date: 2024/12/10 14:16 + * 源设备Netty客户端通道处理器 + * 负责处理程控源设备的Socket通信 + * + * @author wr + * @since 2024/12/10 */ +@Slf4j @RequiredArgsConstructor public class NettySourceClientHandler extends SimpleChannelInboundHandler { + /** 检测参数对象,包含用户页面ID等信息 */ private final PreDetectionParam webUser; + /** 源设备响应处理服务 */ private final SocketSourceResponseService sourceResponseService; /** - * 当通道进行连接时推送消息 - * - * @param ctx + * 通道激活回调,将通道注册到SocketManager */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - System.out.println("客户端通道已建立" + ctx.channel().id()); - - SocketManager.addUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG, ctx.channel()); - } - - /** - * 处理服务端消息信息 - */ - @Override - protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException { - System.out.println("source接收server端数据>>>>>>" + msg); - try { - sourceResponseService.deal(webUser, msg); - } catch (Exception e) { - e.printStackTrace(); - CnSocketUtil.quitSend(webUser); + // 验证webUser参数有效性 + if (webUser == null) { + log.warn("源设备客户端通道已建立但webUser为空, channelId: {}", ctx.channel().id()); + return; + } + + String userId = webUser.getUserPageId(); + log.info("源设备客户端通道已建立, channelId: {}, userId: {}", ctx.channel().id(), userId); + + // 将通道注册到Socket管理器,便于后续消息推送 + if (StrUtil.isNotBlank(userId)) { + SocketManager.addUser(userId + CnSocketUtil.SOURCE_TAG, ctx.channel()); + } else { + log.warn("源设备userId为空或空白,跳过通道注册, channelId: {}", ctx.channel().id()); } - - } /** - * 当通道断线时,支持重连 - * - * @param ctx + * 通道断开回调,清理资源 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - System.out.println("源通讯客户端断线"); + String userId = webUser != null ? webUser.getUserPageId() : "unknown"; + log.warn("源通讯客户端断线, channelId: {}, userId: {}", ctx.channel().id(), userId); + // 关闭通道连接 ctx.close(); - SocketManager.removeUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG); -// System.out.println("断线了......" + ctx.channel()); -// ctx.channel().eventLoop().schedule(() -> { -// System.out.println("断线重连......"); -// //重连 -// NettyClient.connect(); -// }, 3L, TimeUnit.SECONDS); - } - - /** - * 用户事件的回调方法(自定义事件用于心跳机制) - * - * @param ctx - * @param evt - */ - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { - //如果是空闲状态事件 - if (evt instanceof IdleStateEvent) { - if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) { - //发送ping 保持心跳链接 - /* SocketMsg msg = new SocketMsg<>(); - msg.setRequestId("yxt"); - msg.setOperateCode(SourceOperateCodeEnum.HEARTBEAT.getValue()); - msg.setData(""); - ctx.writeAndFlush(JSON.toJSONString(msg)+"\n");*/ - } - } else { - //防止堆栈溢出 - //userEventTriggered(ctx, evt); + // 从Socket管理器中移除用户通道映射 + if (webUser != null && StrUtil.isNotBlank(userId)) { + SocketManager.removeUser(userId + CnSocketUtil.SOURCE_TAG); } } + + /** + * 处理源设备响应消息 + */ + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException { + // 验证用户参数 + if (webUser == null) { + log.warn("源设备消息处理失败: webUser为空, message: {}", msg); + return; + } + + String userId = webUser.getUserPageId(); + log.debug("源设备接收服务端数据, userId: {}, message: {}", userId, msg); + + try { + // 委托给专门的响应处理服务处理业务逻辑 + sourceResponseService.deal(webUser, msg); + } catch (Exception e) { + log.error("源设备消息处理异常, userId: {}, message: {}", userId, msg, e); + // 发生异常时退出发送,避免后续问题 + CnSocketUtil.quitSend(webUser); + } + } + + @Override public void handlerAdded(ChannelHandlerContext ctx) { - System.out.println("有通道准备接入" + ctx.channel().id()); + // 记录处理器添加事件,用于调试 + String userId = webUser != null ? webUser.getUserPageId() : "unknown"; + log.debug("源设备通道准备接入, channelId: {}, userId: {}", ctx.channel().id(), userId); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - // 处理异常,例如记录日志、关闭连接等 - System.out.println("捕获到源异常。。。。。。。"); - cause.printStackTrace(); - // 根据异常类型进行不同的处理 + String userId = webUser != null ? webUser.getUserPageId() : "unknown"; + String channelId = ctx.channel().id().toString(); + + // 根据异常类型进行分类处理和日志记录 if (cause instanceof ConnectException) { - // 处理连接异常,例如重试连接或记录特定的连接错误信息 - System.out.println("连接socket服务端异常"); - + // 连接异常:网络连接失败 + log.error("连接源设备Socket服务端异常, channelId: {}, userId: {}", channelId, userId, cause); } else if (cause instanceof IOException) { - // 处理I/O异常,例如读写错误 - WebServiceManager.sendDetectionErrorMessage(webUser.getUserPageId(), SourceOperateCodeEnum.SERVER_ERROR); - - // 例如,可以记录更详细的I/O错误信息 + // IO异常:数据传输错误,需通知前端 + log.error("源设备IO异常, channelId: {}, userId: {}", channelId, userId, cause); + // 向前端发送服务器错误消息 + if (StrUtil.isNotBlank(userId) && !"unknown".equals(userId)) { + WebServiceManager.sendDetectionErrorMessage(userId, SourceOperateCodeEnum.SERVER_ERROR); + } } else if (cause instanceof TimeoutException) { - // 处理超时异常 - System.out.println("TimeoutException caught: Operation timed out."); - // 可以根据业务逻辑决定是否重试或记录超时信息 + // 超时异常:通信响应超时 + log.warn("源设备通信超时, channelId: {}, userId: {}", channelId, userId, cause); } else if (cause instanceof ProtocolException) { - // 处理协议异常,例如消息格式不正确 - System.out.println("ProtocolException caught: Invalid protocol message."); - // 可以记录协议错误信息或向客户端发送错误响应 + // 协议异常:数据格式不符合协议规范 + log.error("源设备协议异常, channelId: {}, userId: {}", channelId, userId, cause); } else { - // 处理其他类型的异常 - System.out.println("Unknown exception caught: " + cause.getMessage()); - // 可以记录未知异常信息 + // 其他未知异常 + log.error("源设备未知异常, channelId: {}, userId: {}, message: {}", channelId, userId, cause.getMessage(), cause); } + + // 发生异常时关闭通道 ctx.close(); } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java index 7e2bf7b4..a8133848 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java @@ -159,21 +159,6 @@ public class WebServiceManager { preDetectionParamMap.put(userId, preDetectionParam); } - /** - * 存储检测参数(兼容老版本) - * 从检测参数对象中获取userPageId作为key - * - * @param preDetectionParam 检测参数对象,必须包含userPageId - * @throws IllegalArgumentException 当userPageId为空时抛出 - * @deprecated 建议使用 {@link #addPreDetectionParam(String, PreDetectionParam)} - */ - @Deprecated - public static void addPreDetectionParam(PreDetectionParam preDetectionParam) { - if (preDetectionParam == null || preDetectionParam.getUserPageId() == null) { - throw new IllegalArgumentException("检测参数或用户ID不能为空"); - } - preDetectionParamMap.put(preDetectionParam.getUserPageId(), preDetectionParam); - } /** * 获取指定用户的检测参数 diff --git a/detection/src/main/java/com/njcn/gather/report/service/impl/PqReportServiceImpl.java b/detection/src/main/java/com/njcn/gather/report/service/impl/PqReportServiceImpl.java index 94f8ff19..6ab6dd82 100644 --- a/detection/src/main/java/com/njcn/gather/report/service/impl/PqReportServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/report/service/impl/PqReportServiceImpl.java @@ -129,11 +129,6 @@ public class PqReportServiceImpl extends ServiceImpl i @Value("${report.reportDir:D:\\report}") private String reportPath; - @Value("${socket.device.ip}") - private String ip; - - @Value("${socket.device.port}") - private Integer port; @Value("${qr.cloud}") private String cloudUrl; @@ -184,7 +179,7 @@ public class PqReportServiceImpl extends ServiceImpl i private final ISysTestConfigService sysTestConfigService; private final SocketDevResponseService socketDevResponseService; - + private final SocketManager socketManager; @Autowired private RestTemplateUtil restTemplateUtil; @@ -705,8 +700,8 @@ public class PqReportServiceImpl extends ServiceImpl i PreDetectionParam preDetectionParam = new PreDetectionParam(); preDetectionParam.setUserPageId(RequestUtil.getLoginName()); preDetectionParam.setSendWebMsg(false); - - NettyClient.socketClient(ip, port, preDetectionParam, msg, new NettyDevClientHandler(preDetectionParam, socketDevResponseService)); + // 使用智能发送工具类,自动管理设备连接 + socketManager.smartSendToDevice(preDetectionParam, msg); } else { channel.writeAndFlush(msg + "\n"); } diff --git a/detection/src/main/java/com/njcn/gather/script/service/impl/PqScriptDtlsServiceImpl.java b/detection/src/main/java/com/njcn/gather/script/service/impl/PqScriptDtlsServiceImpl.java index 321f076f..9c942b46 100644 --- a/detection/src/main/java/com/njcn/gather/script/service/impl/PqScriptDtlsServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/script/service/impl/PqScriptDtlsServiceImpl.java @@ -720,21 +720,17 @@ public class PqScriptDtlsServiceImpl extends ServiceImpl getScriptToIcdCheckInfo(PreDetectionParam param) { - PqScriptIssueParam issueParam = new PqScriptIssueParam(); -// issueParam.setPlanId(param.getPlanId()); issueParam.setSourceId(param.getSourceId()); issueParam.setDevIds(param.getDevIds()); issueParam.setScriptId(param.getScriptId()); issueParam.setIsPhaseSequence(CommonEnum.FORMAL_TEST.getValue()); List sourceIssues = this.listSourceIssue(issueParam); - Set dataTypeSet = new HashSet<>(); sourceIssues.forEach(x -> { dataTypeSet.addAll(x.getDevValueTypeList()); }); - - return dataTypeSet.stream().collect(Collectors.toList()); + return new ArrayList<>(dataTypeSet); } @Override