From 46ec9923a7232c8a6ec8d0a7aab82985f98a8b80 Mon Sep 17 00:00:00 2001
From: hongawen <83944980@qq.com>
Date: Thu, 28 May 2026 20:28:37 +0800
Subject: [PATCH] =?UTF-8?q?docs(detection):=20=E5=88=A0=E9=99=A4CN=5FGathe?=
=?UTF-8?q?r=20Detection=E6=A8=A1=E5=9D=97Netty=E9=80=9A=E4=BF=A1=E6=9E=B6?=
=?UTF-8?q?=E6=9E=84=E5=88=86=E6=9E=90=E6=96=87=E6=A1=A3?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 移除了详细的Netty客户端和服务端组件说明文档
- 删除了WebSocket通信组件的技术细节描述
- 移除了Socket响应处理器和管理工具类的详细分析
- 清理了通信数据对象和流程分析相关内容
- 移除了智能Socket通信机制的技术文档
- 删除了配置管理组件和Spring集成的相关说明
---
CN_Gather_Detection_Netty架构详细分析文档.md | 1675 -----------------
.../controller/PreDetectionController.java | 184 +-
.../handler/SocketDevResponseService.java | 4 +
.../gather/detection/lock/DetectionLock.java | 29 +
.../detection/lock/DetectionLockManager.java | 134 ++
.../pojo/enums/DetectionResponseEnum.java | 3 +-
.../pojo/vo/DetectionLockHolderVO.java | 22 +
.../util/socket/cilent/NettyClient.java | 50 +-
.../cilent/NettyContrastClientHandler.java | 23 +
.../socket/cilent/NettyDevClientHandler.java | 25 +
.../cilent/NettySourceClientHandler.java | 16 +
.../socket/websocket/WebServiceManager.java | 22 +
.../socket/websocket/WebSocketHandler.java | 6 +
13 files changed, 499 insertions(+), 1694 deletions(-)
delete mode 100644 CN_Gather_Detection_Netty架构详细分析文档.md
create mode 100644 detection/src/main/java/com/njcn/gather/detection/lock/DetectionLock.java
create mode 100644 detection/src/main/java/com/njcn/gather/detection/lock/DetectionLockManager.java
create mode 100644 detection/src/main/java/com/njcn/gather/detection/pojo/vo/DetectionLockHolderVO.java
diff --git a/CN_Gather_Detection_Netty架构详细分析文档.md b/CN_Gather_Detection_Netty架构详细分析文档.md
deleted file mode 100644
index d7fceb1d..00000000
--- a/CN_Gather_Detection_Netty架构详细分析文档.md
+++ /dev/null
@@ -1,1675 +0,0 @@
-# CN_Gather Detection模块 Netty通信架构详细分析文档
-
-## 目录
-
-1. [架构概览](#1-架构概览)
-2. [智能Socket通信机制](#2-智能socket通信机制)
-3. [Netty客户端组件详解](#3-netty客户端组件详解)
-4. [Netty服务端组件详解](#4-netty服务端组件详解)
-5. [WebSocket通信组件详解](#5-websocket通信组件详解)
-6. [Socket响应处理器详解](#6-socket响应处理器详解)
-7. [Socket管理与工具类详解](#7-socket管理与工具类详解)
-8. [通信数据对象详解](#8-通信数据对象详解)
-9. [通信流程分析](#9-通信流程分析)
-10. [关键技术特性](#10-关键技术特性)
-11. [配置与部署](#11-配置与部署)
-
----
-
-## 1. 架构概览
-
-CN_Gather Detection模块采用**智能Socket + WebSocket**的混合通信架构,通过全新的智能发送机制和Spring组件化设计,支持电能质量设备检测的复杂业务场景。
-
-### 1.1 整体架构图
-
-```
-[前端页面] ←→ WebSocket(7777) ←→ [Detection应用] ←→ 智能Socket管理器 ←→ [源设备(62000)/被检设备(61000)]
- ↑ ↑
- Spring容器管理 自动连接建立
- ↑ ↑
- 配置统一管理 智能发送机制
-```
-
-### 1.2 核心组件层次结构
-
-```
-detection/
-├── util/socket/
-│ ├── cilent/ # Netty客户端组件
-│ │ ├── NettyClient.java # 智能客户端(Spring组件)
-│ │ ├── NettySourceClientHandler.java # 源设备处理器
-│ │ ├── NettyDevClientHandler.java # 被检设备处理器
-│ │ ├── NettyContrastClientHandler.java # 比对设备处理器
-│ │ └── HeartbeatHandler.java # 心跳处理器
-│ ├── service/ # Netty服务端组件
-│ │ ├── NettyServer.java # 服务器核心
-│ │ ├── DevNettyServerHandler.java # 设备服务端处理器
-│ │ └── SourceNettyServerHandler.java # 源服务端处理器
-│ ├── websocket/ # WebSocket通信组件
-│ │ ├── WebSocketService.java # WebSocket服务
-│ │ ├── WebSocketHandler.java # 消息处理器
-│ │ ├── WebSocketInitializer.java # 初始化器
-│ │ └── WebServiceManager.java # 会话管理器
-│ ├── config/ # 配置管理组件(新增)
-│ │ └── SocketConnectionConfig.java # Socket连接配置
-│ ├── SocketManager.java # 智能Socket管理器(Spring组件)
-│ ├── CnSocketUtil.java # Socket工具类
-│ ├── FormalTestManager.java # 检测管理器
-│ └── XiNumberManager.java # 系数管理器
-└── handler/ # 业务响应处理器
- ├── SocketSourceResponseService.java # 源响应处理
- ├── SocketDevResponseService.java # 设备响应处理
- └── SocketContrastResponseService.java # 比对响应处理
-```
-
-### 1.3 核心架构改进
-
-#### 1.3.1 智能发送机制
-- **自动连接管理**: 根据requestId自动判断是否需要建立连接
-- **透明化操作**: 开发者只需关心业务逻辑,连接管理完全透明
-- **配置驱动**: 通过配置文件统一管理需要建立连接的requestId
-
-#### 1.3.2 Spring组件化
-- **全面Spring管理**: NettyClient和SocketManager完全交给Spring容器管理
-- **依赖注入**: 通过构造函数注入实现松耦合设计
-- **生命周期管理**: 利用Spring的@PostConstruct和@PreDestroy管理组件生命周期
-
-#### 1.3.3 配置统一管理
-- **集中配置**: 所有Socket相关配置统一在application.yml中管理
-- **环境隔离**: 支持不同环境使用不同的IP和端口配置
-- **配置热更新**: 支持配置的动态刷新
-
----
-
-## 2. 智能Socket通信机制
-
-### 2.1 智能发送机制核心设计
-
-**设计理念:**
-- **开发者友好**: 开发者只需调用发送方法,无需关心连接管理
-- **自动化管理**: 系统自动判断是否需要建立连接
-- **配置驱动**: 通过配置决定哪些requestId需要建立连接
-
-**核心组件关系图:**
-
-```mermaid
-graph TB
- A[业务层] --> B[SocketManager]
- B --> C[SocketConnectionConfig]
- B --> D[NettyClient]
- C --> E[application.yml]
- D --> F[NettySourceClientHandler]
- D --> G[NettyDevClientHandler]
-
- subgraph "Spring容器管理"
- B
- C
- D
- end
-
- subgraph "配置管理"
- E
- H[requestId配置]
- I[IP/PORT配置]
- end
-```
-
-### 2.2 SocketConnectionConfig.java - 智能配置管理器
-
-**功能职责:**
-- 管理需要建立连接的requestId配置
-- 统一管理Socket的IP和PORT配置
-- 提供配置的动态读取和验证
-
-**关键代码分析:**
-
-```java
-@Component
-@ConfigurationProperties(prefix = "socket")
-public class SocketConnectionConfig {
-
- /**
- * 程控源设备配置
- */
- private SourceConfig source = new SourceConfig();
-
- /**
- * 被检设备配置
- */
- private DeviceConfig device = new DeviceConfig();
-
- @Data
- public static class SourceConfig {
- private String ip = "127.0.0.1";
- private Integer port = 62000;
- }
-
- @Data
- public static class DeviceConfig {
- private String ip = "127.0.0.1";
- private Integer port = 61000;
- }
-
- /**
- * 需要建立程控源通道的requestId集合
- */
- private static final Set SOURCE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList(
- "yjc_ytxjy" // 源通讯检测
- ));
-
- /**
- * 需要建立被检设备通道的requestId集合
- */
- private static final Set DEVICE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList(
- "yjc_sbtxjy", // 连接建立
- "FTP_SEND$01" // ftp文件传送指令
- ));
-
- /**
- * 检查指定的requestId是否需要建立程控源连接
- */
- public static boolean needsSourceConnection(String requestId) {
- return SOURCE_CONNECTION_REQUEST_IDS.contains(requestId);
- }
-
- /**
- * 检查指定的requestId是否需要建立被检设备连接
- */
- public static boolean needsDeviceConnection(String requestId) {
- return DEVICE_CONNECTION_REQUEST_IDS.contains(requestId);
- }
-}
-```
-
-### 2.3 SocketManager.java - 智能Socket管理器
-
-**功能职责:**
-- 提供智能发送API,自动管理连接建立
-- 统一管理Socket会话和EventLoopGroup
-- 支持多种发送模式和连接状态检查
-
-**关键代码分析:**
-
-```java
-@Slf4j
-@Component
-public class SocketManager {
-
- @Autowired
- private SocketConnectionConfig socketConnectionConfig;
-
- /**
- * key为userId(xxx_Source、xxx_Dev),value为channel
- */
- private static final Map socketSessions = new ConcurrentHashMap<>();
-
- /**
- * key为userId(xxx_Source、xxx_Dev),value为group
- */
- private static final Map socketGroup = new ConcurrentHashMap<>();
-
- /**
- * 智能发送消息到程控源设备
- * 自动从配置文件读取IP和PORT,开发者无需关心网络配置
- * 如果连接不存在且requestId需要建立连接,会自动建立连接后发送
- */
- public void smartSendToSource(PreDetectionParam param, String msg) {
- String ip = socketConnectionConfig.getSource().getIp();
- Integer port = socketConnectionConfig.getSource().getPort();
- String requestId = extractRequestId(msg);
- String userId = param.getUserPageId() + CnSocketUtil.SOURCE_TAG;
-
- // 检查是否需要建立连接
- if (SocketConnectionConfig.needsSourceConnection(requestId)) {
- // 检查连接是否存在且活跃
- if (!isChannelActive(userId)) {
- log.info("程控源连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId);
- // 异步建立程控源连接并发送消息
- CompletableFuture.runAsync(() -> {
- NettyClient.connectToSourceStatic(ip, port, param, msg);
- });
- return;
- }
- }
-
- // 连接已存在或不需要建立连接,直接发送消息
- log.info("直接发送消息到程控源: userId={}, requestId={}", userId, requestId);
- sendMsg(userId, msg);
- }
-
- /**
- * 智能发送消息到被检设备
- * 自动从配置文件读取IP和PORT,开发者无需关心网络配置
- * 如果连接不存在且requestId需要建立连接,会自动建立连接后发送
- */
- public void smartSendToDevice(PreDetectionParam param, String msg) {
- String requestId = extractRequestId(msg);
- String userId = param.getUserPageId() + CnSocketUtil.DEV_TAG;
-
- // 检查是否需要建立连接
- if (SocketConnectionConfig.needsDeviceConnection(requestId)) {
- String ip = socketConnectionConfig.getDevice().getIp();
- Integer port = socketConnectionConfig.getDevice().getPort();
- // 检查连接是否存在且活跃
- if (!isChannelActive(userId)) {
- log.info("被检设备连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId);
- // 异步建立被检设备连接并发送消息
- CompletableFuture.runAsync(() -> {
- NettyClient.connectToDeviceStatic(ip, port, param, msg);
- });
- return;
- }
- }
-
- // 连接已存在或不需要建立连接,直接发送消息
- log.info("直接发送消息到被检设备: userId={}, requestId={}", userId, requestId);
- sendMsg(userId, msg);
- }
-
- /**
- * 从消息中提取requestId
- * 支持JSON格式的消息解析
- */
- private static String extractRequestId(String msg) {
- try {
- if (StrUtil.isNotBlank(msg)) {
- // 尝试解析JSON格式消息
- JSONObject jsonObject = JSON.parseObject(msg);
- String requestId = jsonObject.getString("requestId");
- if (StrUtil.isNotBlank(requestId)) {
- return requestId;
- }
-
- // 如果没有requestId字段,尝试解析request_id字段
- requestId = jsonObject.getString("request_id");
- if (StrUtil.isNotBlank(requestId)) {
- return requestId;
- }
- }
- } catch (Exception e) {
- log.warn("解析消息中的requestId失败: msg={}, error={}", msg, e.getMessage());
- }
-
- return "unknown";
- }
-
- /**
- * 检查指定用户的Channel是否活跃
- */
- private static boolean isChannelActive(String userId) {
- Channel channel = getChannelByUserId(userId);
- return ObjectUtil.isNotNull(channel) && channel.isActive();
- }
-}
-```
-
-### 2.4 使用示例
-
-#### 2.4.1 业务层调用方式
-
-```java
-@Service
-@RequiredArgsConstructor
-public class PreDetectionServiceImpl implements PreDetectionService {
-
- private final SocketManager socketManager;
-
- @Override
- public void sourceCommunicationCheck(PreDetectionParam param) {
- // 组装检测消息
- SocketMsg msg = new SocketMsg<>();
- msg.setRequestId("yjc_ytxjy");
- msg.setOperateCode("INIT_GATHER");
- msg.setData(JSON.toJSONString(sourceParam));
-
- // 智能发送 - 系统自动判断是否需要建立连接
- socketManager.smartSendToSource(param, JSON.toJSONString(msg));
- }
-}
-```
-
-#### 2.4.2 配置文件示例
-
-```yaml
-# application.yml
-socket:
- source:
- ip: 192.168.1.124
- port: 62000
- device:
- ip: 192.168.1.124
- port: 61000
-```
-
----
-
-## 3. Netty客户端组件详解
-
-### 3.1 NettyClient.java - Spring管理的智能客户端
-
-**功能职责:**
-- 作为Spring组件提供连接服务
-- 支持源设备和被检设备的智能连接
-- 自动处理Handler的实例化和依赖注入
-
-**关键代码分析:**
-
-```java
-@Component
-@Slf4j
-public class NettyClient {
-
- @Autowired
- private SocketSourceResponseService socketSourceResponseService;
-
- @Autowired
- private SocketDevResponseService socketDevResponseService;
-
- private static NettyClient instance;
-
- @PostConstruct
- public void init() {
- instance = this;
- }
-
- /**
- * 连接到程控源设备
- * Spring管理的实例方法,支持依赖注入
- */
- public void connectToSource(String ip, Integer port, PreDetectionParam param, String msg) {
- NettySourceClientHandler handler = createSourceHandler(param);
- executeSocketConnection(ip, port, param, msg, handler);
- }
-
- /**
- * 连接到被检设备
- * Spring管理的实例方法,支持依赖注入
- */
- public void connectToDevice(String ip, Integer port, PreDetectionParam param, String msg) {
- NettyDevClientHandler handler = createDeviceHandler(param);
- executeSocketConnection(ip, port, param, msg, handler);
- }
-
- /**
- * 静态方法入口 - 保持向后兼容
- */
- public static void connectToSourceStatic(String ip, Integer port, PreDetectionParam param, String msg) {
- if (instance != null) {
- instance.connectToSource(ip, port, param, msg);
- } else {
- log.error("NettyClient未初始化,无法创建程控源连接");
- }
- }
-
- /**
- * 静态方法入口 - 保持向后兼容
- */
- public static void connectToDeviceStatic(String ip, Integer port, PreDetectionParam param, String msg) {
- if (instance != null) {
- instance.connectToDevice(ip, port, param, msg);
- } else {
- log.error("NettyClient未初始化,无法创建被检设备连接");
- }
- }
-
- /**
- * 创建源设备处理器
- * 利用Spring注入的Service实例
- */
- private NettySourceClientHandler createSourceHandler(PreDetectionParam param) {
- return new NettySourceClientHandler(param, socketSourceResponseService);
- }
-
- /**
- * 创建被检设备处理器
- * 利用Spring注入的Service实例
- */
- private NettyDevClientHandler createDeviceHandler(PreDetectionParam param) {
- return new NettyDevClientHandler(param, socketDevResponseService);
- }
-
- /**
- * 执行Socket连接建立流程(重构后的核心实现)
- */
- private static void executeSocketConnection(String ip, Integer port,
- PreDetectionParam param, String msg, SimpleChannelInboundHandler handler) {
- // 创建NIO事件循环组
- NioEventLoopGroup group = createEventLoopGroup();
-
- try {
- // 配置客户端启动器
- Bootstrap bootstrap = configureBootstrap(group);
- // 创建管道初始化器
- ChannelInitializer initializer = createChannelInitializer(param, handler);
- bootstrap.handler(initializer);
- // 同步连接到目标服务器
- ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
- // 处理连接结果
- handleConnectionResult(channelFuture, param, handler, group, msg);
- } catch (Exception e) {
- // 处理连接异常
- handleConnectionException(e, param, handler, group);
- }
- }
-
- /**
- * 创建NIO事件循环组
- */
- private static NioEventLoopGroup createEventLoopGroup() {
- return new NioEventLoopGroup();
- }
-
- /**
- * 配置Bootstrap启动器
- */
- private static Bootstrap configureBootstrap(NioEventLoopGroup group) {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(group)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .channel(NioSocketChannel.class);
- return bootstrap;
- }
-
- /**
- * 创建通道初始化器
- * 根据处理器类型配置不同的Pipeline
- */
- private static ChannelInitializer createChannelInitializer(
- PreDetectionParam param, SimpleChannelInboundHandler handler) {
- return new ChannelInitializer() {
- @Override
- protected void initChannel(NioSocketChannel ch) {
- if (handler instanceof NettySourceClientHandler) {
- configureSourcePipeline(ch, param, handler);
- } else {
- configureDevicePipeline(ch, param, handler);
- }
- }
- };
- }
-
- /**
- * 配置程控源设备的Pipeline
- */
- private static void configureSourcePipeline(NioSocketChannel ch, PreDetectionParam param,
- SimpleChannelInboundHandler handler) {
- ch.pipeline()
- .addLast("frame-decoder", new LineBasedFrameDecoder(10240))
- .addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8))
- .addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8))
- .addLast("heartbeat", new HeartbeatHandler(param, CnSocketUtil.SOURCE_TAG))
- .addLast("source-handler", handler);
- }
-
- /**
- * 配置被检设备的Pipeline
- */
- private static void configureDevicePipeline(NioSocketChannel ch, PreDetectionParam param,
- SimpleChannelInboundHandler handler) {
- ch.pipeline()
- .addLast("frame-decoder", new LineBasedFrameDecoder(10240))
- .addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8))
- .addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8))
- .addLast("heartbeat", new HeartbeatHandler(param, CnSocketUtil.DEV_TAG))
- .addLast("idle-detector", new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS))
- .addLast("device-handler", handler);
- }
-
- /**
- * 处理连接建立结果
- */
- private static void handleConnectionResult(ChannelFuture channelFuture, PreDetectionParam param,
- SimpleChannelInboundHandler handler, NioEventLoopGroup group, String msg) {
- if (channelFuture.isSuccess()) {
- log.info("Socket连接建立成功: {}", channelFuture.channel().remoteAddress());
-
- // 注册会话和EventLoopGroup到管理器
- String userId = getConnectionUserId(param, handler);
- SocketManager.addUser(userId, channelFuture.channel());
- SocketManager.addGroup(userId, group);
-
- // 发送初始消息
- if (StrUtil.isNotBlank(msg)) {
- channelFuture.channel().writeAndFlush(msg + "\n");
- log.info("发送初始消息: {}", msg);
- }
- } else {
- log.error("Socket连接建立失败");
- handleConnectionFailure(param, group);
- }
- }
-
- /**
- * 获取连接的用户ID
- */
- private static String getConnectionUserId(PreDetectionParam param, SimpleChannelInboundHandler handler) {
- String tag = (handler instanceof NettySourceClientHandler) ? CnSocketUtil.SOURCE_TAG : CnSocketUtil.DEV_TAG;
- return param.getUserPageId() + tag;
- }
-
- /**
- * 处理连接异常
- */
- private static void handleConnectionException(Exception e, PreDetectionParam param,
- SimpleChannelInboundHandler handler, NioEventLoopGroup group) {
- log.error("Socket连接过程中发生异常: {}", e.getMessage(), e);
-
- // 清理资源
- if (group != null) {
- group.shutdownGracefully();
- }
-
- // 发送错误消息到前端
- try {
- CnSocketUtil.quitSendSource(param);
- CnSocketUtil.quitSend(param);
- } catch (Exception ex) {
- log.error("发送错误消息失败", ex);
- }
- }
-
- /**
- * 处理连接失败情况
- */
- private static void handleConnectionFailure(PreDetectionParam param, NioEventLoopGroup group) {
- // 清理EventLoopGroup资源
- if (group != null) {
- group.shutdownGracefully();
- }
-
- // 通知业务层连接失败
- try {
- CnSocketUtil.quitSendSource(param);
- CnSocketUtil.quitSend(param);
- } catch (Exception e) {
- log.error("处理连接失败通知时发生异常", e);
- }
- }
-}
-```
-
-### 3.2 Handler组件改进
-
-#### 3.2.1 NettySourceClientHandler.java - 源设备处理器
-
-**功能改进:**
-- 简化异常处理逻辑,移除冗余注释
-- 使用slf4j日志替代System.out.println
-- 优化连接状态管理
-
-```java
-@RequiredArgsConstructor
-@Slf4j
-public class NettySourceClientHandler extends SimpleChannelInboundHandler {
- private final PreDetectionParam webUser;
- private final SocketSourceResponseService sourceResponseService;
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- log.info("程控源客户端通道已建立: {}", ctx.channel().id());
- SocketManager.addUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG, ctx.channel());
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException {
- log.debug("接收源设备数据: {}", msg);
- try {
- sourceResponseService.deal(webUser, msg);
- } catch (Exception e) {
- log.error("处理源设备响应异常", e);
- CnSocketUtil.quitSend(webUser);
- }
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- log.info("程控源客户端连接断开");
- ctx.close();
- SocketManager.removeUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG);
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
- log.debug("程控源设备空闲状态触发");
- }
- } else {
- super.userEventTriggered(ctx, evt);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- log.error("程控源通信异常", cause);
- if (cause instanceof ConnectException) {
- log.warn("程控源连接异常");
- } else if (cause instanceof IOException) {
- WebServiceManager.sendDetectionErrorMessage(webUser.getUserPageId(), SourceOperateCodeEnum.SERVER_ERROR);
- } else if (cause instanceof TimeoutException) {
- log.warn("程控源通信超时");
- }
- ctx.close();
- }
-}
-```
-
-#### 3.2.2 NettyDevClientHandler.java - 被检设备处理器
-
-**功能改进:**
-- 精简超时检测逻辑,移除大段注释代码
-- 优化异常处理机制
-- 统一日志输出格式
-
-```java
-@RequiredArgsConstructor
-@Slf4j
-public class NettyDevClientHandler extends SimpleChannelInboundHandler {
- private final PreDetectionParam param;
- private final SocketDevResponseService socketDevResponseService;
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- log.info("被检设备客户端通道已建立: {}", ctx.channel().id());
- SocketManager.addUser(param.getUserPageId() + CnSocketUtil.DEV_TAG, ctx.channel());
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- log.debug("接收被检设备数据: {}", msg);
- try {
- socketDevResponseService.deal(param, msg);
- } catch (Exception e) {
- log.error("处理被检设备响应异常", e);
- CnSocketUtil.quitSend(param);
- }
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- log.info("被检设备客户端连接断开");
- ctx.close();
- SocketManager.removeUser(param.getUserPageId() + CnSocketUtil.DEV_TAG);
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) {
- log.warn("被检设备读超时触发");
- handleReadTimeout(ctx);
- }
- } else {
- super.userEventTriggered(ctx, evt);
- }
- }
-
- /**
- * 处理读超时事件
- */
- private void handleReadTimeout(ChannelHandlerContext ctx) {
- if (!FormalTestManager.hasStopFlag) {
- if (CollUtil.isNotEmpty(SocketManager.getSourceList())) {
- SourceIssue sourceIssue = SocketManager.getSourceList().get(0);
- // 更新超时计时器
- updateTimeoutCounter(sourceIssue);
- // 检查是否需要触发超时处理
- if (shouldTriggerTimeout(sourceIssue)) {
- log.warn("检测项超时: {}", sourceIssue.getType());
- CnSocketUtil.quitSend(param);
- timeoutSend(sourceIssue);
- }
- }
- }
- }
-
- /**
- * 更新超时计时器
- */
- private void updateTimeoutCounter(SourceIssue sourceIssue) {
- SocketManager.clockMap.put(sourceIssue.getIndex(),
- SocketManager.clockMap.getOrDefault(sourceIssue.getIndex(), 0L) + 60L);
- }
-
- /**
- * 判断是否应该触发超时处理
- */
- private boolean shouldTriggerTimeout(SourceIssue sourceIssue) {
- Long currentTime = SocketManager.clockMap.get(sourceIssue.getIndex());
- if (currentTime == null) return false;
-
- // 根据检测类型设置不同的超时时间
- if (sourceIssue.getType().equals(DicDataEnum.F.getCode())) {
- return currentTime > 1300; // 闪变: 20分钟超时
- } else if (sourceIssue.getType().equals(DicDataEnum.VOLTAGE.getCode()) ||
- sourceIssue.getType().equals(DicDataEnum.HP.getCode())) {
- return currentTime > 180; // 统计数据: 3分钟超时
- } else {
- return currentTime > 60; // 实时数据: 1分钟超时
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- log.error("被检设备通信异常", cause);
- if (cause instanceof ConnectException) {
- log.warn("被检设备连接异常");
- } else if (cause instanceof IOException) {
- WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR);
- } else if (cause instanceof TimeoutException) {
- log.warn("被检设备通信超时");
- }
-
- // 清理资源并断开连接
- CnSocketUtil.quitSend(param);
- CnSocketUtil.quitSendSource(param);
- ctx.close();
- }
-}
-```
-
----
-
-## 4. Netty服务端组件详解
-
-### 4.1 NettyServer.java - 服务器核心
-
-**功能职责:**
-- 提供Socket服务端功能,用于测试和开发
-- 支持源通信服务和设备通信服务
-- 模拟外部设备的响应行为
-
-**关键代码分析:**
-
-```java
-public class NettyServer {
- public static final int port = 8574;
-
- private void runSource() {
- NioEventLoopGroup boss = new NioEventLoopGroup(1);
- NioEventLoopGroup work = new NioEventLoopGroup();
- try {
- ServerBootstrap bootstrap = new ServerBootstrap().group(boss, work);
- bootstrap.channel(NioServerSocketChannel.class)
- .handler(new ChannelInitializer() {
- @Override
- protected void initChannel(ServerSocketChannel ch) {
- System.out.println("源通讯服务正在启动中......");
- }
- })
- .childHandler(new ChannelInitializer() {
- @Override
- protected void initChannel(NioSocketChannel ch) {
- ch.pipeline()
- .addLast(new LineBasedFrameDecoder(10240))
- .addLast(new StringDecoder(CharsetUtil.UTF_8))
- .addLast(new StringEncoder(CharsetUtil.UTF_8))
- .addLast(new DevNettyServerHandler());
- }
- });
-
- ChannelFuture future = bootstrap.bind(port).sync();
- future.addListener(f -> {
- if (future.isSuccess()) {
- System.out.println("源通讯服务启动成功");
- } else {
- System.out.println("源通讯服务启动失败");
- }
- });
- future.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- boss.shutdownGracefully();
- work.shutdownGracefully();
- }
- }
-}
-```
-
----
-
-## 5. WebSocket通信组件详解
-
-### 5.1 WebSocketService.java - WebSocket服务核心
-
-**功能职责:**
-- 启动基于Netty的WebSocket服务器
-- 管理服务器生命周期(启动/关闭)
-- 提供高性能的WebSocket通信支持
-
-**关键代码分析:**
-
-```java
-@Component
-@RequiredArgsConstructor
-@Slf4j
-public class WebSocketService implements ApplicationRunner {
-
- @Value("${webSocket.port:7777}")
- int port;
-
- EventLoopGroup bossGroup;
- EventLoopGroup workerGroup;
- private Channel serverChannel;
- private CompletableFuture serverFuture;
-
- @Override
- public void run(ApplicationArguments args) {
- // 使用CompletableFuture异步启动WebSocket服务,避免阻塞Spring Boot主线程
- serverFuture = CompletableFuture.runAsync(this::startWebSocketServer)
- .exceptionally(throwable -> {
- log.error("WebSocket服务启动异常", throwable);
- return null;
- });
- }
-
- private void startWebSocketServer() {
- try {
- bossGroup = new NioEventLoopGroup(1);
- workerGroup = new NioEventLoopGroup();
-
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler())
- .option(ChannelOption.SO_BACKLOG, 128)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .childHandler(new WebSocketInitializer());
-
- ChannelFuture future = serverBootstrap.bind(port).sync();
- serverChannel = future.channel();
-
- future.addListener(f -> {
- if (future.isSuccess()) {
- log.info("webSocket服务启动成功,端口:{}", port);
- } else {
- log.error("webSocket服务启动失败,端口:{}", port);
- }
- });
-
- future.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- log.error("WebSocket服务启动过程中被中断", e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- log.error("WebSocket服务启动失败", e);
- throw new RuntimeException("WebSocket服务启动失败", e);
- } finally {
- shutdownGracefully();
- }
- }
-
- @PreDestroy
- public void destroy() throws InterruptedException {
- log.info("正在关闭WebSocket服务...");
-
- if (serverChannel != null) {
- try {
- serverChannel.close().awaitUninterruptibly(5, TimeUnit.SECONDS);
- } catch (Exception e) {
- log.warn("关闭服务器通道时发生异常", e);
- }
- }
-
- if (bossGroup != null) {
- bossGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync();
- }
- if (workerGroup != null) {
- workerGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync();
- }
-
- if (serverFuture != null && !serverFuture.isDone()) {
- boolean cancelled = serverFuture.cancel(true);
- }
-
- log.info("webSocket服务已销毁");
- }
-}
-```
-
----
-
-## 6. Socket响应处理器详解
-
-### 6.1 响应处理器改进
-
-**主要改进:**
-- 支持SocketManager的依赖注入
-- 移除硬编码的IP/PORT配置
-- 使用智能发送机制简化代码
-
-#### 6.1.1 SocketSourceResponseService.java
-
-```java
-@Service
-@RequiredArgsConstructor
-public class SocketSourceResponseService {
-
- private final SocketDevResponseService socketDevResponseService;
- private final IPqDevService iPqDevService;
- private final SocketManager socketManager;
-
- public void deal(PreDetectionParam param, String msg) throws Exception {
- SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg);
- String[] tem = socketDataMsg.getRequestId().split(CnSocketUtil.STEP_TAG);
- SourceOperateCodeEnum enumByCode = SourceOperateCodeEnum.getDictDataEnumByCode(tem[0]);
-
- if (ObjectUtil.isNotNull(enumByCode)) {
- switch (enumByCode) {
- case YJC_YTXJY:
- if (ObjectUtil.isNotNull(param.getPlanId())) {
- detectionDev(param, socketDataMsg);
- } else {
- handleYtxjySimulate(param, socketDataMsg);
- }
- break;
- case YJC_XUJY:
- phaseSequenceDev(param, socketDataMsg);
- break;
- case FORMAL_REAL:
- if (ObjectUtil.isNotNull(param.getPlanId())) {
- senParamToDev(param, socketDataMsg);
- } else {
- handleSimulateTest(param, socketDataMsg);
- }
- break;
- case Coefficient_Check:
- coefficient(param, socketDataMsg);
- break;
- }
- }
- }
-
- // 装置检测 - 使用智能发送机制
- private void detectionDev(PreDetectionParam param, SocketDataMsg socketDataMsg) {
- SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode());
- if (ObjectUtil.isNotNull(dictDataEnumByCode)) {
- SocketMsg socketMsg = new SocketMsg<>();
- switch (dictDataEnumByCode) {
- case SUCCESS:
- WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
-
- Map> map = new HashMap<>(1);
- map.put("deviceList", FormalTestManager.devList);
- String jsonString = JSON.toJSONString(map);
- socketMsg.setRequestId(SourceOperateCodeEnum.YJC_SBTXJY.getValue());
- socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_01.getValue());
- socketMsg.setData(jsonString);
- String json = JSON.toJSONString(socketMsg);
-
- // 使用智能发送工具类,自动管理设备连接
- socketManager.smartSendToDevice(param, json);
- break;
- case UNPROCESSED_BUSINESS:
- WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
- break;
- default:
- CnSocketUtil.quitSendSource(param);
- WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
- break;
- }
- }
- }
-}
-```
-
----
-
-## 7. Socket管理与工具类详解
-
-### 7.1 SocketManager.java - 智能Socket管理器
-
-**核心功能:**
-1. **智能发送机制**: 自动判断连接需求,透明管理连接建立
-2. **Spring组件管理**: 完全交给Spring容器管理,支持依赖注入
-3. **会话管理**: 统一管理Socket连接会话和EventLoopGroup
-4. **检测任务管理**: 管理检测相关的状态信息和配置
-
-**关键数据结构:**
-
-```java
-@Component
-@Slf4j
-public class SocketManager {
-
- @Autowired
- private SocketConnectionConfig socketConnectionConfig;
-
- // Socket会话管理
- private static final Map socketSessions = new ConcurrentHashMap<>();
- private static final Map socketGroup = new ConcurrentHashMap<>();
-
- // 检测任务管理
- private static Map targetMap = new ConcurrentHashMap<>();
- private static List sourceIssueList = new CopyOnWriteArrayList<>();
- public static Map valueTypeMap = new HashMap<>();
- public static volatile Map clockMap = new ConcurrentHashMap<>();
- public static volatile Map contrastClockMap = new ConcurrentHashMap<>();
-
- // 基础连接管理方法
- public static void addUser(String userId, Channel channel) {
- socketSessions.put(userId, channel);
- }
-
- public static void addGroup(String userId, NioEventLoopGroup group) {
- socketGroup.put(userId, group);
- }
-
- public static void removeUser(String userId) {
- Channel channel = socketSessions.get(userId);
- if (ObjectUtil.isNotNull(channel)) {
- try {
- channel.close().sync();
- } catch (InterruptedException e) {
- log.error("关闭通道异常", e);
- }
- NioEventLoopGroup eventExecutors = socketGroup.get(userId);
- if (ObjectUtil.isNotNull(eventExecutors)) {
- eventExecutors.shutdownGracefully();
- log.info("{}__{}关闭了客户端", userId, channel.id());
- }
- }
- socketSessions.remove(userId);
- }
-
- public static void sendMsg(String userId, String msg) {
- Channel channel = socketSessions.get(userId);
- if (ObjectUtil.isNotNull(channel)) {
- channel.writeAndFlush(msg + '\n');
- log.info("{}__{}往{}发送数据:{}", userId, channel.id(), channel.remoteAddress(), msg);
- } else {
- log.warn("{}__发送数据:失败通道不存在{}", userId, msg);
- }
- }
-
- // 检测任务管理方法
- public static void addSourceList(List sList) {
- sourceIssueList = sList;
- }
-
- public static List getSourceList() {
- return sourceIssueList;
- }
-
- public static void delSource(Integer index) {
- sourceIssueList.removeIf(s -> index.equals(s.getIndex()));
- }
-
- public static void delSourceTarget(String sourceTag) {
- targetMap.remove(sourceTag);
- }
-
- public static void initMap(Map map) {
- targetMap = map;
- }
-
- public static void addTargetMap(String scriptType, Long count) {
- targetMap.put(scriptType, count);
- }
-
- public static Long getSourceTarget(String scriptType) {
- return targetMap.get(scriptType);
- }
-}
-```
-
-### 7.2 CnSocketUtil.java - Socket工具类
-
-**功能职责:**
-- 提供Socket连接的控制功能
-- 封装WebSocket消息推送
-- 定义通信相关常量
-
-**关键代码:**
-
-```java
-public class CnSocketUtil {
-
- public final static String DEV_TAG = "_Dev";
- public final static String SOURCE_TAG = "_Source";
- public final static String START_TAG = "_Start";
- public final static String END_TAG = "_End";
- public final static String STEP_TAG = "&&";
- public final static String SPLIT_TAG = "_";
-
- // 退出检测
- public static void quitSend(PreDetectionParam param) {
- SocketMsg socketMsg = new SocketMsg<>();
- socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue());
- socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue());
- SocketManager.sendMsg(param.getUserPageId() + DEV_TAG, JSON.toJSONString(socketMsg));
- WebServiceManager.removePreDetectionParam();
- }
-
- // 关闭源连接
- public static void quitSendSource(PreDetectionParam param) {
- SocketMsg socketMsg = new SocketMsg<>();
- socketMsg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue());
- socketMsg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue());
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("sourceId", param.getSourceId());
- socketMsg.setData(jsonObject.toJSONString());
- SocketManager.sendMsg(param.getUserPageId() + SOURCE_TAG, JSON.toJSONString(socketMsg));
- WebServiceManager.removePreDetectionParam();
- }
-
- // 推送webSocket数据
- public static void sendToWebSocket(String userId, String requestId, String operatorType,
- Object data, String desc) {
- WebSocketVO
+ *
+ * 注:业务"正常完成"路径不走此方法(数模式 formalDeal 已在 Phase 1 显式释放;
+ * 比对模式正常完成走 sendMsg 推 ERROR_FLOW_END,依赖 WS 断开后心跳超时兜底)。
+ */
+ private static void releaseLockOnTerminal(String userId, SourceOperateCodeEnum errorType) {
+ if (userId == null || userId.isEmpty()) {
+ return;
+ }
+ DetectionLockManager.getInstance()
+ .releaseIfMatchPage(userId, "WS_ERROR_PUSH:" + errorType.name());
}
}
\ No newline at end of file
diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java
index a7ed7dcb..ff472c39 100644
--- a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java
+++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java
@@ -1,6 +1,7 @@
package com.njcn.gather.detection.util.socket.websocket;
import cn.hutool.core.util.ObjectUtil;
+import com.njcn.gather.detection.lock.DetectionLockManager;
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.util.socket.CnSocketUtil;
@@ -414,6 +415,11 @@ public class WebSocketHandler extends SimpleChannelInboundHandler