netty优化

This commit is contained in:
2025-08-11 09:39:44 +08:00
parent c16c1f8e1d
commit df83f65328
13 changed files with 1221 additions and 544 deletions

View File

@@ -126,7 +126,7 @@ public class SocketDevResponseService {
switch (Objects.requireNonNull(sourceOperateCodeEnum)) {
//设备通讯校验
case YJC_SBTXJY:
devComm(socketDataMsg, param, msg);
devComm(socketDataMsg, param);
break;
//协议校验
case YJC_XYJY:
@@ -165,6 +165,9 @@ public class SocketDevResponseService {
break;
case YXT:
break;
default:
// todo... 要日志记录或者websocket送到前端友好提示用户
break;
}
}
@@ -700,7 +703,7 @@ public class SocketDevResponseService {
/**
* 装置通讯检测
*/
private void devComm(SocketDataMsg socketDataMsg, PreDetectionParam param, String msg) {
private void devComm(SocketDataMsg socketDataMsg, PreDetectionParam param) {
SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode());
String s = param.getUserPageId() + CnSocketUtil.DEV_TAG;
SocketMsg<String> socketMsg = new SocketMsg<>();
@@ -710,7 +713,6 @@ public class SocketDevResponseService {
successComm.add(result);
//单个测点通讯成功
WebServiceManager.sendMsg(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, FormalTestManager.devNameMapComm, 1));
System.out.println("设备通讯校验!" + successComm.size() + "=====" + FormalTestManager.monitorIdListComm.size());
if (successComm.size() == FormalTestManager.monitorIdListComm.size()) {
// 通知前端整个装置通讯检测过程成功
@@ -752,11 +754,9 @@ public class SocketDevResponseService {
case RE_OPERATE:
//出现已经初始化情况,发送用户用户确认是否继续检测
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
CnSocketUtil.quitSend(param);
break;
case NO_INIT_DEV:
//发起关闭操作
CnSocketUtil.quitSend(param);
break;
default:
WebServiceManager.sendUnknownErrorMessage(param.getUserPageId());
@@ -791,7 +791,6 @@ public class SocketDevResponseService {
String s = param.getUserPageId() + CnSocketUtil.DEV_TAG;
switch (Objects.requireNonNull(dictDataEnumByCode)) {
case SUCCESS:
if (socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.DEV_INIT_GATHER_02.getValue())) {
successComm.add(socketDataMsg.getData());
WebServiceManager.sendMsg(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, FormalTestManager.devNameMapComm, 1));
@@ -839,12 +838,10 @@ public class SocketDevResponseService {
System.out.println("开始脚本与icd校验:++++++++++");
SocketManager.sendMsg(s, JSON.toJSONString(socketMsg));
}
completeJudgment(param);
} else if (socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.VERIFY_MAPPING$01.getValue())) {
String data = socketDataMsg.getData();
IcdCheckData icdCheckData = JSON.parseObject(data, IcdCheckData.class);
boolean isContinue = true;
for (int i = 0; i < icdCheckData.getResultData().size(); i++) {
IcdCheckData.ResultData item = icdCheckData.getResultData().get(i);
@@ -941,19 +938,6 @@ public class SocketDevResponseService {
webSocketVO.setDesc(null);
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(webSocketVO));
}
// if (SourceOperateCodeEnum.TEST_TEM_START.getValue().equals(param.getOperateType())) {
// //暂停检测后的继续检测
// System.out.println("进入暂停后的继续检测》》》》》》》》》》》》》》》》》》》》》》》》》》》" + "剩余检测小项" + SocketManager.getSourceList().size());
// if (CollUtil.isNotEmpty(SocketManager.getSourceList())) {
// SourceIssue sourceIssue = SocketManager.getSourceList().get(0);
// socketMsg.setRequestId(SourceOperateCodeEnum.FORMAL_REAL.getValue() + CnSocketUtil.STEP_TAG + sourceIssue.getType());
// socketMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue());
// socketMsg.setData(JSON.toJSONString(sourceIssue));
// SocketManager.sendMsg(param.getUserPageId() + CnSocketUtil.SOURCE_TAG, JSON.toJSONString(socketMsg));
// }
// } else {
//
// }
} else {
// 发送下一个脚本与icd校验
String icdType = icdTypeList.stream().filter(it -> !icdCheckDataMap.containsKey(it)).findFirst().orElse(null);
@@ -995,6 +979,7 @@ public class SocketDevResponseService {
break;
default:
WebServiceManager.sendUnknownErrorMessage(param.getUserPageId());
// todo... 这种情况是报文的状态码不一致,需要记录到日志表,以便问题追踪
break;
}
}
@@ -1020,11 +1005,13 @@ public class SocketDevResponseService {
}
} else {
if (ObjectUtil.isNotNull(list.getA()) && list.getA().equals(1.0) && ObjectUtil.isNotNull(list.getB()) && list.getB().equals(1.0) && ObjectUtil.isNotNull(list.getC()) && list.getC().equals(1.0) || ObjectUtil.isNotNull(list.getT()) && list.getT().equals(1.0)) {
return 1; // 装置上送错误
// 装置上送错误
return 1;
}
}
}
return 0; // icd文件与脚本不匹配
// icd文件与脚本不匹配
return 0;
}

View File

@@ -35,12 +35,8 @@ public class SocketSourceResponseService {
*/
private final SocketDevResponseService socketDevResponseService;
private final IPqDevService iPqDevService;
private final SocketManager socketManager;
@Value("${socket.device.ip}")
private String ip;
@Value("${socket.device.port}")
private Integer port;
private List<PreDetection> devList = new ArrayList<>();
private List<String> monitorIdList = new ArrayList<>();
@@ -82,8 +78,12 @@ public class SocketSourceResponseService {
break;
case YXT:
break;
default:
// todo... 要日志记录或者websocket送到前端友好提示用户
break;
}
} else {
// todo... 要日志记录或者websocket送到前端友好提示用户
System.out.println("fggggggggggggggggggggg" + enumByCode);
}
}
@@ -219,14 +219,11 @@ public class SocketSourceResponseService {
SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode());
if (ObjectUtil.isNotNull(dictDataEnumByCode)) {
SocketMsg<String> socketMsg = new SocketMsg<>();
switch (dictDataEnumByCode) {
case SUCCESS:
//todo 前端推送收到的消息暂未处理好
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
String s = param.getUserPageId() + CnSocketUtil.DEV_TAG;
//开始设备通讯检测(发送设备初始化)
//List<PreDetection> devList = iPqDevService.getDevInfo(param.getDevIds());
Map<String, List<PreDetection>> map = new HashMap<>(1);
map.put("deviceList", FormalTestManager.devList);
String jsonString = JSON.toJSONString(map);
@@ -234,8 +231,8 @@ public class SocketSourceResponseService {
socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_01.getValue());
socketMsg.setData(jsonString);
String json = JSON.toJSONString(socketMsg);
// SocketManager.sendMsg(s,json);
NettyClient.socketClient(ip, port, param, json, new NettyDevClientHandler(param, socketDevResponseService));
// 使用智能发送工具类,自动管理设备连接
socketManager.smartSendToDevice(param, json);
break;
case UNPROCESSED_BUSINESS:
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
@@ -273,9 +270,12 @@ public class SocketSourceResponseService {
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
break;
default:
// todo... 这种情况是报文的状态码不一致,需要记录到日志表,以便问题追踪
WebServiceManager.sendUnknownErrorMessage(param.getUserPageId());
break;
}
}else{
// todo... 这种情况是报文的状态码不一致,需要记录到日志表,以便问题追踪
}
}

View File

@@ -21,10 +21,8 @@ import com.njcn.gather.detection.util.business.DetectionCommunicateUtil;
import com.njcn.gather.detection.util.socket.CnSocketUtil;
import com.njcn.gather.detection.util.socket.FormalTestManager;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.gather.detection.util.socket.cilent.NettyContrastClientHandler;
import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler;
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
import com.njcn.web.utils.RequestUtil;
import com.njcn.gather.device.pojo.po.PqDev;
import com.njcn.gather.device.pojo.vo.PreDetection;
import com.njcn.gather.device.service.IPqDevService;
@@ -41,11 +39,9 @@ import com.njcn.gather.source.pojo.po.SourceInitialize;
import com.njcn.gather.source.service.IPqSourceService;
import com.njcn.gather.system.dictionary.pojo.enums.DictDataEnum;
import com.njcn.gather.system.dictionary.service.IDictDataService;
import com.njcn.web.utils.RequestUtil;
import io.netty.channel.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
@@ -72,14 +68,7 @@ public class PreDetectionServiceImpl implements PreDetectionService {
private final SocketSourceResponseService socketSourceResponseService;
private final SocketContrastResponseService socketContrastResponseService;
private final IPqScriptCheckDataService iPqScriptCheckDataService;
@Value("${socket.device.ip:127.0.0.1}")
private String ip;
@Value("${socket.device.port:61000}")
private Integer port;
//private final SocketSourceResponseService sourceResponseService;
private final SocketManager socketManager;
@Override
@@ -119,6 +108,10 @@ public class PreDetectionServiceImpl implements PreDetectionService {
}
/**
* 原本系数校准单独写的,现在合并了,该方法过期了,没有调用了
*/
@Deprecated
@Override
public void coefficientCheck(PreDetectionParam param) {
// 检测是否存在连接的通道,后期需要做成动态,如果组合中不是第一位,则不需要关闭,也不用初始化 todo....
@@ -137,8 +130,8 @@ public class PreDetectionServiceImpl implements PreDetectionService {
msg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue());
msg.setData(JSON.toJSONString(sourceParam));
param.setSourceId(sourceParam.getSourceId());
// NettyClient.socketClient(ip, port, param, JSON.toJSONString(msg), new NettySourceClientHandler(param, sourceResponseService));
NettyClient.socketClient(ip, port, param, JSON.toJSONString(msg), new NettySourceClientHandler(param, socketSourceResponseService));
// 使用智能发送工具类,自动管理连接
socketManager.smartSendToSource(param, JSON.toJSONString(msg));
} else {
throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT);
}
@@ -182,8 +175,8 @@ public class PreDetectionServiceImpl implements PreDetectionService {
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue());
socketMsg.setData(JSON.toJSONString(sourceParam));
//建立与源控程序的socket连接
NettyClient.socketClient(ip, port, param, JSON.toJSONString(socketMsg), new NettySourceClientHandler(param, socketSourceResponseService));
//使用智能发送工具类,自动管理与源控程序的socket连接
socketManager.smartSendToSource(param, JSON.toJSONString(socketMsg));
} else {
throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT);
}
@@ -218,7 +211,8 @@ public class PreDetectionServiceImpl implements PreDetectionService {
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue());
socketMsg.setData(JSON.toJSONString(sourceParam));
NettyClient.socketClient(ip, port, param, JSON.toJSONString(socketMsg), new NettySourceClientHandler(param, socketSourceResponseService));
// 使用智能发送工具类,自动管理连接
socketManager.smartSendToSource(param, JSON.toJSONString(socketMsg));
} else {
throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT);
}
@@ -353,17 +347,14 @@ public class PreDetectionServiceImpl implements PreDetectionService {
});
map.put("deviceList", preDetections);
String jsonString = JSON.toJSONString(map);
SocketMsg<String> socketMsg = new SocketMsg<>();
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_SBTXJY.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_02.getValue());
socketMsg.setData(jsonString);
String json = JSON.toJSONString(socketMsg);
PreDetectionParam preDetectionParam = new PreDetectionParam();
preDetectionParam.setUserPageId(param.getLoginName());
WebServiceManager.addPreDetectionParam(param.getLoginName(), preDetectionParam);
NettyClient.socketClient(ip, port, preDetectionParam, json, new NettyContrastClientHandler(preDetectionParam, socketContrastResponseService));
socketManager.smartSendToContrast(param, JSON.toJSONString(socketMsg));
}
/**

View File

@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.pojo.vo.SocketMsg;
import com.njcn.gather.detection.pojo.vo.WebSocketVO;
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
/**
@@ -36,7 +37,6 @@ public class CnSocketUtil {
socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue());
SocketManager.sendMsg(param.getUserPageId() + DEV_TAG, JSON.toJSONString(socketMsg));
WebServiceManager.removePreDetectionParam();
}
@@ -55,7 +55,6 @@ public class CnSocketUtil {
}
/**
* 比对式-退出检测
*/

View File

@@ -1,24 +1,51 @@
package com.njcn.gather.detection.util.socket;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.njcn.gather.detection.pojo.param.ContrastDetectionParam;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.gather.detection.util.socket.config.SocketConnectionConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.njcn.gather.plan.pojo.enums.DataSourceEnum;
import com.njcn.gather.script.pojo.po.SourceIssue;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Socket连接管理器
* 提供Socket连接的生命周期管理、消息发送、检测任务管理等功能
*
* 包含以下主要功能:
* 1. 基础连接管理addUser, removeUser, sendMsg等
* 2. 智能消息发送smartSendToSource, smartSendToDevice等新增
* 3. 检测任务管理targetMap, sourceIssueList等管理
* 4. 连接状态监控getConnectionStatus等新增
*
* @Description: webSocket存储的通道
* @Author: wr
* @Author: wr, hongawen
* @Date: 2024/12/11 13:04
*/
@Slf4j
@Component
public class SocketManager {
@Resource
private SocketConnectionConfig socketConnectionConfig;
/**
* key为userIdxxx_Source、xxx_Devvalue为channel
*/
@@ -46,12 +73,13 @@ public class SocketManager {
e.printStackTrace();
}
NioEventLoopGroup eventExecutors = socketGroup.get(userId);
if(ObjectUtil.isNotNull(channel)){
if(ObjectUtil.isNotNull(eventExecutors)){
eventExecutors.shutdownGracefully();
System.out.println(userId+"__"+channel.id()+"关闭了客户端");
}
}
socketSessions.remove(userId);
socketGroup.remove(userId);
}
public static Channel getChannelByUserId(String userId) {
@@ -66,11 +94,161 @@ public class SocketManager {
Channel channel = socketSessions.get(userId);
if(ObjectUtil.isNotNull(channel)){
channel.writeAndFlush(msg+'\n');
System.out.println(userId+"__"+channel.id()+""+channel.remoteAddress()+"发送数据:"+msg);
log.info("{}__{}往{}发送数据:{}", userId, channel.id(), channel.remoteAddress(), msg);
}else{
System.out.println(userId+"__发送数据失败通道不存在"+msg);
log.warn("{}__发送数据失败通道不存在{}", userId, msg);
}
}
// =================== 智能发送功能 ===================
/**
* 智能发送消息到程控源设备
* 自动从配置文件读取IP和PORT开发者无需关心网络配置
* 如果连接不存在且requestId需要建立连接会自动建立连接后发送
*
* @param param 检测参数包含用户页面ID等信息
* @param msg 要发送的消息内容JSON格式包含requestId字段
*/
public void smartSendToSource(PreDetectionParam param, String msg) {
String requestId = extractRequestId(msg);
String userId = param.getUserPageId() + CnSocketUtil.SOURCE_TAG;
// 检查是否需要建立连接
if (SocketConnectionConfig.needsSourceConnection(requestId)) {
String ip = socketConnectionConfig.getSource().getIp();
Integer port = socketConnectionConfig.getSource().getPort();
// 检查连接是否存在且活跃
if (!isChannelActive(userId)) {
log.info("程控源连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId);
// 异步建立程控源连接并发送消息
CompletableFuture.runAsync(() -> {
NettyClient.connectToSourceStatic(ip, port, param, msg);
});
return;
}
}
// 连接已存在或不需要建立连接,直接发送消息
log.info("直接发送消息到程控源: userId={}, requestId={}", userId, requestId);
sendMsg(userId, msg);
}
/**
* 智能发送消息到被检设备
* 自动从配置文件读取IP和PORT开发者无需关心网络配置
* 如果连接不存在且requestId需要建立连接会自动建立连接后发送
*
* @param param 检测参数包含用户页面ID等信息
* @param msg 要发送的消息内容JSON格式包含requestId字段
*/
public void smartSendToDevice(PreDetectionParam param, String msg) {
String requestId = extractRequestId(msg);
String userId = param.getUserPageId() + CnSocketUtil.DEV_TAG;
// 检查是否需要建立连接
if (SocketConnectionConfig.needsDeviceConnection(requestId)) {
String ip = socketConnectionConfig.getDevice().getIp();
Integer port = socketConnectionConfig.getDevice().getPort();
// 检查连接是否存在且活跃
if (!isChannelActive(userId)) {
log.info("被检设备连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId);
// 异步建立被检设备连接并发送消息
CompletableFuture.runAsync(() -> {
NettyClient.connectToDeviceStatic(ip, port, param, msg);
});
return;
}
}
// 连接已存在或不需要建立连接,直接发送消息
log.info("直接发送消息到被检设备: userId={}, requestId={}", userId, requestId);
sendMsg(userId, msg);
}
/**
* 比对智能发送消息到被检设备
* 自动从配置文件读取IP和PORT开发者无需关心网络配置
* 如果连接不存在且requestId需要建立连接会自动建立连接后发送
*
* @param param 检测参数包含用户页面ID等信息
* @param msg 要发送的消息内容JSON格式包含requestId字段
*/
public void smartSendToContrast(ContrastDetectionParam param, String msg) {
String requestId = extractRequestId(msg);
String userId = param.getLoginName() + CnSocketUtil.CONTRAST_DEV_TAG;
// 检查是否需要建立连接
if (SocketConnectionConfig.needsDeviceConnection(requestId)) {
String ip = socketConnectionConfig.getDevice().getIp();
Integer port = socketConnectionConfig.getDevice().getPort();
// 检查连接是否存在且活跃
if (!isChannelActive(userId)) {
log.info("比对被检设备连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId);
// 异步建立比对被检设备连接并发送消息
CompletableFuture.runAsync(() -> {
NettyClient.connectToContrastDeviceStatic(ip, port, param, msg);
});
return;
}
}
// 连接已存在或不需要建立连接,直接发送消息
log.info("直接发送消息到比对被检设备: userId={}, requestId={}", userId, requestId);
sendMsg(userId, msg);
}
// =================== 私有工具方法 ===================
/**
* 从消息中提取requestId
* 支持JSON格式的消息解析
*
* @param msg 消息内容
* @return String requestId如果解析失败返回"unknown"
*/
private static String extractRequestId(String msg) {
try {
if (StrUtil.isNotBlank(msg)) {
// 尝试解析JSON格式消息
JSONObject jsonObject = JSON.parseObject(msg);
String requestId = jsonObject.getString("requestId");
if (StrUtil.isNotBlank(requestId)) {
return requestId;
}
// 如果没有requestId字段尝试解析request_id字段
requestId = jsonObject.getString("request_id");
if (StrUtil.isNotBlank(requestId)) {
return requestId;
}
// 如果没有JSON字段尝试从普通字符串中匹配
if (msg.contains("requestId=")) {
String[] parts = msg.split("requestId=");
if (parts.length > 1) {
String idPart = parts[1].split("[,\\s&]")[0];
return idPart.trim();
}
}
}
} catch (Exception e) {
log.warn("解析消息中的requestId失败: msg={}, error={}", msg, e.getMessage());
}
return "unknown";
}
/**
* 检查指定用户的Channel是否活跃
*
* @param userId 用户ID
* @return boolean true:连接活跃, false:连接不存在或不活跃
*/
private static boolean isChannelActive(String userId) {
Channel channel = getChannelByUserId(userId);
return ObjectUtil.isNotNull(channel) && channel.isActive();
}
@@ -134,9 +312,5 @@ public class SocketManager {
}

View File

@@ -1,63 +0,0 @@
package com.njcn.gather.detection.util.socket;
import com.njcn.gather.detection.pojo.enums.DetectionCodeEnum;
import java.util.Arrays;
/**
* @author wr
* @description
* @date 2025/3/27 14:58
*/
public class UnitUtil {
// public static String unit(String code, Integer fly) {
// String unit = "";
// if (Arrays.asList(0, 1).contains(fly)) {
// if (DetectionCodeEnum.FREQ.getCode().equals(code)) {
// unit = "Hz";
// }
// if (DetectionCodeEnum.VRMS.getCode().equals(code)) {
// unit = "V";
// }
// if (DetectionCodeEnum.IRMS.getCode().equals(code)) {
// unit = "A";
// }
// if (DetectionCodeEnum.V2_50.getCode().equals(code) ||
// DetectionCodeEnum.SV_1_49.getCode().equals(code)||
// DetectionCodeEnum.V_UNBAN.getCode().equals(code) ||
// DetectionCodeEnum.I_UNBAN.getCode().equals(code)
// ) {
// unit = "%";
// }
// if (DetectionCodeEnum.I2_50.getCode().equals(code) ||
// DetectionCodeEnum.SI_1_49.getCode().equals(code)
// ) {
// unit = "A";
// }
// if (DetectionCodeEnum.P2_50.getCode().equals(code)) {
// unit = "W";
// }
// if (DetectionCodeEnum.P.getCode().equals(code)) {
// unit = "P";
// }
// if (DetectionCodeEnum.MAG.getCode().equals(code)) {
// unit = "V";
// }
// if (DetectionCodeEnum.DUR.getCode().equals(code)) {
// unit = "s";
// }
// if (DetectionCodeEnum.VA.getCode().equals(code) ||
// DetectionCodeEnum.IA.getCode().equals(code)
// ) {
// unit = "°";
// }
// if (DetectionCodeEnum.DELTA_V.getCode().equals(code)
// ) {
// unit = "%";
// }
// }else{
// unit = "%";
// }
// return unit;
// }
}

View File

@@ -11,51 +11,139 @@ import com.njcn.gather.detection.util.socket.SocketManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* @Author: cdf
* @CreateTime: 2025-02-11
* @Description: 心跳处理类
* Netty心跳处理器
* <p>
* 负责维护Socket长连接的心跳检测机制通过定期发送心跳包来检测连接状态
* 当连续多次未收到心跳响应时自动断开连接并清理相关资源。
* </p>
*
* <h3>核心功能:</h3>
* <ul>
* <li>定时发送心跳包 (默认10秒间隔3秒后开始)</li>
* <li>监听心跳响应,重置失败计数器</li>
* <li>连续失败超过阈值时触发断开逻辑 (默认3次)</li>
* <li>异步处理断开操作,避免阻塞心跳线程</li>
* <li>优雅关闭资源,防止内存泄漏</li>
* </ul>
*
* <h3>心跳机制流程:</h3>
* <pre>
* 连接建立 → 启动心跳定时任务(3秒后开始每10秒执行)
* ↓
* 发送心跳包 → 等待响应 → 收到响应(重置计数器) / 未收到响应(递增计数器)
* ↓
* 连续3次失败 → 异步执行断开逻辑 → 发送退出指令 → 延迟清理连接
* ↓
* 连接断开 → 优雅关闭定时任务和线程池
* </pre>
*
* <h3>线程安全设计:</h3>
* <p>
* 使用单线程的ScheduledExecutorService处理心跳发送避免并发问题。
* 超时处理使用CompletableFuture异步执行不阻塞心跳发送线程。
* Future引用使用volatile修饰确保多线程环境下的可见性。
* </p>
*
* <h3>设备类型支持:</h3>
* <ul>
* <li>程控源设备 (CnSocketUtil.SOURCE_TAG): "_Source"</li>
* <li>被检设备 (CnSocketUtil.DEV_TAG): "_Dev"</li>
* </ul>
*
* <h3>使用示例:</h3>
* <pre>{@code
* // 创建心跳处理器
* HeartbeatHandler handler = new HeartbeatHandler(param, CnSocketUtil.SOURCE_TAG);
*
* // 添加到Netty管道中
* pipeline.addLast(handler);
* }</pre>
*
* @author cdf
* @since 2025-02-11
* @version 1.2
*/
@Slf4j
public class HeartbeatHandler extends SimpleChannelInboundHandler<String> {
/** 心跳定时任务执行器,使用单线程池避免并发问题 */
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1);
/** 检测参数包含用户页面ID等信息 */
private final PreDetectionParam param;
/** 处理器类型标识("_Source" 或 "_Dev" */
private final String handlerType;
/** 保存定时任务的Future引用便于取消和管理 */
private ScheduledFuture<?> heartbeatFuture;
// 允许连续未收到心跳响应的最大次数
/** 允许连续未收到心跳响应的最大次数 */
private static final int MAX_HEARTBEAT_MISSES = 3;
// 连续未收到心跳响应的次数
/** 连续未收到心跳响应的次数 */
private int consecutiveHeartbeatMisses = 0;
/**
* 构造函数
*
* @param param 检测参数包含用户页面ID等信息
* @param type 处理器类型CnSocketUtil.SOURCE_TAG 或 CnSocketUtil.DEV_TAG
*/
public HeartbeatHandler(PreDetectionParam param, String type) {
this.param = param;
this.handlerType = type;
}
/**
* 通道激活时的回调方法
* 在Socket连接建立成功后被调用启动心跳机制
*
* @param ctx Netty的通道上下文对象
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("心跳处理器启动 - 设备类型: {}", handlerType);
// 启动心跳定时任务
scheduleHeartbeat(ctx);
// 传播事件给管道中的后续处理器
ctx.fireChannelActive();
}
/**
* 通道断开时的回调方法
* 在Socket连接断开时被调用负责清理相关资源
*
* @param ctx Netty的通道上下文对象
* @throws Exception 异常情况
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
heartbeatExecutor.shutdown();
log.info("心跳处理器开始清理资源 - 设备类型: {}", handlerType);
shutdownExecutorGracefully();
super.channelInactive(ctx);
}
// 每10秒发送一次心跳
/**
* 启动心跳定时任务
* 每10秒发送一次心跳包3秒后开始执行
*
* @param ctx Netty的通道上下文对象
*/
private void scheduleHeartbeat(ChannelHandlerContext ctx) {
heartbeatExecutor.scheduleAtFixedRate(() -> {
heartbeatFuture = heartbeatExecutor.scheduleAtFixedRate(() -> {
if (ctx.channel().isActive()) {
// 发送心跳包
SocketMsg<String> msg = new SocketMsg<>();
@@ -64,50 +152,148 @@ public class HeartbeatHandler extends SimpleChannelInboundHandler<String> {
msg.setData("");
ctx.channel().writeAndFlush(JSON.toJSONString(msg) + "\n");
System.out.println(handlerType + "♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥send" + LocalDateTime.now());
log.debug("心跳发送 - 设备类型: {}, 时间: {}", handlerType, LocalDateTime.now());
consecutiveHeartbeatMisses++;
if (consecutiveHeartbeatMisses >= MAX_HEARTBEAT_MISSES) {
// 连续三次未收到心跳响应,断开连接
System.out.println(handlerType + "连续三次未收到心跳响应,断开连接");
if (CnSocketUtil.DEV_TAG.equals(handlerType)) {
//CnSocketUtil.sendToWebSocket(param.getUserPageId(),);
CnSocketUtil.quitSend(param);
} else if (CnSocketUtil.CONTRAST_DEV_TAG.equals(handlerType)) {
CnSocketUtil.contrastSendquit(param.getUserPageId());
} else {
CnSocketUtil.quitSendSource(param);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("线程中断异常: " + e.getMessage());
}
String key = param.getUserPageId() + handlerType;
SocketManager.removeUser(key);
// 连续三次未收到心跳响应,异步处理断开逻辑,避免阻塞心跳线程
log.warn("心跳响应超时 - 设备类型: {}, 连续失败次数: {}/{}, 执行断开连接",
handlerType, consecutiveHeartbeatMisses, MAX_HEARTBEAT_MISSES);
handleHeartbeatTimeoutAsync();
consecutiveHeartbeatMisses = 0; // 重置连续心跳丢失次数
}
}
}, 3, 10, TimeUnit.SECONDS);
}
/**
* 异步处理心跳超时断开逻辑
* <p>
* 使用CompletableFuture避免阻塞心跳发送线程确保心跳机制不受影响。
* 处理流程:
* 1. 异步发送退出指令
* 2. 延迟3秒后清理Socket连接
* 3. 记录处理过程和异常
* </p>
*/
private void handleHeartbeatTimeoutAsync() {
CompletableFuture.runAsync(() -> {
try {
log.info("开始执行心跳超时断开处理 - 设备类型: {}", handlerType);
// 根据设备类型发送对应的退出指令
if (CnSocketUtil.DEV_TAG.equals(handlerType)) {
CnSocketUtil.quitSend(param);
} else {
CnSocketUtil.quitSendSource(param);
}
log.debug("退出指令已发送等待3秒后清理连接 - 设备类型: {}", handlerType);
} catch (Exception e) {
log.error("心跳超时处理发送退出指令异常 - 设备类型: {}", handlerType, e);
}
}).thenRunAsync(() -> {
try {
// 延迟3秒后清理连接给退出指令留出处理时间
Thread.sleep(3000);
// 构建连接Key并从SocketManager中移除
String key = CnSocketUtil.DEV_TAG.equals(handlerType) ?
param.getUserPageId() + CnSocketUtil.DEV_TAG :
param.getUserPageId() + CnSocketUtil.SOURCE_TAG;
SocketManager.removeUser(key);
log.info("心跳超时断开处理完成 - 设备类型: {}, 连接已清理", handlerType);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("心跳超时处理等待过程中被中断 - 设备类型: {}", handlerType);
} catch (Exception e) {
log.error("心跳超时处理清理连接异常 - 设备类型: {}", handlerType, e);
}
});
}
/**
* 优雅关闭线程池执行器
* <p>
* 确保资源的完全清理,避免内存泄漏:
* 1. 取消当前的心跳定时任务
* 2. 关闭线程池并等待正在执行的任务完成
* 3. 如果等待超时则强制关闭
* 4. 处理中断异常并恢复中断状态
* </p>
*/
private void shutdownExecutorGracefully() {
try {
// 1. 取消心跳定时任务
if (heartbeatFuture != null && !heartbeatFuture.isCancelled()) {
boolean cancelled = heartbeatFuture.cancel(false);
log.debug("心跳定时任务取消结果: {} - 设备类型: {}", cancelled, handlerType);
}
// 2. 关闭线程池,不再接收新任务
heartbeatExecutor.shutdown();
// 3. 等待已提交的任务完成最多等待5秒
if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("心跳线程池未能在5秒内正常关闭执行强制关闭 - 设备类型: {}", handlerType);
heartbeatExecutor.shutdownNow();
// 再次等待强制关闭完成最多等待2秒
if (!heartbeatExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
log.error("心跳线程池强制关闭失败 - 设备类型: {}", handlerType);
}
} else {
log.debug("心跳线程池已优雅关闭 - 设备类型: {}", handlerType);
}
} catch (InterruptedException e) {
// 如果等待过程中被中断,立即强制关闭
log.warn("心跳线程池关闭过程中被中断,执行强制关闭 - 设备类型: {}", handlerType);
heartbeatExecutor.shutdownNow();
// 恢复中断状态遵循Java并发编程最佳实践
Thread.currentThread().interrupt();
}
}
/**
* 消息接收处理方法
* <p>
* 负责处理从服务端接收到的消息:
* 1. 过滤心跳响应包,重置失败计数器
* 2. 业务消息传递给后续处理器
* </p>
*
* @param ctx Netty的通道上下文对象
* @param msg 接收到的消息内容
* @throws Exception 异常情况
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 过滤心跳包,避免进入业务逻辑
if (isHeartbeatPacket(msg)) {
System.out.println(handlerType + "♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥♥response" + LocalDateTime.now());
log.debug("心跳响应 - 设备类型: {}, 时间: {}", handlerType, LocalDateTime.now());
// 重置连续失败计数器,表示连接正常
consecutiveHeartbeatMisses = 0;
return;
}
// 处理业务数据
// 业务消息传递给管道中的后续处理器
ctx.fireChannelRead(msg);
}
/**
* 判断是否为心跳数据包
* <p>
* 通过解析消息的operateCode字段来判断是否为心跳响应。
* 心跳包的操作码为SourceOperateCodeEnum.HEARTBEAT。
* </p>
*
* @param msg 需要判断的消息内容
* @return true:心跳包, false:业务消息
*/
private boolean isHeartbeatPacket(String msg) {
// 判断是否为心跳包
SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg);
return !Objects.isNull(socketDataMsg.getOperateCode()) && socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.HEARTBEAT.getValue());
try {
// 解析消息为SocketDataMsg对象
SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg);
// 检查操作码是否为心跳类型
return socketDataMsg != null &&
socketDataMsg.getOperateCode() != null &&
socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.HEARTBEAT.getValue());
} catch (Exception e) {
// 消息解析失败,可能不是标准格式的心跳包
log.debug("消息解析失败,可能不是心跳包: {}", msg, e);
return false;
}
}
}

View File

@@ -1,9 +1,12 @@
package com.njcn.gather.detection.util.socket.cilent;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.njcn.gather.detection.handler.SocketContrastResponseService;
import com.njcn.gather.detection.handler.SocketDevResponseService;
import com.njcn.gather.detection.handler.SocketSourceResponseService;
import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum;
import com.njcn.gather.detection.pojo.param.ContrastDetectionParam;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.pojo.vo.SocketDataMsg;
import com.njcn.gather.detection.util.socket.CnSocketUtil;
@@ -18,158 +21,484 @@ import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* Netty客户端工具类
* 用于建立与检测设备和程控源设备的Socket通信连接
* 支持心跳检测、断线重连和异常处理
*
* @Description: 心跳检测服务端 对应的服务端在netty-server 包下的NettyClient
* @Author: wr
* @Date: 2024/12/10 14:16
*/
@Getter
@Slf4j
@Component
public class NettyClient {
public static void socketClient(String ip, Integer port, PreDetectionParam param, String msg, SimpleChannelInboundHandler<String> handler) {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
if (handler instanceof NettySourceClientHandler) {
ch.pipeline()
//空闲状态的handler
// 添加LineBasedFrameDecoder来按行分割数据
.addLast(new LineBasedFrameDecoder(10240))
// .addLast(new IdleStateHandler(0, 10, 0, TimeUnit.SECONDS))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new HeartbeatHandler(param, CnSocketUtil.SOURCE_TAG))
.addLast(handler);
} else if (handler instanceof NettyDevClientHandler) {
ch.pipeline()
// 添加LineBasedFrameDecoder来按行分割数据
.addLast(new LineBasedFrameDecoder(10240))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new HeartbeatHandler(param, CnSocketUtil.DEV_TAG))
//空闲状态的handler
//readerIdleTimeSeconds在指定的秒数内如果没有读取到任何数据则触发IdleState.READER_IDLE事件。
//writerIdleTimeSeconds在指定的秒数内如果没有写入任何数据则触发IdleState.WRITER_IDLE事件。
//allIdleTimeSeconds在指定的秒数内如果没有发生任何读取或写入操作则触发IdleState.ALL_IDLE事件。
.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS))
.addLast(handler);
} else if (handler instanceof NettyContrastClientHandler) {
ch.pipeline()
@Resource
private SocketSourceResponseService socketSourceResponseService;
// 添加LineBasedFrameDecoder来按行分割数据
.addLast(new LineBasedFrameDecoder(10240))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new HeartbeatHandler(param, CnSocketUtil.CONTRAST_DEV_TAG))
//空闲状态的handler
//readerIdleTimeSeconds在指定的秒数内如果没有读取到任何数据则触发IdleState.READER_IDLE事件。
//writerIdleTimeSeconds在指定的秒数内如果没有写入任何数据则触发IdleState.WRITER_IDLE事件。
//allIdleTimeSeconds在指定的秒数内如果没有发生任何读取或写入操作则触发IdleState.ALL_IDLE事件。
.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS))
.addLast(handler);
}
}
});
ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
channelFuture.addListener((ChannelFutureListener) ch -> {
if (!ch.isSuccess()) {
System.out.println("链接服务端失败...");
// 连接失败时关闭 group
group.shutdownGracefully();
} else {
System.out.println("链接服务端成功...");
if (handler instanceof NettySourceClientHandler) {
NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + CnSocketUtil.SOURCE_TAG);
if (ObjectUtil.isNotNull(groupByUserId)) {
groupByUserId.shutdownGracefully().sync();
}
SocketManager.addGroup(param.getUserPageId() + CnSocketUtil.SOURCE_TAG, group);
} else if(handler instanceof NettyDevClientHandler){
NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + CnSocketUtil.DEV_TAG);
if (ObjectUtil.isNotNull(groupByUserId)) {
groupByUserId.shutdownGracefully().sync();
}
SocketManager.addGroup(param.getUserPageId() + CnSocketUtil.DEV_TAG, group);
}else if(handler instanceof NettyContrastClientHandler){
NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + CnSocketUtil.CONTRAST_DEV_TAG);
if (ObjectUtil.isNotNull(groupByUserId)) {
groupByUserId.shutdownGracefully().sync();
}
SocketManager.addGroup(param.getUserPageId() + CnSocketUtil.CONTRAST_DEV_TAG, group);
}
@Resource
private SocketDevResponseService socketDevResponseService;
System.out.println("客户端向服务端发送消息:" + port + msg);
channelFuture.channel().writeAndFlush(msg + "\n");
}
});
} catch (Exception e) {
System.out.println("连接socket服务端发送异常............" + e.getMessage());
group.shutdownGracefully();
//TODO 通知页面
SocketDataMsg socketDataMsg = new SocketDataMsg();
socketDataMsg.setType("aaa");
socketDataMsg.setCode(SourceResponseCodeEnum.SOCKET_ERROR.getCode());
socketDataMsg.setData(SourceResponseCodeEnum.SOCKET_ERROR.getMessage());
socketDataMsg.setRequestId("connect");
if (handler instanceof NettySourceClientHandler) {
socketDataMsg.setOperateCode("Source");
} else if (handler instanceof NettyDevClientHandler) {
CnSocketUtil.quitSendSource(param);
socketDataMsg.setOperateCode("Dev");
} else {
socketDataMsg.setOperateCode("Contrast_Dev");
CnSocketUtil.contrastSendquit(param.getUserPageId());
}
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
} finally {
// System.out.println("进入clientSocket最后步骤---------------------");
@Resource
private SocketContrastResponseService socketContrastResponseService;
/**
* 静态实例,用于保持向后兼容
*/
private static NettyClient instance;
@PostConstruct
public void init() {
instance = this;
}
/**
* 创建程控源Handler实例Spring管理方式
* 自动注入SocketSourceResponseService统一使用CnSocketUtil.SOURCE_TAG
*
* @param param 检测参数
* @return NettySourceClientHandler 程控源处理器实例
*/
public NettySourceClientHandler createSourceHandler(PreDetectionParam param) {
return new NettySourceClientHandler(param, socketSourceResponseService);
}
/**
* 创建被检设备Handler实例Spring管理方式
* 自动注入SocketDevResponseService
*
* @param param 检测参数
* @return NettyDevClientHandler 被检设备处理器实例
*/
public NettyDevClientHandler createDeviceHandler(PreDetectionParam param) {
return new NettyDevClientHandler(param, socketDevResponseService);
}
/**
* 智能连接程控源设备(新增方法)
* 自动创建Handler并建立连接
*
* @param ip 程控源IP地址
* @param port 程控源端口
* @param param 检测参数
* @param msg 初始消息
*/
public void connectToSource(String ip, Integer port, PreDetectionParam param, String msg) {
NettySourceClientHandler handler = createSourceHandler(param);
executeSocketConnection(ip, port, param, msg, handler);
}
/**
* 智能连接被检设备(新增方法)
* 自动创建Handler并建立连接
*
* @param ip 被检设备IP地址
* @param port 被检设备端口
* @param param 检测参数
* @param msg 初始消息
*/
public void connectToDevice(String ip, Integer port, PreDetectionParam param, String msg) {
NettyDevClientHandler handler = createDeviceHandler(param);
executeSocketConnection(ip, port, param, msg, handler);
}
/**
* 智能连接比对被检设备(新增方法)
* 自动创建Handler并建立连接
*
* @param ip 被检设备IP地址
* @param port 被检设备端口
* @param param 检测参数
* @param msg 初始消息
* 静态方法:智能连接程控源设备(兼容性包装)
*/
private void connectToContrast(String ip, Integer port, ContrastDetectionParam param, String msg) {
PreDetectionParam preDetectionParam = new PreDetectionParam();
preDetectionParam.setUserPageId(param.getLoginName());
NettyContrastClientHandler handler = new NettyContrastClientHandler(preDetectionParam, socketContrastResponseService);
executeSocketConnection(ip, port, preDetectionParam, msg, handler);
}
/**
* 静态方法:智能连接程控源设备(兼容性包装)
*/
public static void connectToContrastDeviceStatic(String ip, Integer port, ContrastDetectionParam param, String msg) {
if (instance != null) {
instance.connectToContrast(ip, port, param, msg);
} else {
log.error("NettyClient未初始化无法创建比对设备通讯连接");
}
}
/**
* 静态方法:智能连接程控源设备(兼容性包装)
*/
public static void connectToSourceStatic(String ip, Integer port, PreDetectionParam param, String msg) {
if (instance != null) {
instance.connectToSource(ip, port, param, msg);
} else {
log.error("NettyClient未初始化无法创建程控源连接");
}
}
/**
* 重连方法
* 静态方法:智能连接被检设备(兼容性包装)
*/
public static void connect(Bootstrap bootstrap, String msg) {
try {
bootstrap.connect("127.0.0.1", 8787).sync()
.addListener((ChannelFutureListener) ch -> {
if (!ch.isSuccess()) {
ch.channel().close();
final EventLoop loop = ch.channel().eventLoop();
loop.schedule(() -> {
System.err.println("服务端链接不上,开始重连操作...");
//重连
connect(bootstrap, msg);
}, 3L, TimeUnit.SECONDS);
} else {
if (StrUtil.isNotBlank(msg)) {
ch.channel().writeAndFlush(msg);
}
System.out.println("服务端链接成功...");
}
});
} catch (Exception e) {
System.out.println(e.getMessage());
try {
Thread.sleep(3000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//再重连
connect(bootstrap, msg);
public static void connectToDeviceStatic(String ip, Integer port, PreDetectionParam param, String msg) {
if (instance != null) {
instance.connectToDevice(ip, port, param, msg);
} else {
log.error("NettyClient未初始化无法创建被检设备连接");
}
}
/**
* 内部重构后的实现 - 拆分职责但不暴露给外部
* 执行完整的Socket连接建立流程
* 1. 创建事件循环组
* 2. 配置Bootstrap启动器
* 3. 设置管道处理链
* 4. 建立连接并处理结果
*
* @param ip 目标服务器IP地址
* @param port 目标服务器端口号
* @param param 检测参数对象
* @param msg 连接成功后发送的初始消息
* @param handler 业务处理器(区分程控源和被检设备)
*/
private static void executeSocketConnection(String ip, Integer port,
PreDetectionParam param, String msg, SimpleChannelInboundHandler<String> handler) {
// 创建NIO事件循环组用于处理网络I/O事件
NioEventLoopGroup group = createEventLoopGroup();
try {
// 配置客户端启动器
Bootstrap bootstrap = configureBootstrap(group);
// 创建管道初始化器,配置编解码和业务处理链
ChannelInitializer<NioSocketChannel> initializer = createChannelInitializer(param, handler);
bootstrap.handler(initializer);
// 同步连接到目标服务器
ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
// 处理连接结果(成功或失败)
handleConnectionResult(channelFuture, param, handler, group, msg);
} catch (Exception e) {
// 处理连接过程中的异常
handleConnectionException(e, param, handler, group);
}
}
/**
* 创建NIO事件循环组
* 用于管理网络I/O操作的线程池处理连接、读写等异步事件
*
* @return NioEventLoopGroup 事件循环组实例
*/
private static NioEventLoopGroup createEventLoopGroup() {
return new NioEventLoopGroup();
}
/**
* 配置Bootstrap客户端启动器
* 设置连接超时、通道类型等基础参数
*
* @param group 事件循环组
* @return Bootstrap 配置好的启动器
*/
private static Bootstrap configureBootstrap(NioEventLoopGroup group) {
return new Bootstrap()
// 绑定事件循环组
.group(group)
// 连接超时5秒
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
// 使用NIO Socket通道
.channel(NioSocketChannel.class);
}
/**
* 创建通道初始化器
* 当新连接建立时,初始化该连接的处理管道
*
* @param param 检测参数,用于配置心跳处理器
* @param handler 业务处理器
* @return ChannelInitializer 通道初始化器
*/
private static ChannelInitializer<NioSocketChannel> createChannelInitializer(
PreDetectionParam param, SimpleChannelInboundHandler<String> handler) {
return new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
setupPipeline(ch.pipeline(), param, handler);
}
};
}
/**
* 配置管道处理链
* 按顺序添加各种处理器,构成完整的数据处理流水线:
* 1. LineBasedFrameDecoder按行分割数据解决TCP粘包拆包问题
* 2. StringDecoder/StringEncoder字符串编解码器
* 3. HeartbeatHandler心跳处理器维持连接活跃
* 4. IdleStateHandler空闲检测器仅被检设备需要
* 5. 业务处理器:具体的业务逻辑处理
*
* @param pipeline 管道对象
* @param param 检测参数
* @param handler 业务处理器
*/
private static void setupPipeline(ChannelPipeline pipeline,
PreDetectionParam param, SimpleChannelInboundHandler<String> handler) {
// 基础编解码器:处理数据格式转换和粘包拆包
// 按行分割最大10KB
pipeline.addLast(new LineBasedFrameDecoder(10240))
// 字节转字符串
.addLast(new StringDecoder(CharsetUtil.UTF_8))
// 字符串转字节
.addLast(new StringEncoder(CharsetUtil.UTF_8));
// 心跳处理器:根据设备类型选择不同的标签
String tag = getDeviceTag(handler);
pipeline.addLast(new HeartbeatHandler(param, tag));
// 空闲检测器仅被检设备和比对被检设备需要60秒无读操作触发空闲事件
if (!isSourceHandler(handler)) {
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
}
// 业务处理器:处理具体的检测业务逻辑
pipeline.addLast(handler);
}
/**
* 判断是否为程控源处理器
* 程控源设备和被检设备使用不同的处理器和配置
*
* @param handler 业务处理器
* @return boolean true:程控源处理器, false:被检设备处理器
*/
private static boolean isSourceHandler(SimpleChannelInboundHandler<String> handler) {
return handler instanceof NettySourceClientHandler;
}
/**
* 获取设备标签
* 用于在SocketManager中区分不同类型的设备连接
*
* @param handler 业务处理器
* @return String 设备标签("_Source" 或 "_Dev"
*/
private static String getDeviceTag(SimpleChannelInboundHandler<String> handler) {
String tag;
if (handler instanceof NettySourceClientHandler) {
tag = CnSocketUtil.SOURCE_TAG;
} else if (handler instanceof NettyDevClientHandler) {
tag = CnSocketUtil.DEV_TAG;
} else {
tag = CnSocketUtil.CONTRAST_DEV_TAG;
}
return tag;
}
/**
* 获取名称
* 用于在SocketManager中区分不同类型的设备连接
*
* @param handler 业务处理器
* @return String 设备标签("程控源设备" 或 "被检设备"
*/
private static String getDeviceType(SimpleChannelInboundHandler<String> handler) {
String deviceType;
if (handler instanceof NettySourceClientHandler) {
deviceType = "程控源设备";
} else if (handler instanceof NettyDevClientHandler) {
deviceType = "被检设备";
} else {
deviceType = "比对被检设备";
}
return deviceType;
}
/**
* 处理连接结果
* 为连接Future添加监听器异步处理连接成功或失败的情况
*
* @param channelFuture 连接Future对象
* @param param 检测参数
* @param handler 业务处理器
* @param group 事件循环组
* @param msg 初始消息
*/
private static void handleConnectionResult(ChannelFuture channelFuture,
PreDetectionParam param, SimpleChannelInboundHandler<String> handler,
NioEventLoopGroup group, String msg) {
channelFuture.addListener((ChannelFutureListener) ch -> {
if (!ch.isSuccess()) {
onConnectionFailure(handler, group);
} else {
onConnectionSuccess(channelFuture, param, handler, group, msg);
}
});
}
/**
* 连接失败处理
* 输出失败信息并优雅关闭事件循环组
*
* @param handler 业务处理器,用于区分设备类型
* @param group 事件循环组
*/
private static void onConnectionFailure(SimpleChannelInboundHandler<String> handler, NioEventLoopGroup group) {
String deviceType = getDeviceType(handler);
log.info("连接{}服务端失败...", deviceType);
group.shutdownGracefully();
}
/**
* 连接成功处理
* 执行连接成功后的初始化操作:
* 1. 管理Socket连接会话注册EventLoopGroup到SocketManager
* 2. 注册Channel到SocketManager实现统一的连接管理
* 3. 通过SocketManager发送初始消息统一消息发送入口
*
* @param channelFuture 连接Future对象
* @param param 检测参数
* @param handler 业务处理器
* @param group 事件循环组
* @param msg 初始消息
*/
private static void onConnectionSuccess(ChannelFuture channelFuture,
PreDetectionParam param, SimpleChannelInboundHandler<String> handler,
NioEventLoopGroup group, String msg) {
String deviceType = getDeviceType(handler);
log.info("连接{}服务端成功...", deviceType);
// 管理连接会话将EventLoopGroup注册到SocketManager
manageSocketConnection(param, handler, group);
// 将Channel也注册到SocketManager便于统一消息发送
String userId = param.getUserPageId() + getDeviceTag(handler);
SocketManager.addUser(userId, channelFuture.channel());
// 通过SocketManager发送初始消息统一消息发送入口
SocketManager.sendMsg(userId, msg);
}
/**
* 管理Socket连接会话
* 将新建立的连接注册到SocketManager中进行统一管理
* 1. 检查并关闭同用户同设备类型的旧连接,避免资源泄露
* 2. 将新连接注册到SocketManager便于后续管理和查找
* <p>
* 连接Key格式{userPageId}_{deviceTag}
* 例如zhangsan_test_Source程控源 / zhangsan_test_Dev被检设备
*
* @param param 检测参数包含用户页面ID
* @param handler 业务处理器,用于区分设备类型
* @param group 事件循环组,表示具体的连接资源
*/
private static void manageSocketConnection(PreDetectionParam param,
SimpleChannelInboundHandler<String> handler, NioEventLoopGroup group) {
// 构建连接标识用户ID + 设备标签
String key = param.getUserPageId() + getDeviceTag(handler);
// 关闭旧连接:同一用户同一设备类型只能有一个活跃连接
NioEventLoopGroup existingGroup = SocketManager.getGroupByUserId(key);
if (ObjectUtil.isNotNull(existingGroup)) {
try {
existingGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
// 恢复中断状态
Thread.currentThread().interrupt();
}
}
// 注册新连接到SocketManager
SocketManager.addGroup(key, group);
}
/**
* 处理连接异常
* 当连接建立过程中发生异常时的统一处理流程:
* 1. 关闭相关资源,防止资源泄露
* 2. 执行设备相关的退出操作
* 3. 通过WebSocket向前端通知错误信息
*
* @param e 异常对象
* @param param 检测参数
* @param handler 业务处理器
* @param group 事件循环组
*/
private static void handleConnectionException(Exception e, PreDetectionParam param,
SimpleChannelInboundHandler<String> handler, NioEventLoopGroup group) {
log.info("连接socket服务端发送异常: {}", e.getMessage());
// 关闭事件循环组资源
group.shutdownGracefully();
// 执行设备相关的退出操作
executeQuitOperations(param, handler);
// 通过WebSocket通知前端页面
notifyFrontendError(param, handler);
}
/**
* 执行退出操作
* 根据不同的设备处理器类型执行相应的退出指令:
* - NettyDevClientHandler被检设备处理器需要发送程控源退出指令
* - 其他非程控源处理器:发送通用退出指令
* - NettySourceClientHandler程控源处理器无需额外退出操作
*
* @param param 检测参数
* @param handler 业务处理器
*/
private static void executeQuitOperations(PreDetectionParam param,
SimpleChannelInboundHandler<String> handler) {
if (handler instanceof NettyDevClientHandler) {
// 被检设备异常时,发送程控源退出指令
CnSocketUtil.quitSendSource(param);
} else if (handler instanceof NettyContrastClientHandler) {
CnSocketUtil.contrastSendquit(param.getUserPageId());
}
// 程控源处理器异常时无需额外操作
}
/**
* 通知前端错误信息
* 构建错误消息对象并通过WebSocket发送给前端页面
* 前端可以根据操作码Source/Dev显示相应的错误提示
*
* @param param 检测参数包含用户页面ID
* @param handler 业务处理器,用于确定操作码
*/
private static void notifyFrontendError(PreDetectionParam param,
SimpleChannelInboundHandler<String> handler) {
// 构建错误消息对象
SocketDataMsg socketDataMsg = new SocketDataMsg();
// 消息类型
socketDataMsg.setType("aaa");
// 错误码
socketDataMsg.setCode(SourceResponseCodeEnum.SOCKET_ERROR.getCode());
// 错误消息
socketDataMsg.setData(SourceResponseCodeEnum.SOCKET_ERROR.getMessage());
// 请求ID标识
socketDataMsg.setRequestId("connect");
// 设置操作码:程控源为"Source",被检设备为"Dev"
String devTag = getDeviceTag(handler).substring(1);
socketDataMsg.setOperateCode(devTag);
// 通过WebSocket发送错误信息到前端页面
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
}
}

View File

@@ -21,227 +21,324 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.TimeoutException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ProtocolException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @Description: 源客户端业务处理(示例)
* Netty设备客户端处理器
* <p>负责处理与被检测设备的Socket通信包括</p>
* <ul>
* <li>通道生命周期管理(建立、断开)</li>
* <li>消息接收和处理</li>
* <li>心跳超时处理</li>
* <li>异常处理和恢复</li>
* </ul>
*
* @Description: 设备客户端业务处理器
* @Author: wr
* @Date: 2024/12/10 14:16
*/
@Slf4j
@RequiredArgsConstructor
public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
/** 闪变检测超时时间20分钟1300秒 */
private static final long FLICKER_TIMEOUT = 1300L;
/** 统计数据检测超时时间3分钟180秒 */
private static final long STATISTICS_TIMEOUT = 180L;
/** 实时数据检测超时时间1分钟60秒 */
private static final long REALTIME_TIMEOUT = 60L;
/** 暂停操作超时时间10分钟600秒 */
private static final long STOP_TIMEOUT = 600L;
/** 超时时默认结果标志3表示超时失败 */
private static final int DEFAULT_RESULT_FLAG = 3;
private final PreDetectionParam param;
private final SocketDevResponseService socketResponseService;
/**
* 当通道进行连接时推送消息
* 当通道连接建立时的处理逻辑
* <p>将关闭原有连接并将新连接注册到SocketManager中</p>
*
* @param ctx
* @param ctx 通道上下文
* @throws Exception 连接异常
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端通道已建立" + ctx.channel().id());
log.info("客户端通道已建立: {}", ctx.channel().id());
// 检查是否存在同一用户的老连接
Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + CnSocketUtil.DEV_TAG);
if (Objects.nonNull(channel)) {
try {
// 关闭老连接避免连接泄漏
channel.close().sync();
} catch (InterruptedException e) {
e.printStackTrace();
log.error("关闭通道异常", e);
}
}
// 注册新的连接到用户管理器
SocketManager.addUser(param.getUserPageId() + CnSocketUtil.DEV_TAG, ctx.channel());
}
/**
* 处理服务端消息消息信息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException {
System.out.println("devhandler接收server端数据>>>>>>" + msg);
try {
socketResponseService.deal(param, msg);
} catch (Exception e) {
e.printStackTrace();
CnSocketUtil.quitSend(param);
}
}
/**
* 当通道断线时,支持重连
* 当通道断开时的清理工作
* <p>关闭连接,清理用户映射,退出源设备发送</p>
*
* @param ctx
* @param ctx 通道上下文
* @throws Exception 关闭异常
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("设备通讯客户端断线");
log.warn("设备通讯客户端断线");
ctx.close();
SocketManager.removeUser(param.getUserPageId() + CnSocketUtil.DEV_TAG);
CnSocketUtil.quitSendSource(param);
}
/**
* 用户事件的回调方法(自定义事件用于心跳机制)
* 处理从服务端接收到的消息
* <p>将消息交给SocketDevResponseService进行具体处理</p>
*
* @param ctx
* @param evt
* @param ctx 通道上下文
* @param msg 接收到的消息
* @throws InterruptedException 线程中断异常
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException {
log.info("devhandler接收server端数据: {}", msg);
try {
socketResponseService.deal(param, msg);
} catch (Exception e) {
log.error("处理服务端消息异常", e);
CnSocketUtil.quitSend(param);
}
}
/**
* 用户事件回调方法,主要用于处理心跳超时
* <p>当触发READER_IDLE事件时根据当前状态进行超时处理</p>
*
* @param ctx 通道上下文
* @param evt 用户事件对象
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
Boolean fly = false;
if (evt instanceof IdleStateEvent) { //IdleState.在一段时间内没有收到任何消息时,会触发该事件
if (((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
System.out.println(LocalDateTime.now() + "devHandler触发读超时函数**************************************");
if (!FormalTestManager.hasStopFlag) {
if (CollUtil.isNotEmpty(SocketManager.getSourceList())) {
SourceIssue sourceIssue = SocketManager.getSourceList().get(0);
if (SocketManager.clockMap.containsKey(sourceIssue.getIndex())) {
SocketManager.clockMap.put(sourceIssue.getIndex(), SocketManager.clockMap.get(sourceIssue.getIndex()) + 60L);
} else {
SocketManager.clockMap.put(sourceIssue.getIndex(), 60L);
}
if (sourceIssue.getType().equals(DicDataEnum.F.getCode())) {
//闪变,正常抛一轮最大等待20分钟超时
if (SocketManager.clockMap.get(sourceIssue.getIndex()) > 1300) {
fly = true;
System.out.println("超时处理-----》" + sourceIssue.getType() + "已超时----------------关闭");
CnSocketUtil.quitSend(param);
timeoutSend(sourceIssue);
}
} else if (sourceIssue.getType().equals(DicDataEnum.VOLTAGE.getCode()) || sourceIssue.getType().equals(DicDataEnum.HP.getCode())) {
//统计数据项,正常抛一轮数据,超时
if (SocketManager.clockMap.get(sourceIssue.getIndex()) > 180) {
fly = true;
CnSocketUtil.quitSend(param);
System.out.println("超时处理-----》" + sourceIssue.getType() + "已超时----------------关闭");
timeoutSend(sourceIssue);
}
} else {
//实时数据
if (SocketManager.clockMap.get(sourceIssue.getIndex()) > 60) {
fly = true;
CnSocketUtil.quitSend(param);
System.out.println("超时处理-----》" + sourceIssue.getType() + "已超时----------------关闭");
timeoutSend(sourceIssue);
}
}
} else {
fly = true;
//为空则认为是常规步骤,设定一分钟超时
CnSocketUtil.quitSend(param);
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.SOCKET_TIMEOUT);
}
if (fly) {
socketResponseService.backCheckState(param);
}
} else {
//如果是暂停操作后
FormalTestManager.stopTime += 60;
System.out.println("当前进入暂停操作超时函数-----------------" + FormalTestManager.stopTime);
if (FormalTestManager.stopTime > 600) {
CnSocketUtil.quitSend(param);
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.STOP_TIMEOUT);
}
}
// 检查是否为读取空闲事件(心跳超时)
if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
log.warn("devHandler触发读超时函数: {}", LocalDateTime.now());
// 根据是否有停止标志采取不同的超时处理策略
if (!FormalTestManager.hasStopFlag) {
// 正常检测中的超时处理
handleReadTimeout();
} else {
// 暂停状态下的超时处理
handleStopTimeout();
}
}
}
/**
* 处理器被添加到管道时的回调
*
* @param ctx 通道上下文
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("有通道准备接入" + ctx.channel());
log.info("有通道准备接入: {}", ctx.channel());
}
/**
* 异常捕获处理
* <p>捕获并处理各种类型的异常,执行清理工作</p>
*
* @param ctx 通道上下文
* @param cause 异常原因
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("捕获到设备服务异常。。。。。。。");
// 处理异常,例如记录日志、关闭连接等
cause.printStackTrace();
// 根据异常类型进行不同的处理
if (cause instanceof ConnectException) {
// 处理连接异常,例如重试连接或记录特定的连接错误信息
System.out.println("连接socket服务端异常");
} else if (cause instanceof IOException) {
// 处理I/O异常例如读写错误
System.out.println("IOException caught: There was an I/O error.");
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR);
// 例如可以记录更详细的I/O错误信息
} else if (cause instanceof TimeoutException) {
// 处理超时异常
System.out.println("TimeoutException caught: Operation timed out.");
// 可以根据业务逻辑决定是否重试或记录超时信息
} else if (cause instanceof ProtocolException) {
// 处理协议异常,例如消息格式不正确
System.out.println("ProtocolException caught: Invalid protocol message.");
// 可以记录协议错误信息或向客户端发送错误响应
} else {
// 处理其他类型的异常
System.out.println("Unknown exception caught: " + cause.getMessage());
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR);
// 可以记录未知异常信息
}
log.error("捕获到设备服务异常", cause);
handleSpecificException(cause);
// 统一清理工作
CnSocketUtil.quitSend(param);
CnSocketUtil.quitSendSource(param);
socketResponseService.backCheckState(param);
ctx.close();
}
/**
* 发送业务消息时候开启计时器,
* @param requestId
* 处理特定类型的异常
* <p>根据异常类型进行相应的日志记录和错误通知</p>
*
* @param cause 异常对象
*/
/* private void scheduleTimeoutTask(String requestId) {
ScheduledFuture<?> future = scheduler.schedule(() -> {
if (requestTimeoutTasks.containsKey(requestId)) {
// 处理超时逻辑
System.out.println("Business request with ID " + requestId + " timed out.");
requestTimeoutTasks.remove(requestId);
ctx.close();
}
}, BUSINESS_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
requestTimeoutTasks.put(requestId, future);
}*/
private void handleSpecificException(Throwable cause) {
if (cause instanceof ConnectException) {
log.error("连接socket服务端异常");
} else if (cause instanceof IOException) {
log.error("IO异常: {}", cause.getMessage());
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR);
} else if (cause instanceof TimeoutException) {
log.error("操作超时: {}", cause.getMessage());
} else if (cause instanceof ProtocolException) {
log.error("协议异常: {}", cause.getMessage());
} else {
log.error("未知异常: {}", cause.getMessage());
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR);
}
}
/**
* 超时后的处理
* 处理读取超时事件
* <p>检查源列表,更新超时计数器,判断是否超时并处理</p>
*/
private void handleReadTimeout() {
if (CollUtil.isNotEmpty(SocketManager.getSourceList())) {
// 获取当前正在检测的源问题(取第一个)
SourceIssue sourceIssue = SocketManager.getSourceList().get(0);
// 更新该源问题的超时计数器
updateTimeoutCounter(sourceIssue);
// 根据检测类型判断是否已超时
if (isTimeout(sourceIssue)) {
handleTimeout(sourceIssue);
}
} else {
// 源列表为空,认为是常规步骤的超时,默认一分钟超时
CnSocketUtil.quitSend(param);
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.SOCKET_TIMEOUT);
socketResponseService.backCheckState(param);
}
}
/**
* 处理暂停操作的超时事件
* <p>当检测被暂停时,统计暂停时间,超过限制后发送超时通知</p>
*/
private void handleStopTimeout() {
FormalTestManager.stopTime += 60;
log.warn("当前进入暂停操作超时函数,停止时间: {}", FormalTestManager.stopTime);
if (FormalTestManager.stopTime > STOP_TIMEOUT) {
CnSocketUtil.quitSend(param);
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.STOP_TIMEOUT);
}
}
/**
* 更新指定源问题的超时计数器
* <p>每次调用时增加60秒用于统计累计超时时间</p>
*
* @param sourceIssue 源问题对象
*/
private void updateTimeoutCounter(SourceIssue sourceIssue) {
Integer index = sourceIssue.getIndex();
if (SocketManager.clockMap.containsKey(index)) {
SocketManager.clockMap.put(index, SocketManager.clockMap.get(index) + 60L);
} else {
SocketManager.clockMap.put(index, 60L);
}
}
/**
* 根据检测类型判断是否已超时
* <p>不同检测类型有不同的超时阈值:</p>
* <ul>
* <li>闪变检测20分钟</li>
* <li>统计数据3分钟</li>
* <li>实时数据1分钟</li>
* </ul>
*
* @param sourceIssue 源问题对象
* @return true 如果已超时false 否则
*/
private boolean isTimeout(SourceIssue sourceIssue) {
long currentTime = SocketManager.clockMap.get(sourceIssue.getIndex());
String type = sourceIssue.getType();
// 根据不同检测类型使用不同的超时阈值
if (DicDataEnum.F.getCode().equals(type)) {
// 闪变检测需要更长时间20分钟超时
return currentTime > FLICKER_TIMEOUT;
} else if (DicDataEnum.VOLTAGE.getCode().equals(type) || DicDataEnum.HP.getCode().equals(type)) {
// 统计数据类型电压、谐波中等时间3分钟超时
return currentTime > STATISTICS_TIMEOUT;
} else {
// 实时数据类型短时间1分钟超时
return currentTime > REALTIME_TIMEOUT;
}
}
/**
* 执行超时处理操作
* <p>记录超时日志,退出发送,发送超时结果,恢复检测状态</p>
*
* @param sourceIssue 源问题对象
*/
private void handleTimeout(SourceIssue sourceIssue) {
log.warn("超时处理 - {} 已超时,关闭连接", sourceIssue.getType());
CnSocketUtil.quitSend(param);
timeoutSend(sourceIssue);
socketResponseService.backCheckState(param);
}
/**
* 发送超时结果
* <p>为所有设备创建超时的检测结果并通过WebSocket发送给客户端</p>
*
* @param sourceIssue 源问题对象
*/
private void timeoutSend(SourceIssue sourceIssue) {
List<DevLineTestResult> devListRes = new ArrayList<>();
FormalTestManager.devList.forEach(dev -> {
DevLineTestResult devLineTestResult = new DevLineTestResult();
devLineTestResult.setDeviceId(dev.getDevId());
devLineTestResult.setDeviceName(dev.getDevName());
List<Integer> resultFlagList = new ArrayList<>();
List<PreDetection.MonitorListDTO> monitorListDTOList = dev.getMonitorList();
monitorListDTOList.forEach(i -> resultFlagList.add(3));
devLineTestResult.setChnResult(resultFlagList.toArray(new Integer[monitorListDTOList.size()]));
devListRes.add(devLineTestResult);
});
List<DevLineTestResult> devListRes = FormalTestManager.devList.stream()
.map(this::createTimeoutResult)
.collect(Collectors.toList());
WebSocketVO<List<DevLineTestResult>> socketVO = new WebSocketVO<>();
socketVO.setRequestId(sourceIssue.getType() + CnSocketUtil.END_TAG);
socketVO.setOperateCode(sourceIssue.getType());
socketVO.setData(devListRes);
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketVO));
}
/**
* 为指定设备创建超时的检测结果
* <p>将所有监测点的结果设置为超时标志值</p>
*
* @param dev 设备对象
* @return 设备检测结果
*/
private DevLineTestResult createTimeoutResult(PreDetection dev) {
DevLineTestResult devLineTestResult = new DevLineTestResult();
devLineTestResult.setDeviceId(dev.getDevId());
devLineTestResult.setDeviceName(dev.getDevName());
Integer[] resultFlags = dev.getMonitorList().stream()
.map(monitor -> DEFAULT_RESULT_FLAG)
.toArray(Integer[]::new);
devLineTestResult.setChnResult(resultFlags);
return devLineTestResult;
}
}

View File

@@ -8,132 +8,133 @@ import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.TimeoutException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ProtocolException;
import cn.hutool.core.util.StrUtil;
/**
* @Description: 源客户端业务处理(示例)
* @Author: wr
* @Date: 2024/12/10 14:16
* 源设备Netty客户端通道处理器
* 负责处理程控源设备的Socket通信
*
* @author wr
* @since 2024/12/10
*/
@Slf4j
@RequiredArgsConstructor
public class NettySourceClientHandler extends SimpleChannelInboundHandler<String> {
/** 检测参数对象包含用户页面ID等信息 */
private final PreDetectionParam webUser;
/** 源设备响应处理服务 */
private final SocketSourceResponseService sourceResponseService;
/**
* 通道进行连接时推送消息
*
* @param ctx
* 通道激活回调将通道注册到SocketManager
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端通道已建立" + ctx.channel().id());
SocketManager.addUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG, ctx.channel());
}
/**
* 处理服务端消息信息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException {
System.out.println("source接收server端数据>>>>>>" + msg);
try {
sourceResponseService.deal(webUser, msg);
} catch (Exception e) {
e.printStackTrace();
CnSocketUtil.quitSend(webUser);
// 验证webUser参数有效性
if (webUser == null) {
log.warn("源设备客户端通道已建立但webUser为空, channelId: {}", ctx.channel().id());
return;
}
String userId = webUser.getUserPageId();
log.info("源设备客户端通道已建立, channelId: {}, userId: {}", ctx.channel().id(), userId);
// 将通道注册到Socket管理器便于后续消息推送
if (StrUtil.isNotBlank(userId)) {
SocketManager.addUser(userId + CnSocketUtil.SOURCE_TAG, ctx.channel());
} else {
log.warn("源设备userId为空或空白跳过通道注册, channelId: {}", ctx.channel().id());
}
}
/**
* 通道断线时,支持重连
*
* @param ctx
* 通道断开回调,清理资源
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("源通讯客户端断线");
String userId = webUser != null ? webUser.getUserPageId() : "unknown";
log.warn("源通讯客户端断线, channelId: {}, userId: {}", ctx.channel().id(), userId);
// 关闭通道连接
ctx.close();
SocketManager.removeUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG);
// System.out.println("断线了......" + ctx.channel());
// ctx.channel().eventLoop().schedule(() -> {
// System.out.println("断线重连......");
// //重连
// NettyClient.connect();
// }, 3L, TimeUnit.SECONDS);
}
/**
* 用户事件的回调方法(自定义事件用于心跳机制)
*
* @param ctx
* @param evt
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
//如果是空闲状态事件
if (evt instanceof IdleStateEvent) {
if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
//发送ping 保持心跳链接
/* SocketMsg<String> msg = new SocketMsg<>();
msg.setRequestId("yxt");
msg.setOperateCode(SourceOperateCodeEnum.HEARTBEAT.getValue());
msg.setData("");
ctx.writeAndFlush(JSON.toJSONString(msg)+"\n");*/
}
} else {
//防止堆栈溢出
//userEventTriggered(ctx, evt);
// 从Socket管理器中移除用户通道映射
if (webUser != null && StrUtil.isNotBlank(userId)) {
SocketManager.removeUser(userId + CnSocketUtil.SOURCE_TAG);
}
}
/**
* 处理源设备响应消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException {
// 验证用户参数
if (webUser == null) {
log.warn("源设备消息处理失败: webUser为空, message: {}", msg);
return;
}
String userId = webUser.getUserPageId();
log.debug("源设备接收服务端数据, userId: {}, message: {}", userId, msg);
try {
// 委托给专门的响应处理服务处理业务逻辑
sourceResponseService.deal(webUser, msg);
} catch (Exception e) {
log.error("源设备消息处理异常, userId: {}, message: {}", userId, msg, e);
// 发生异常时退出发送,避免后续问题
CnSocketUtil.quitSend(webUser);
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("有通道准备接入" + ctx.channel().id());
// 记录处理器添加事件,用于调试
String userId = webUser != null ? webUser.getUserPageId() : "unknown";
log.debug("源设备通道准备接入, channelId: {}, userId: {}", ctx.channel().id(), userId);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 处理异常,例如记录日志、关闭连接等
System.out.println("捕获到源异常。。。。。。。");
cause.printStackTrace();
// 根据异常类型进行不同的处理
String userId = webUser != null ? webUser.getUserPageId() : "unknown";
String channelId = ctx.channel().id().toString();
// 根据异常类型进行分类处理和日志记录
if (cause instanceof ConnectException) {
// 处理连接异常,例如重试连接或记录特定的连接错误信息
System.out.println("连接socket服务端异常");
// 连接异常:网络连接失败
log.error("连接源设备Socket服务端异常, channelId: {}, userId: {}", channelId, userId, cause);
} else if (cause instanceof IOException) {
// 处理I/O异常例如读写错误
WebServiceManager.sendDetectionErrorMessage(webUser.getUserPageId(), SourceOperateCodeEnum.SERVER_ERROR);
// 例如可以记录更详细的I/O错误信息
// IO异常数据传输错误需通知前端
log.error("源设备IO异常, channelId: {}, userId: {}", channelId, userId, cause);
// 向前端发送服务器错误消息
if (StrUtil.isNotBlank(userId) && !"unknown".equals(userId)) {
WebServiceManager.sendDetectionErrorMessage(userId, SourceOperateCodeEnum.SERVER_ERROR);
}
} else if (cause instanceof TimeoutException) {
// 处理超时异常
System.out.println("TimeoutException caught: Operation timed out.");
// 可以根据业务逻辑决定是否重试或记录超时信息
// 超时异常:通信响应超时
log.warn("源设备通信超时, channelId: {}, userId: {}", channelId, userId, cause);
} else if (cause instanceof ProtocolException) {
// 处理协议异常,例如消息格式不正确
System.out.println("ProtocolException caught: Invalid protocol message.");
// 可以记录协议错误信息或向客户端发送错误响应
// 协议异常:数据格式不符合协议规范
log.error("源设备协议异常, channelId: {}, userId: {}", channelId, userId, cause);
} else {
// 处理其他类型的异常
System.out.println("Unknown exception caught: " + cause.getMessage());
// 可以记录未知异常信息
// 其他未知异常
log.error("源设备未知异常, channelId: {}, userId: {}, message: {}", channelId, userId, cause.getMessage(), cause);
}
// 发生异常时关闭通道
ctx.close();
}

View File

@@ -159,21 +159,6 @@ public class WebServiceManager {
preDetectionParamMap.put(userId, preDetectionParam);
}
/**
* 存储检测参数(兼容老版本)
* 从检测参数对象中获取userPageId作为key
*
* @param preDetectionParam 检测参数对象必须包含userPageId
* @throws IllegalArgumentException 当userPageId为空时抛出
* @deprecated 建议使用 {@link #addPreDetectionParam(String, PreDetectionParam)}
*/
@Deprecated
public static void addPreDetectionParam(PreDetectionParam preDetectionParam) {
if (preDetectionParam == null || preDetectionParam.getUserPageId() == null) {
throw new IllegalArgumentException("检测参数或用户ID不能为空");
}
preDetectionParamMap.put(preDetectionParam.getUserPageId(), preDetectionParam);
}
/**
* 获取指定用户的检测参数

View File

@@ -129,11 +129,6 @@ public class PqReportServiceImpl extends ServiceImpl<PqReportMapper, PqReport> i
@Value("${report.reportDir:D:\\report}")
private String reportPath;
@Value("${socket.device.ip}")
private String ip;
@Value("${socket.device.port}")
private Integer port;
@Value("${qr.cloud}")
private String cloudUrl;
@@ -184,7 +179,7 @@ public class PqReportServiceImpl extends ServiceImpl<PqReportMapper, PqReport> i
private final ISysTestConfigService sysTestConfigService;
private final SocketDevResponseService socketDevResponseService;
private final SocketManager socketManager;
@Autowired
private RestTemplateUtil restTemplateUtil;
@@ -705,8 +700,8 @@ public class PqReportServiceImpl extends ServiceImpl<PqReportMapper, PqReport> i
PreDetectionParam preDetectionParam = new PreDetectionParam();
preDetectionParam.setUserPageId(RequestUtil.getLoginName());
preDetectionParam.setSendWebMsg(false);
NettyClient.socketClient(ip, port, preDetectionParam, msg, new NettyDevClientHandler(preDetectionParam, socketDevResponseService));
// 使用智能发送工具类,自动管理设备连接
socketManager.smartSendToDevice(preDetectionParam, msg);
} else {
channel.writeAndFlush(msg + "\n");
}

View File

@@ -720,21 +720,17 @@ public class PqScriptDtlsServiceImpl extends ServiceImpl<PqScriptDtlsMapper, PqS
@Override
public List<String> getScriptToIcdCheckInfo(PreDetectionParam param) {
PqScriptIssueParam issueParam = new PqScriptIssueParam();
// issueParam.setPlanId(param.getPlanId());
issueParam.setSourceId(param.getSourceId());
issueParam.setDevIds(param.getDevIds());
issueParam.setScriptId(param.getScriptId());
issueParam.setIsPhaseSequence(CommonEnum.FORMAL_TEST.getValue());
List<SourceIssue> sourceIssues = this.listSourceIssue(issueParam);
Set<String> dataTypeSet = new HashSet<>();
sourceIssues.forEach(x -> {
dataTypeSet.addAll(x.getDevValueTypeList());
});
return dataTypeSet.stream().collect(Collectors.toList());
return new ArrayList<>(dataTypeSet);
}
@Override