diff --git a/CN_Gather_Detection_Netty架构详细分析文档.md b/CN_Gather_Detection_Netty架构详细分析文档.md deleted file mode 100644 index d7fceb1d..00000000 --- a/CN_Gather_Detection_Netty架构详细分析文档.md +++ /dev/null @@ -1,1675 +0,0 @@ -# CN_Gather Detection模块 Netty通信架构详细分析文档 - -## 目录 - -1. [架构概览](#1-架构概览) -2. [智能Socket通信机制](#2-智能socket通信机制) -3. [Netty客户端组件详解](#3-netty客户端组件详解) -4. [Netty服务端组件详解](#4-netty服务端组件详解) -5. [WebSocket通信组件详解](#5-websocket通信组件详解) -6. [Socket响应处理器详解](#6-socket响应处理器详解) -7. [Socket管理与工具类详解](#7-socket管理与工具类详解) -8. [通信数据对象详解](#8-通信数据对象详解) -9. [通信流程分析](#9-通信流程分析) -10. [关键技术特性](#10-关键技术特性) -11. [配置与部署](#11-配置与部署) - ---- - -## 1. 架构概览 - -CN_Gather Detection模块采用**智能Socket + WebSocket**的混合通信架构,通过全新的智能发送机制和Spring组件化设计,支持电能质量设备检测的复杂业务场景。 - -### 1.1 整体架构图 - -``` -[前端页面] ←→ WebSocket(7777) ←→ [Detection应用] ←→ 智能Socket管理器 ←→ [源设备(62000)/被检设备(61000)] - ↑ ↑ - Spring容器管理 自动连接建立 - ↑ ↑ - 配置统一管理 智能发送机制 -``` - -### 1.2 核心组件层次结构 - -``` -detection/ -├── util/socket/ -│ ├── cilent/ # Netty客户端组件 -│ │ ├── NettyClient.java # 智能客户端(Spring组件) -│ │ ├── NettySourceClientHandler.java # 源设备处理器 -│ │ ├── NettyDevClientHandler.java # 被检设备处理器 -│ │ ├── NettyContrastClientHandler.java # 比对设备处理器 -│ │ └── HeartbeatHandler.java # 心跳处理器 -│ ├── service/ # Netty服务端组件 -│ │ ├── NettyServer.java # 服务器核心 -│ │ ├── DevNettyServerHandler.java # 设备服务端处理器 -│ │ └── SourceNettyServerHandler.java # 源服务端处理器 -│ ├── websocket/ # WebSocket通信组件 -│ │ ├── WebSocketService.java # WebSocket服务 -│ │ ├── WebSocketHandler.java # 消息处理器 -│ │ ├── WebSocketInitializer.java # 初始化器 -│ │ └── WebServiceManager.java # 会话管理器 -│ ├── config/ # 配置管理组件(新增) -│ │ └── SocketConnectionConfig.java # Socket连接配置 -│ ├── SocketManager.java # 智能Socket管理器(Spring组件) -│ ├── CnSocketUtil.java # Socket工具类 -│ ├── FormalTestManager.java # 检测管理器 -│ └── XiNumberManager.java # 系数管理器 -└── handler/ # 业务响应处理器 - ├── SocketSourceResponseService.java # 源响应处理 - ├── SocketDevResponseService.java # 设备响应处理 - └── SocketContrastResponseService.java # 比对响应处理 -``` - -### 1.3 核心架构改进 - -#### 1.3.1 智能发送机制 -- **自动连接管理**: 根据requestId自动判断是否需要建立连接 -- **透明化操作**: 开发者只需关心业务逻辑,连接管理完全透明 -- **配置驱动**: 通过配置文件统一管理需要建立连接的requestId - -#### 1.3.2 Spring组件化 -- **全面Spring管理**: NettyClient和SocketManager完全交给Spring容器管理 -- **依赖注入**: 通过构造函数注入实现松耦合设计 -- **生命周期管理**: 利用Spring的@PostConstruct和@PreDestroy管理组件生命周期 - -#### 1.3.3 配置统一管理 -- **集中配置**: 所有Socket相关配置统一在application.yml中管理 -- **环境隔离**: 支持不同环境使用不同的IP和端口配置 -- **配置热更新**: 支持配置的动态刷新 - ---- - -## 2. 智能Socket通信机制 - -### 2.1 智能发送机制核心设计 - -**设计理念:** -- **开发者友好**: 开发者只需调用发送方法,无需关心连接管理 -- **自动化管理**: 系统自动判断是否需要建立连接 -- **配置驱动**: 通过配置决定哪些requestId需要建立连接 - -**核心组件关系图:** - -```mermaid -graph TB - A[业务层] --> B[SocketManager] - B --> C[SocketConnectionConfig] - B --> D[NettyClient] - C --> E[application.yml] - D --> F[NettySourceClientHandler] - D --> G[NettyDevClientHandler] - - subgraph "Spring容器管理" - B - C - D - end - - subgraph "配置管理" - E - H[requestId配置] - I[IP/PORT配置] - end -``` - -### 2.2 SocketConnectionConfig.java - 智能配置管理器 - -**功能职责:** -- 管理需要建立连接的requestId配置 -- 统一管理Socket的IP和PORT配置 -- 提供配置的动态读取和验证 - -**关键代码分析:** - -```java -@Component -@ConfigurationProperties(prefix = "socket") -public class SocketConnectionConfig { - - /** - * 程控源设备配置 - */ - private SourceConfig source = new SourceConfig(); - - /** - * 被检设备配置 - */ - private DeviceConfig device = new DeviceConfig(); - - @Data - public static class SourceConfig { - private String ip = "127.0.0.1"; - private Integer port = 62000; - } - - @Data - public static class DeviceConfig { - private String ip = "127.0.0.1"; - private Integer port = 61000; - } - - /** - * 需要建立程控源通道的requestId集合 - */ - private static final Set SOURCE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList( - "yjc_ytxjy" // 源通讯检测 - )); - - /** - * 需要建立被检设备通道的requestId集合 - */ - private static final Set DEVICE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList( - "yjc_sbtxjy", // 连接建立 - "FTP_SEND$01" // ftp文件传送指令 - )); - - /** - * 检查指定的requestId是否需要建立程控源连接 - */ - public static boolean needsSourceConnection(String requestId) { - return SOURCE_CONNECTION_REQUEST_IDS.contains(requestId); - } - - /** - * 检查指定的requestId是否需要建立被检设备连接 - */ - public static boolean needsDeviceConnection(String requestId) { - return DEVICE_CONNECTION_REQUEST_IDS.contains(requestId); - } -} -``` - -### 2.3 SocketManager.java - 智能Socket管理器 - -**功能职责:** -- 提供智能发送API,自动管理连接建立 -- 统一管理Socket会话和EventLoopGroup -- 支持多种发送模式和连接状态检查 - -**关键代码分析:** - -```java -@Slf4j -@Component -public class SocketManager { - - @Autowired - private SocketConnectionConfig socketConnectionConfig; - - /** - * key为userId(xxx_Source、xxx_Dev),value为channel - */ - private static final Map socketSessions = new ConcurrentHashMap<>(); - - /** - * key为userId(xxx_Source、xxx_Dev),value为group - */ - private static final Map socketGroup = new ConcurrentHashMap<>(); - - /** - * 智能发送消息到程控源设备 - * 自动从配置文件读取IP和PORT,开发者无需关心网络配置 - * 如果连接不存在且requestId需要建立连接,会自动建立连接后发送 - */ - public void smartSendToSource(PreDetectionParam param, String msg) { - String ip = socketConnectionConfig.getSource().getIp(); - Integer port = socketConnectionConfig.getSource().getPort(); - String requestId = extractRequestId(msg); - String userId = param.getUserPageId() + CnSocketUtil.SOURCE_TAG; - - // 检查是否需要建立连接 - if (SocketConnectionConfig.needsSourceConnection(requestId)) { - // 检查连接是否存在且活跃 - if (!isChannelActive(userId)) { - log.info("程控源连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId); - // 异步建立程控源连接并发送消息 - CompletableFuture.runAsync(() -> { - NettyClient.connectToSourceStatic(ip, port, param, msg); - }); - return; - } - } - - // 连接已存在或不需要建立连接,直接发送消息 - log.info("直接发送消息到程控源: userId={}, requestId={}", userId, requestId); - sendMsg(userId, msg); - } - - /** - * 智能发送消息到被检设备 - * 自动从配置文件读取IP和PORT,开发者无需关心网络配置 - * 如果连接不存在且requestId需要建立连接,会自动建立连接后发送 - */ - public void smartSendToDevice(PreDetectionParam param, String msg) { - String requestId = extractRequestId(msg); - String userId = param.getUserPageId() + CnSocketUtil.DEV_TAG; - - // 检查是否需要建立连接 - if (SocketConnectionConfig.needsDeviceConnection(requestId)) { - String ip = socketConnectionConfig.getDevice().getIp(); - Integer port = socketConnectionConfig.getDevice().getPort(); - // 检查连接是否存在且活跃 - if (!isChannelActive(userId)) { - log.info("被检设备连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId); - // 异步建立被检设备连接并发送消息 - CompletableFuture.runAsync(() -> { - NettyClient.connectToDeviceStatic(ip, port, param, msg); - }); - return; - } - } - - // 连接已存在或不需要建立连接,直接发送消息 - log.info("直接发送消息到被检设备: userId={}, requestId={}", userId, requestId); - sendMsg(userId, msg); - } - - /** - * 从消息中提取requestId - * 支持JSON格式的消息解析 - */ - private static String extractRequestId(String msg) { - try { - if (StrUtil.isNotBlank(msg)) { - // 尝试解析JSON格式消息 - JSONObject jsonObject = JSON.parseObject(msg); - String requestId = jsonObject.getString("requestId"); - if (StrUtil.isNotBlank(requestId)) { - return requestId; - } - - // 如果没有requestId字段,尝试解析request_id字段 - requestId = jsonObject.getString("request_id"); - if (StrUtil.isNotBlank(requestId)) { - return requestId; - } - } - } catch (Exception e) { - log.warn("解析消息中的requestId失败: msg={}, error={}", msg, e.getMessage()); - } - - return "unknown"; - } - - /** - * 检查指定用户的Channel是否活跃 - */ - private static boolean isChannelActive(String userId) { - Channel channel = getChannelByUserId(userId); - return ObjectUtil.isNotNull(channel) && channel.isActive(); - } -} -``` - -### 2.4 使用示例 - -#### 2.4.1 业务层调用方式 - -```java -@Service -@RequiredArgsConstructor -public class PreDetectionServiceImpl implements PreDetectionService { - - private final SocketManager socketManager; - - @Override - public void sourceCommunicationCheck(PreDetectionParam param) { - // 组装检测消息 - SocketMsg msg = new SocketMsg<>(); - msg.setRequestId("yjc_ytxjy"); - msg.setOperateCode("INIT_GATHER"); - msg.setData(JSON.toJSONString(sourceParam)); - - // 智能发送 - 系统自动判断是否需要建立连接 - socketManager.smartSendToSource(param, JSON.toJSONString(msg)); - } -} -``` - -#### 2.4.2 配置文件示例 - -```yaml -# application.yml -socket: - source: - ip: 192.168.1.124 - port: 62000 - device: - ip: 192.168.1.124 - port: 61000 -``` - ---- - -## 3. Netty客户端组件详解 - -### 3.1 NettyClient.java - Spring管理的智能客户端 - -**功能职责:** -- 作为Spring组件提供连接服务 -- 支持源设备和被检设备的智能连接 -- 自动处理Handler的实例化和依赖注入 - -**关键代码分析:** - -```java -@Component -@Slf4j -public class NettyClient { - - @Autowired - private SocketSourceResponseService socketSourceResponseService; - - @Autowired - private SocketDevResponseService socketDevResponseService; - - private static NettyClient instance; - - @PostConstruct - public void init() { - instance = this; - } - - /** - * 连接到程控源设备 - * Spring管理的实例方法,支持依赖注入 - */ - public void connectToSource(String ip, Integer port, PreDetectionParam param, String msg) { - NettySourceClientHandler handler = createSourceHandler(param); - executeSocketConnection(ip, port, param, msg, handler); - } - - /** - * 连接到被检设备 - * Spring管理的实例方法,支持依赖注入 - */ - public void connectToDevice(String ip, Integer port, PreDetectionParam param, String msg) { - NettyDevClientHandler handler = createDeviceHandler(param); - executeSocketConnection(ip, port, param, msg, handler); - } - - /** - * 静态方法入口 - 保持向后兼容 - */ - public static void connectToSourceStatic(String ip, Integer port, PreDetectionParam param, String msg) { - if (instance != null) { - instance.connectToSource(ip, port, param, msg); - } else { - log.error("NettyClient未初始化,无法创建程控源连接"); - } - } - - /** - * 静态方法入口 - 保持向后兼容 - */ - public static void connectToDeviceStatic(String ip, Integer port, PreDetectionParam param, String msg) { - if (instance != null) { - instance.connectToDevice(ip, port, param, msg); - } else { - log.error("NettyClient未初始化,无法创建被检设备连接"); - } - } - - /** - * 创建源设备处理器 - * 利用Spring注入的Service实例 - */ - private NettySourceClientHandler createSourceHandler(PreDetectionParam param) { - return new NettySourceClientHandler(param, socketSourceResponseService); - } - - /** - * 创建被检设备处理器 - * 利用Spring注入的Service实例 - */ - private NettyDevClientHandler createDeviceHandler(PreDetectionParam param) { - return new NettyDevClientHandler(param, socketDevResponseService); - } - - /** - * 执行Socket连接建立流程(重构后的核心实现) - */ - private static void executeSocketConnection(String ip, Integer port, - PreDetectionParam param, String msg, SimpleChannelInboundHandler handler) { - // 创建NIO事件循环组 - NioEventLoopGroup group = createEventLoopGroup(); - - try { - // 配置客户端启动器 - Bootstrap bootstrap = configureBootstrap(group); - // 创建管道初始化器 - ChannelInitializer initializer = createChannelInitializer(param, handler); - bootstrap.handler(initializer); - // 同步连接到目标服务器 - ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); - // 处理连接结果 - handleConnectionResult(channelFuture, param, handler, group, msg); - } catch (Exception e) { - // 处理连接异常 - handleConnectionException(e, param, handler, group); - } - } - - /** - * 创建NIO事件循环组 - */ - private static NioEventLoopGroup createEventLoopGroup() { - return new NioEventLoopGroup(); - } - - /** - * 配置Bootstrap启动器 - */ - private static Bootstrap configureBootstrap(NioEventLoopGroup group) { - Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(group) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) - .option(ChannelOption.SO_KEEPALIVE, true) - .channel(NioSocketChannel.class); - return bootstrap; - } - - /** - * 创建通道初始化器 - * 根据处理器类型配置不同的Pipeline - */ - private static ChannelInitializer createChannelInitializer( - PreDetectionParam param, SimpleChannelInboundHandler handler) { - return new ChannelInitializer() { - @Override - protected void initChannel(NioSocketChannel ch) { - if (handler instanceof NettySourceClientHandler) { - configureSourcePipeline(ch, param, handler); - } else { - configureDevicePipeline(ch, param, handler); - } - } - }; - } - - /** - * 配置程控源设备的Pipeline - */ - private static void configureSourcePipeline(NioSocketChannel ch, PreDetectionParam param, - SimpleChannelInboundHandler handler) { - ch.pipeline() - .addLast("frame-decoder", new LineBasedFrameDecoder(10240)) - .addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8)) - .addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8)) - .addLast("heartbeat", new HeartbeatHandler(param, CnSocketUtil.SOURCE_TAG)) - .addLast("source-handler", handler); - } - - /** - * 配置被检设备的Pipeline - */ - private static void configureDevicePipeline(NioSocketChannel ch, PreDetectionParam param, - SimpleChannelInboundHandler handler) { - ch.pipeline() - .addLast("frame-decoder", new LineBasedFrameDecoder(10240)) - .addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8)) - .addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8)) - .addLast("heartbeat", new HeartbeatHandler(param, CnSocketUtil.DEV_TAG)) - .addLast("idle-detector", new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)) - .addLast("device-handler", handler); - } - - /** - * 处理连接建立结果 - */ - private static void handleConnectionResult(ChannelFuture channelFuture, PreDetectionParam param, - SimpleChannelInboundHandler handler, NioEventLoopGroup group, String msg) { - if (channelFuture.isSuccess()) { - log.info("Socket连接建立成功: {}", channelFuture.channel().remoteAddress()); - - // 注册会话和EventLoopGroup到管理器 - String userId = getConnectionUserId(param, handler); - SocketManager.addUser(userId, channelFuture.channel()); - SocketManager.addGroup(userId, group); - - // 发送初始消息 - if (StrUtil.isNotBlank(msg)) { - channelFuture.channel().writeAndFlush(msg + "\n"); - log.info("发送初始消息: {}", msg); - } - } else { - log.error("Socket连接建立失败"); - handleConnectionFailure(param, group); - } - } - - /** - * 获取连接的用户ID - */ - private static String getConnectionUserId(PreDetectionParam param, SimpleChannelInboundHandler handler) { - String tag = (handler instanceof NettySourceClientHandler) ? CnSocketUtil.SOURCE_TAG : CnSocketUtil.DEV_TAG; - return param.getUserPageId() + tag; - } - - /** - * 处理连接异常 - */ - private static void handleConnectionException(Exception e, PreDetectionParam param, - SimpleChannelInboundHandler handler, NioEventLoopGroup group) { - log.error("Socket连接过程中发生异常: {}", e.getMessage(), e); - - // 清理资源 - if (group != null) { - group.shutdownGracefully(); - } - - // 发送错误消息到前端 - try { - CnSocketUtil.quitSendSource(param); - CnSocketUtil.quitSend(param); - } catch (Exception ex) { - log.error("发送错误消息失败", ex); - } - } - - /** - * 处理连接失败情况 - */ - private static void handleConnectionFailure(PreDetectionParam param, NioEventLoopGroup group) { - // 清理EventLoopGroup资源 - if (group != null) { - group.shutdownGracefully(); - } - - // 通知业务层连接失败 - try { - CnSocketUtil.quitSendSource(param); - CnSocketUtil.quitSend(param); - } catch (Exception e) { - log.error("处理连接失败通知时发生异常", e); - } - } -} -``` - -### 3.2 Handler组件改进 - -#### 3.2.1 NettySourceClientHandler.java - 源设备处理器 - -**功能改进:** -- 简化异常处理逻辑,移除冗余注释 -- 使用slf4j日志替代System.out.println -- 优化连接状态管理 - -```java -@RequiredArgsConstructor -@Slf4j -public class NettySourceClientHandler extends SimpleChannelInboundHandler { - private final PreDetectionParam webUser; - private final SocketSourceResponseService sourceResponseService; - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - log.info("程控源客户端通道已建立: {}", ctx.channel().id()); - SocketManager.addUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG, ctx.channel()); - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException { - log.debug("接收源设备数据: {}", msg); - try { - sourceResponseService.deal(webUser, msg); - } catch (Exception e) { - log.error("处理源设备响应异常", e); - CnSocketUtil.quitSend(webUser); - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - log.info("程控源客户端连接断开"); - ctx.close(); - SocketManager.removeUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) { - log.debug("程控源设备空闲状态触发"); - } - } else { - super.userEventTriggered(ctx, evt); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - log.error("程控源通信异常", cause); - if (cause instanceof ConnectException) { - log.warn("程控源连接异常"); - } else if (cause instanceof IOException) { - WebServiceManager.sendDetectionErrorMessage(webUser.getUserPageId(), SourceOperateCodeEnum.SERVER_ERROR); - } else if (cause instanceof TimeoutException) { - log.warn("程控源通信超时"); - } - ctx.close(); - } -} -``` - -#### 3.2.2 NettyDevClientHandler.java - 被检设备处理器 - -**功能改进:** -- 精简超时检测逻辑,移除大段注释代码 -- 优化异常处理机制 -- 统一日志输出格式 - -```java -@RequiredArgsConstructor -@Slf4j -public class NettyDevClientHandler extends SimpleChannelInboundHandler { - private final PreDetectionParam param; - private final SocketDevResponseService socketDevResponseService; - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - log.info("被检设备客户端通道已建立: {}", ctx.channel().id()); - SocketManager.addUser(param.getUserPageId() + CnSocketUtil.DEV_TAG, ctx.channel()); - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { - log.debug("接收被检设备数据: {}", msg); - try { - socketDevResponseService.deal(param, msg); - } catch (Exception e) { - log.error("处理被检设备响应异常", e); - CnSocketUtil.quitSend(param); - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - log.info("被检设备客户端连接断开"); - ctx.close(); - SocketManager.removeUser(param.getUserPageId() + CnSocketUtil.DEV_TAG); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - IdleStateEvent event = (IdleStateEvent) evt; - if (event.state() == IdleState.READER_IDLE) { - log.warn("被检设备读超时触发"); - handleReadTimeout(ctx); - } - } else { - super.userEventTriggered(ctx, evt); - } - } - - /** - * 处理读超时事件 - */ - private void handleReadTimeout(ChannelHandlerContext ctx) { - if (!FormalTestManager.hasStopFlag) { - if (CollUtil.isNotEmpty(SocketManager.getSourceList())) { - SourceIssue sourceIssue = SocketManager.getSourceList().get(0); - // 更新超时计时器 - updateTimeoutCounter(sourceIssue); - // 检查是否需要触发超时处理 - if (shouldTriggerTimeout(sourceIssue)) { - log.warn("检测项超时: {}", sourceIssue.getType()); - CnSocketUtil.quitSend(param); - timeoutSend(sourceIssue); - } - } - } - } - - /** - * 更新超时计时器 - */ - private void updateTimeoutCounter(SourceIssue sourceIssue) { - SocketManager.clockMap.put(sourceIssue.getIndex(), - SocketManager.clockMap.getOrDefault(sourceIssue.getIndex(), 0L) + 60L); - } - - /** - * 判断是否应该触发超时处理 - */ - private boolean shouldTriggerTimeout(SourceIssue sourceIssue) { - Long currentTime = SocketManager.clockMap.get(sourceIssue.getIndex()); - if (currentTime == null) return false; - - // 根据检测类型设置不同的超时时间 - if (sourceIssue.getType().equals(DicDataEnum.F.getCode())) { - return currentTime > 1300; // 闪变: 20分钟超时 - } else if (sourceIssue.getType().equals(DicDataEnum.VOLTAGE.getCode()) || - sourceIssue.getType().equals(DicDataEnum.HP.getCode())) { - return currentTime > 180; // 统计数据: 3分钟超时 - } else { - return currentTime > 60; // 实时数据: 1分钟超时 - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - log.error("被检设备通信异常", cause); - if (cause instanceof ConnectException) { - log.warn("被检设备连接异常"); - } else if (cause instanceof IOException) { - WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR); - } else if (cause instanceof TimeoutException) { - log.warn("被检设备通信超时"); - } - - // 清理资源并断开连接 - CnSocketUtil.quitSend(param); - CnSocketUtil.quitSendSource(param); - ctx.close(); - } -} -``` - ---- - -## 4. Netty服务端组件详解 - -### 4.1 NettyServer.java - 服务器核心 - -**功能职责:** -- 提供Socket服务端功能,用于测试和开发 -- 支持源通信服务和设备通信服务 -- 模拟外部设备的响应行为 - -**关键代码分析:** - -```java -public class NettyServer { - public static final int port = 8574; - - private void runSource() { - NioEventLoopGroup boss = new NioEventLoopGroup(1); - NioEventLoopGroup work = new NioEventLoopGroup(); - try { - ServerBootstrap bootstrap = new ServerBootstrap().group(boss, work); - bootstrap.channel(NioServerSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(ServerSocketChannel ch) { - System.out.println("源通讯服务正在启动中......"); - } - }) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(NioSocketChannel ch) { - ch.pipeline() - .addLast(new LineBasedFrameDecoder(10240)) - .addLast(new StringDecoder(CharsetUtil.UTF_8)) - .addLast(new StringEncoder(CharsetUtil.UTF_8)) - .addLast(new DevNettyServerHandler()); - } - }); - - ChannelFuture future = bootstrap.bind(port).sync(); - future.addListener(f -> { - if (future.isSuccess()) { - System.out.println("源通讯服务启动成功"); - } else { - System.out.println("源通讯服务启动失败"); - } - }); - future.channel().closeFuture().sync(); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - boss.shutdownGracefully(); - work.shutdownGracefully(); - } - } -} -``` - ---- - -## 5. WebSocket通信组件详解 - -### 5.1 WebSocketService.java - WebSocket服务核心 - -**功能职责:** -- 启动基于Netty的WebSocket服务器 -- 管理服务器生命周期(启动/关闭) -- 提供高性能的WebSocket通信支持 - -**关键代码分析:** - -```java -@Component -@RequiredArgsConstructor -@Slf4j -public class WebSocketService implements ApplicationRunner { - - @Value("${webSocket.port:7777}") - int port; - - EventLoopGroup bossGroup; - EventLoopGroup workerGroup; - private Channel serverChannel; - private CompletableFuture serverFuture; - - @Override - public void run(ApplicationArguments args) { - // 使用CompletableFuture异步启动WebSocket服务,避免阻塞Spring Boot主线程 - serverFuture = CompletableFuture.runAsync(this::startWebSocketServer) - .exceptionally(throwable -> { - log.error("WebSocket服务启动异常", throwable); - return null; - }); - } - - private void startWebSocketServer() { - try { - bossGroup = new NioEventLoopGroup(1); - workerGroup = new NioEventLoopGroup(); - - ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .handler(new LoggingHandler()) - .option(ChannelOption.SO_BACKLOG, 128) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) - .childOption(ChannelOption.SO_KEEPALIVE, true) - .childHandler(new WebSocketInitializer()); - - ChannelFuture future = serverBootstrap.bind(port).sync(); - serverChannel = future.channel(); - - future.addListener(f -> { - if (future.isSuccess()) { - log.info("webSocket服务启动成功,端口:{}", port); - } else { - log.error("webSocket服务启动失败,端口:{}", port); - } - }); - - future.channel().closeFuture().sync(); - } catch (InterruptedException e) { - log.error("WebSocket服务启动过程中被中断", e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - log.error("WebSocket服务启动失败", e); - throw new RuntimeException("WebSocket服务启动失败", e); - } finally { - shutdownGracefully(); - } - } - - @PreDestroy - public void destroy() throws InterruptedException { - log.info("正在关闭WebSocket服务..."); - - if (serverChannel != null) { - try { - serverChannel.close().awaitUninterruptibly(5, TimeUnit.SECONDS); - } catch (Exception e) { - log.warn("关闭服务器通道时发生异常", e); - } - } - - if (bossGroup != null) { - bossGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync(); - } - if (workerGroup != null) { - workerGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync(); - } - - if (serverFuture != null && !serverFuture.isDone()) { - boolean cancelled = serverFuture.cancel(true); - } - - log.info("webSocket服务已销毁"); - } -} -``` - ---- - -## 6. Socket响应处理器详解 - -### 6.1 响应处理器改进 - -**主要改进:** -- 支持SocketManager的依赖注入 -- 移除硬编码的IP/PORT配置 -- 使用智能发送机制简化代码 - -#### 6.1.1 SocketSourceResponseService.java - -```java -@Service -@RequiredArgsConstructor -public class SocketSourceResponseService { - - private final SocketDevResponseService socketDevResponseService; - private final IPqDevService iPqDevService; - private final SocketManager socketManager; - - public void deal(PreDetectionParam param, String msg) throws Exception { - SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg); - String[] tem = socketDataMsg.getRequestId().split(CnSocketUtil.STEP_TAG); - SourceOperateCodeEnum enumByCode = SourceOperateCodeEnum.getDictDataEnumByCode(tem[0]); - - if (ObjectUtil.isNotNull(enumByCode)) { - switch (enumByCode) { - case YJC_YTXJY: - if (ObjectUtil.isNotNull(param.getPlanId())) { - detectionDev(param, socketDataMsg); - } else { - handleYtxjySimulate(param, socketDataMsg); - } - break; - case YJC_XUJY: - phaseSequenceDev(param, socketDataMsg); - break; - case FORMAL_REAL: - if (ObjectUtil.isNotNull(param.getPlanId())) { - senParamToDev(param, socketDataMsg); - } else { - handleSimulateTest(param, socketDataMsg); - } - break; - case Coefficient_Check: - coefficient(param, socketDataMsg); - break; - } - } - } - - // 装置检测 - 使用智能发送机制 - private void detectionDev(PreDetectionParam param, SocketDataMsg socketDataMsg) { - SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); - if (ObjectUtil.isNotNull(dictDataEnumByCode)) { - SocketMsg socketMsg = new SocketMsg<>(); - switch (dictDataEnumByCode) { - case SUCCESS: - WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); - - Map> map = new HashMap<>(1); - map.put("deviceList", FormalTestManager.devList); - String jsonString = JSON.toJSONString(map); - socketMsg.setRequestId(SourceOperateCodeEnum.YJC_SBTXJY.getValue()); - socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_01.getValue()); - socketMsg.setData(jsonString); - String json = JSON.toJSONString(socketMsg); - - // 使用智能发送工具类,自动管理设备连接 - socketManager.smartSendToDevice(param, json); - break; - case UNPROCESSED_BUSINESS: - WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); - break; - default: - CnSocketUtil.quitSendSource(param); - WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); - break; - } - } - } -} -``` - ---- - -## 7. Socket管理与工具类详解 - -### 7.1 SocketManager.java - 智能Socket管理器 - -**核心功能:** -1. **智能发送机制**: 自动判断连接需求,透明管理连接建立 -2. **Spring组件管理**: 完全交给Spring容器管理,支持依赖注入 -3. **会话管理**: 统一管理Socket连接会话和EventLoopGroup -4. **检测任务管理**: 管理检测相关的状态信息和配置 - -**关键数据结构:** - -```java -@Component -@Slf4j -public class SocketManager { - - @Autowired - private SocketConnectionConfig socketConnectionConfig; - - // Socket会话管理 - private static final Map socketSessions = new ConcurrentHashMap<>(); - private static final Map socketGroup = new ConcurrentHashMap<>(); - - // 检测任务管理 - private static Map targetMap = new ConcurrentHashMap<>(); - private static List sourceIssueList = new CopyOnWriteArrayList<>(); - public static Map valueTypeMap = new HashMap<>(); - public static volatile Map clockMap = new ConcurrentHashMap<>(); - public static volatile Map contrastClockMap = new ConcurrentHashMap<>(); - - // 基础连接管理方法 - public static void addUser(String userId, Channel channel) { - socketSessions.put(userId, channel); - } - - public static void addGroup(String userId, NioEventLoopGroup group) { - socketGroup.put(userId, group); - } - - public static void removeUser(String userId) { - Channel channel = socketSessions.get(userId); - if (ObjectUtil.isNotNull(channel)) { - try { - channel.close().sync(); - } catch (InterruptedException e) { - log.error("关闭通道异常", e); - } - NioEventLoopGroup eventExecutors = socketGroup.get(userId); - if (ObjectUtil.isNotNull(eventExecutors)) { - eventExecutors.shutdownGracefully(); - log.info("{}__{}关闭了客户端", userId, channel.id()); - } - } - socketSessions.remove(userId); - } - - public static void sendMsg(String userId, String msg) { - Channel channel = socketSessions.get(userId); - if (ObjectUtil.isNotNull(channel)) { - channel.writeAndFlush(msg + '\n'); - log.info("{}__{}往{}发送数据:{}", userId, channel.id(), channel.remoteAddress(), msg); - } else { - log.warn("{}__发送数据:失败通道不存在{}", userId, msg); - } - } - - // 检测任务管理方法 - public static void addSourceList(List sList) { - sourceIssueList = sList; - } - - public static List getSourceList() { - return sourceIssueList; - } - - public static void delSource(Integer index) { - sourceIssueList.removeIf(s -> index.equals(s.getIndex())); - } - - public static void delSourceTarget(String sourceTag) { - targetMap.remove(sourceTag); - } - - public static void initMap(Map map) { - targetMap = map; - } - - public static void addTargetMap(String scriptType, Long count) { - targetMap.put(scriptType, count); - } - - public static Long getSourceTarget(String scriptType) { - return targetMap.get(scriptType); - } -} -``` - -### 7.2 CnSocketUtil.java - Socket工具类 - -**功能职责:** -- 提供Socket连接的控制功能 -- 封装WebSocket消息推送 -- 定义通信相关常量 - -**关键代码:** - -```java -public class CnSocketUtil { - - public final static String DEV_TAG = "_Dev"; - public final static String SOURCE_TAG = "_Source"; - public final static String START_TAG = "_Start"; - public final static String END_TAG = "_End"; - public final static String STEP_TAG = "&&"; - public final static String SPLIT_TAG = "_"; - - // 退出检测 - public static void quitSend(PreDetectionParam param) { - SocketMsg socketMsg = new SocketMsg<>(); - socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue()); - socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue()); - SocketManager.sendMsg(param.getUserPageId() + DEV_TAG, JSON.toJSONString(socketMsg)); - WebServiceManager.removePreDetectionParam(); - } - - // 关闭源连接 - public static void quitSendSource(PreDetectionParam param) { - SocketMsg socketMsg = new SocketMsg<>(); - socketMsg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue()); - socketMsg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue()); - JSONObject jsonObject = new JSONObject(); - jsonObject.put("sourceId", param.getSourceId()); - socketMsg.setData(jsonObject.toJSONString()); - SocketManager.sendMsg(param.getUserPageId() + SOURCE_TAG, JSON.toJSONString(socketMsg)); - WebServiceManager.removePreDetectionParam(); - } - - // 推送webSocket数据 - public static void sendToWebSocket(String userId, String requestId, String operatorType, - Object data, String desc) { - WebSocketVO webSocketVO = new WebSocketVO<>(); - webSocketVO.setRequestId(requestId); - webSocketVO.setOperateCode(operatorType); - webSocketVO.setData(data); - webSocketVO.setDesc(desc); - WebServiceManager.sendMessage(userId, webSocketVO); - } - - // 比对式-退出检测 - public static void contrastSendquit(String userId) { - System.out.println("比对式-发送关闭备通讯模块指令"); - SocketMsg socketMsg = new SocketMsg<>(); - socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue()); - socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue()); - SocketManager.sendMsg(userId + DEV_TAG, JSON.toJSONString(socketMsg)); - WebServiceManager.removePreDetectionParam(); - } -} -``` - ---- - -## 8. 通信数据对象详解 - -### 8.1 数据对象结构 - -#### 8.1.1 SocketMsg.java - Socket消息对象 - -```java -public class SocketMsg { - private String requestId; // 请求ID,用于标识消息类型和流程 - private String operateCode; // 操作代码,标识具体的操作类型 - private T data; // 数据载荷,支持泛型 - private String desc; // 描述信息 - private Long timestamp; // 时间戳 -} -``` - -#### 8.1.2 SocketDataMsg.java - Socket数据消息对象 - -```java -public class SocketDataMsg { - private String requestId; // 请求ID - private String operateCode; // 操作代码 - private String data; // 响应数据(JSON字符串) - private Integer code; // 响应状态码 - private String message; // 响应消息 - private String type; // 消息类型 -} -``` - -#### 8.1.3 WebSocketVO.java - WebSocket数据对象 - -```java -public class WebSocketVO { - private String requestId; // 请求ID - private String operateCode; // 操作代码 - private T data; // 数据载荷 - private String desc; // 描述信息 - private Integer status; // 状态码 - private Long timestamp; // 时间戳 - private String userId; // 用户ID -} -``` - ---- - -## 9. 通信流程分析 - -### 9.1 智能发送流程 - -```mermaid -sequenceDiagram - participant Business as 业务层 - participant SocketManager as SocketManager - participant Config as SocketConnectionConfig - participant NettyClient as NettyClient - participant Device as 外部设备 - - Business->>SocketManager: smartSendToSource(param, msg) - SocketManager->>SocketManager: extractRequestId(msg) - SocketManager->>Config: needsSourceConnection(requestId) - - alt 需要建立连接 - Config-->>SocketManager: true - SocketManager->>SocketManager: isChannelActive(userId) - - alt 连接不存在 - SocketManager->>Config: getSource().getIp/Port() - Config-->>SocketManager: IP/PORT配置 - SocketManager->>NettyClient: connectToSourceStatic(ip, port, param, msg) - NettyClient->>Device: 建立连接并发送消息 - else 连接已存在 - SocketManager->>SocketManager: sendMsg(userId, msg) - end - else 不需要建立连接 - Config-->>SocketManager: false - SocketManager->>SocketManager: sendMsg(userId, msg) - end -``` - -### 9.2 Spring组件生命周期流程 - -```mermaid -graph TB - A[Spring容器启动] --> B[SocketConnectionConfig初始化] - B --> C[@ConfigurationProperties绑定配置] - C --> D[NettyClient注入依赖] - D --> E[SocketManager注入配置] - E --> F[业务层注入SocketManager] - F --> G[智能发送服务就绪] - - G --> H[接收发送请求] - H --> I[检查连接需求] - I --> J[自动建立连接] - J --> K[发送消息] - - K --> L[Spring容器关闭] - L --> M[@PreDestroy清理资源] - M --> N[关闭所有连接] -``` - -### 9.3 配置管理流程 - -```mermaid -flowchart TD - A[application.yml] --> B[Spring Boot配置绑定] - B --> C[SocketConnectionConfig] - - C --> D[Source配置] - C --> E[Device配置] - C --> F[RequestId配置] - - D --> G[程控源IP/PORT] - E --> H[被检设备IP/PORT] - F --> I[连接需求判断] - - G --> J[SocketManager智能发送] - H --> J - I --> J - - J --> K[自动连接管理] - K --> L[透明化发送] -``` - ---- - -## 10. 关键技术特性 - -### 10.1 智能发送机制特性 - -#### 10.1.1 自动连接管理 -- **智能判断**: 根据requestId自动判断是否需要建立连接 -- **透明操作**: 开发者无需关心连接建立过程 -- **配置驱动**: 通过简单配置控制连接行为 - -#### 10.1.2 连接状态检测 -```java -private static boolean isChannelActive(String userId) { - Channel channel = getChannelByUserId(userId); - return ObjectUtil.isNotNull(channel) && channel.isActive(); -} -``` - -#### 10.1.3 异步连接建立 -```java -CompletableFuture.runAsync(() -> { - NettyClient.connectToSourceStatic(ip, port, param, msg); -}); -``` - -### 10.2 Spring组件化特性 - -#### 10.2.1 依赖注入管理 -```java -@Component -public class SocketManager { - @Autowired - private SocketConnectionConfig socketConnectionConfig; -} - -@Service -@RequiredArgsConstructor -public class PreDetectionServiceImpl { - private final SocketManager socketManager; -} -``` - -#### 10.2.2 配置属性绑定 -```java -@Component -@ConfigurationProperties(prefix = "socket") -public class SocketConnectionConfig { - private SourceConfig source = new SourceConfig(); - private DeviceConfig device = new DeviceConfig(); -} -``` - -### 10.3 配置统一管理特性 - -#### 10.3.1 统一配置文件 -```yaml -socket: - source: - ip: 192.168.1.124 - port: 62000 - device: - ip: 192.168.1.124 - port: 61000 -``` - -#### 10.3.2 动态配置支持 -- **环境隔离**: 支持不同环境使用不同配置 -- **热更新**: 支持配置的动态刷新 -- **默认值**: 提供合理的默认配置值 - -### 10.4 异常处理和资源管理特性 - -#### 10.4.1 优化的异常处理 -```java -@Override -public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - log.error("通信异常", cause); - if (cause instanceof ConnectException) { - log.warn("连接异常"); - } else if (cause instanceof IOException) { - WebServiceManager.sendDetectionErrorMessage(userId, errorCode); - } - ctx.close(); -} -``` - -#### 10.4.2 完善的资源清理 -```java -public static void removeUser(String userId) { - Channel channel = socketSessions.get(userId); - if (ObjectUtil.isNotNull(channel)) { - try { - channel.close().sync(); - } catch (InterruptedException e) { - log.error("关闭通道异常", e); - } - NioEventLoopGroup eventExecutors = socketGroup.get(userId); - if (ObjectUtil.isNotNull(eventExecutors)) { - eventExecutors.shutdownGracefully(); - } - } - socketSessions.remove(userId); -} -``` - -### 10.5 并发安全特性 - -#### 10.5.1 线程安全设计 -- **ConcurrentHashMap**: 用于会话管理 -- **CopyOnWriteArrayList**: 用于检测项列表 -- **volatile关键字**: 用于状态标志 -- **CompletableFuture**: 用于异步处理 - -#### 10.5.2 日志统一管理 -```java -@Slf4j -public class NettyClient { - log.info("Socket连接建立成功: {}", channelFuture.channel().remoteAddress()); - log.error("Socket连接过程中发生异常: {}", e.getMessage(), e); - log.debug("发送初始消息: {}", msg); -} -``` - ---- - -## 11. 配置与部署 - -### 11.1 应用配置 - -#### 11.1.1 核心配置文件 -```yaml -# application.yml -webSocket: - port: 7777 # WebSocket服务端口 - -socket: - source: - ip: 192.168.1.124 # 程控源设备IP - port: 62000 # 程控源设备端口 - device: - ip: 192.168.1.124 # 被检设备IP - port: 61000 # 被检设备端口 - -netty: - server: - port: 8574 # Netty服务端端口(测试用) - -# 日志配置 -logging: - level: - com.njcn.gather.detection.util.socket: INFO - com.njcn.gather.detection.handler: INFO - io.netty: WARN - pattern: - console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" -``` - -#### 11.1.2 环境配置示例 -```yaml -# application-dev.yml (开发环境) -socket: - source: - ip: 127.0.0.1 - port: 62000 - device: - ip: 127.0.0.1 - port: 61000 - -# application-prod.yml (生产环境) -socket: - source: - ip: 192.168.1.124 - port: 62000 - device: - ip: 192.168.1.124 - port: 61000 -``` - -### 11.2 Maven依赖配置 - -```xml - - - - org.springframework.boot - spring-boot-starter - 2.3.12.RELEASE - - - - - io.netty - netty-all - 4.1.76.Final - - - - - com.alibaba - fastjson - 1.2.83 - - - - - cn.hutool - hutool-all - 5.8.10 - - - - - org.projectlombok - lombok - true - - -``` - -### 11.3 Spring Boot集成 - -#### 11.3.1 自动配置启用 -```java -@SpringBootApplication -@EnableConfigurationProperties(SocketConnectionConfig.class) -public class DetectionApplication { - public static void main(String[] args) { - SpringApplication.run(DetectionApplication.class, args); - } -} -``` - -#### 11.3.2 组件扫描配置 -```java -@ComponentScan(basePackages = { - "com.njcn.gather.detection.util.socket", - "com.njcn.gather.detection.handler", - "com.njcn.gather.detection.service" -}) -``` - -### 11.4 性能调优参数 - -#### 11.4.1 Netty性能参数 -```java -// 服务端性能调优 -ServerBootstrap serverBootstrap = new ServerBootstrap(); -serverBootstrap.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .option(ChannelOption.SO_BACKLOG, 128) // 连接队列大小 - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时时间 - .childOption(ChannelOption.SO_KEEPALIVE, true) // 启用TCP keepalive - .childOption(ChannelOption.TCP_NODELAY, true) // 禁用Nagle算法 - .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 接收缓冲区大小 - .childOption(ChannelOption.SO_SNDBUF, 32 * 1024); // 发送缓冲区大小 - -// 客户端性能调优 -Bootstrap bootstrap = new Bootstrap(); -bootstrap.group(group) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时 - .option(ChannelOption.SO_KEEPALIVE, true) // keepalive - .option(ChannelOption.TCP_NODELAY, true) // 立即发送 - .channel(NioSocketChannel.class); -``` - -#### 11.4.2 线程池配置 -```yaml -# application.yml -spring: - task: - execution: - pool: - core-size: 8 - max-size: 16 - queue-capacity: 100 - thread-name-prefix: "detection-" -``` - -### 11.5 监控和诊断 - -#### 11.5.1 健康检查配置 -```java -@Component -public class SocketHealthIndicator implements HealthIndicator { - - @Autowired - private SocketManager socketManager; - - @Override - public Health health() { - // 检查Socket连接状态 - if (hasActiveConnections()) { - return Health.up() - .withDetail("activeConnections", getActiveConnectionCount()) - .build(); - } else { - return Health.down() - .withDetail("reason", "No active socket connections") - .build(); - } - } -} -``` - -#### 11.5.2 指标监控 -```java -@Component -public class SocketMetrics { - - private final MeterRegistry meterRegistry; - private final Counter connectionCounter; - private final Timer messageProcessingTimer; - - public SocketMetrics(MeterRegistry meterRegistry) { - this.meterRegistry = meterRegistry; - this.connectionCounter = Counter.builder("socket.connections.total") - .description("Total socket connections") - .register(meterRegistry); - this.messageProcessingTimer = Timer.builder("socket.message.processing.time") - .description("Message processing time") - .register(meterRegistry); - } -} -``` - ---- - -## 总结 - -CN_Gather Detection模块的全新Netty通信架构通过**智能Socket管理机制**和**全面Spring组件化**的设计,实现了电能质量设备检测系统的现代化通信解决方案。 - -### 核心架构优势 - -1. **智能化程度高** - - 自动连接管理,开发者无需关心连接细节 - - 配置驱动的连接策略,灵活可控 - - 透明化的发送机制,简化业务代码 - -2. **Spring生态集成** - - 完全Spring组件化管理,遵循IoC原则 - - 统一的配置管理,支持多环境部署 - - 完善的依赖注入,松耦合设计 - -3. **代码质量提升** - - 移除大量冗余和无用代码 - - 统一日志管理,便于调试和监控 - - 优化异常处理,提高系统稳定性 - -4. **可维护性增强** - - 模块化设计,职责边界清晰 - - 配置集中管理,降低维护成本 - - 完善的资源管理,避免内存泄漏 - -5. **开发体验优化** - - 简化的API设计,降低使用门槛 - - 智能化的连接管理,减少样板代码 - - 统一的错误处理,提高开发效率 - -### 技术特色 - -- **智能发送机制**: 业界领先的自动连接管理技术 -- **配置统一管理**: 现代化的配置管理模式 -- **Spring深度集成**: 充分利用Spring生态优势 -- **高并发支持**: 基于Netty NIO的高性能通信 -- **完善监控**: 全方位的监控和诊断能力 - -该架构为CN_Gather系统提供了稳定、高效、易维护的通信基础,确保了电能质量检测业务的可靠运行,同时为未来的功能扩展和性能优化奠定了坚实基础。 \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java b/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java index f3b2700a..49f0433f 100644 --- a/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java +++ b/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java @@ -1,17 +1,27 @@ package com.njcn.gather.detection.controller; +import cn.hutool.core.util.StrUtil; import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.constant.OperateType; import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.LogUtil; +import com.njcn.gather.detection.lock.DetectionLock; +import com.njcn.gather.detection.lock.DetectionLockManager; +import com.njcn.gather.detection.lock.DetectionLockManager.AcquireResult; +import com.njcn.gather.detection.pojo.enums.DetectionResponseEnum; import com.njcn.gather.detection.pojo.param.ContrastDetectionParam; import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.pojo.param.SimulateDetectionParam; +import com.njcn.gather.detection.pojo.vo.DetectionLockHolderVO; import com.njcn.gather.detection.service.PreDetectionService; +import com.njcn.gather.detection.util.socket.FormalTestManager; +import com.njcn.gather.user.user.pojo.po.SysUser; +import com.njcn.gather.user.user.service.ISysUserService; import com.njcn.web.controller.BaseController; import com.njcn.web.utils.HttpResultUtil; +import com.njcn.web.utils.RequestUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; @@ -32,6 +42,7 @@ import org.springframework.web.bind.annotation.*; public class PreDetectionController extends BaseController { private final PreDetectionService preDetectionService; + private final ISysUserService sysUserService; /** * 开始检测通用入口 @@ -42,10 +53,27 @@ public class PreDetectionController extends BaseController { @OperateInfo @ApiOperation("开始检测") @ApiImplicitParam(name = "param", value = "查询参数", required = true) - public HttpResult startPreTest(@RequestBody @Validated PreDetectionParam param) { + public HttpResult startPreTest(@RequestBody @Validated PreDetectionParam param) { String methodDescribe = getMethodDescribe("startPreTest"); - preDetectionService.sourceCommunicationCheck(param); - return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + HttpResult busy = tryAcquireLock(param.getUserPageId()); + if (busy != null) { + return busy; + } + // 同步阶段抛异常时回滚锁(PLAN_AND_SOURCE_NOT / SOURCE_INFO_NOT 等业务异常会被全局处理器吞掉, + // 锁会卡在用户手上直到 4 小时超时,故需 finally 兜底) + boolean keepLock = false; + try { + // 重置 FormalTestManager 暂停计数残留,避免上次暂停残留计数误触发 R4 + FormalTestManager.stopTime = 0; + FormalTestManager.hasStopFlag = false; + preDetectionService.sourceCommunicationCheck(param); + keepLock = true; + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } finally { + if (!keepLock) { + releaseLockSelf("START_PRE_SYNC_FAILED"); + } + } } @@ -59,8 +87,12 @@ public class PreDetectionController extends BaseController { @OperateInfo @ApiOperation("源通讯校验") @ApiImplicitParam(name = "param", value = "查询参数", required = true) - public HttpResult ytxCheckSimulate(@RequestBody @Validated SimulateDetectionParam param) { + public HttpResult ytxCheckSimulate(@RequestBody @Validated SimulateDetectionParam param) { String methodDescribe = getMethodDescribe("ytxCheckSimulate"); + HttpResult busy = requireFreeOrSelf(); + if (busy != null) { + return busy; + } preDetectionService.ytxCheckSimulate(param); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } @@ -72,8 +104,12 @@ public class PreDetectionController extends BaseController { @OperateInfo @ApiOperation("启动") @ApiImplicitParam(name = "param", value = "查询参数", required = true) - public HttpResult startTestSimulate(@RequestBody @Validated SimulateDetectionParam param) { + public HttpResult startTestSimulate(@RequestBody @Validated SimulateDetectionParam param) { String methodDescribe = getMethodDescribe("startTestSimulate"); + HttpResult busy = requireHolderSelf(); + if (busy != null) { + return busy; + } preDetectionService.sendScript(param); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } @@ -85,9 +121,18 @@ public class PreDetectionController extends BaseController { @OperateInfo @ApiOperation("停止") @ApiImplicitParam(name = "param", value = "查询参数", required = true) - public HttpResult closeSimulateTest(@RequestBody @Validated SimulateDetectionParam param) { + public HttpResult closeSimulateTest(@RequestBody @Validated SimulateDetectionParam param) { String methodDescribe = getMethodDescribe("closeSimulateTest"); - preDetectionService.closeTestSimulate(param); + HttpResult busy = requireHolderSelf(); + if (busy != null) { + return busy; + } + try { + preDetectionService.closeTestSimulate(param); + } finally { + // 即使业务异常也要释放锁,避免锁残留导致他人无法接手 + releaseLockSelf("USER_STOP"); + } return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } @@ -99,8 +144,12 @@ public class PreDetectionController extends BaseController { @OperateInfo @ApiOperation("系数校验") @ApiImplicitParam(name = "param", value = "查询参数", required = true) - public HttpResult coefficientCheck(@RequestBody PreDetectionParam param) { + public HttpResult coefficientCheck(@RequestBody PreDetectionParam param) { String methodDescribe = getMethodDescribe("coefficientCheck"); + HttpResult busy = requireHolderSelf(); + if (busy != null) { + return busy; + } preDetectionService.coefficientCheck(param); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } @@ -113,8 +162,13 @@ public class PreDetectionController extends BaseController { @OperateInfo @ApiOperation("暂停检测") @ApiImplicitParam(name = "param", value = "参数", required = true) - public HttpResult temStopTest() { + public HttpResult temStopTest() { String methodDescribe = getMethodDescribe("temStopTest"); + HttpResult busy = requireHolderSelf(); + if (busy != null) { + return busy; + } + // 暂停保持锁(spec §2.3),不释放 preDetectionService.temStopTest(); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } @@ -126,8 +180,12 @@ public class PreDetectionController extends BaseController { @OperateInfo @ApiOperation("重新开始检测") @ApiImplicitParam(name = "param", value = "参数", required = true) - public HttpResult restartTemTest(@RequestBody PreDetectionParam param) { + public HttpResult restartTemTest(@RequestBody PreDetectionParam param) { String methodDescribe = getMethodDescribe("restartTemTest"); + HttpResult busy = requireHolderSelf(); + if (busy != null) { + return busy; + } preDetectionService.restartTemTest(param); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } @@ -140,10 +198,26 @@ public class PreDetectionController extends BaseController { @OperateInfo @ApiOperation("开始比对检测") @ApiImplicitParam(name = "param", value = "查询参数", required = true) - public HttpResult startContrastTest(@RequestBody @Validated ContrastDetectionParam param) { + public HttpResult startContrastTest(@RequestBody @Validated ContrastDetectionParam param) { String methodDescribe = getMethodDescribe("startContrastTest"); - preDetectionService.startContrastTest(param); - return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + // ContrastDetectionParam 无 userPageId 字段,用 loginName 作为会话标识(与 WS 会话 key 一致) + HttpResult busy = tryAcquireLock(param.getLoginName()); + if (busy != null) { + return busy; + } + // 同步阶段抛异常时回滚锁,理由同 startPreTest + boolean keepLock = false; + try { + FormalTestManager.stopTime = 0; + FormalTestManager.hasStopFlag = false; + preDetectionService.startContrastTest(param); + keepLock = true; + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } finally { + if (!keepLock) { + releaseLockSelf("START_CONTRAST_SYNC_FAILED"); + } + } } @@ -169,11 +243,91 @@ public class PreDetectionController extends BaseController { @OperateInfo(info = LogEnum.SYSTEM_COMMON) @GetMapping("/startCoefficient") @ApiOperation("比对模式开启系数校验") - public HttpResult startCoefficient() { + public HttpResult startCoefficient() { String methodDescribe = getMethodDescribe("startCoefficient"); LogUtil.njcnDebug(log, "{}", methodDescribe); - + HttpResult busy = requireHolderSelf(); + if (busy != null) { + return busy; + } preDetectionService.startCoefficient(); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } + + // ============ 检测互斥锁辅助方法 ============ + + /** 抢锁入口(startPreTest / startContrastTest 用)。 + * 抢到→null;被他人持有或竞态失败→返回 busy 响应; + * 使用方:拿到非 null 返回值直接 return 给上层。 */ + private HttpResult tryAcquireLock(String userPageId) { + String userId = RequestUtil.getUserId(); + AcquireResult r = DetectionLockManager.getInstance() + .tryAcquire(userId, resolveDisplayName(userId), userPageId); + if (r.isOk()) { + return null; + } + return HttpResultUtil.assembleResult( + DetectionResponseEnum.DETECTION_BUSY.getCode(), + r.getHolder(), + DetectionResponseEnum.DETECTION_BUSY.getMessage()); + } + + /** 中间接口校验:要求当前 holder == 自己。 + * 空闲 → 返回 busy data=null("请先开始检测"语义); + * 他人持有 → 返回 busy + holder; + * 自己持有 → 返回 null(放行)。 */ + private HttpResult requireHolderSelf() { + DetectionLock cur = DetectionLockManager.getInstance().getCurrent(); + String me = RequestUtil.getUserId(); + if (cur != null && me.equals(cur.getUserId())) { + return null; + } + DetectionLockHolderVO holder = cur == null ? null : DetectionLockManager.toHolderVO(cur); + return HttpResultUtil.assembleResult( + DetectionResponseEnum.DETECTION_BUSY.getCode(), + holder, + DetectionResponseEnum.DETECTION_BUSY.getMessage()); + } + + /** 辅助接口规则(ytxCheckSimulate):锁空闲 → 放行;他人持有 → busy;自己持有 → 放行。 */ + private HttpResult requireFreeOrSelf() { + DetectionLock cur = DetectionLockManager.getInstance().getCurrent(); + if (cur == null) { + return null; + } + String me = RequestUtil.getUserId(); + if (me.equals(cur.getUserId())) { + return null; + } + return HttpResultUtil.assembleResult( + DetectionResponseEnum.DETECTION_BUSY.getCode(), + DetectionLockManager.toHolderVO(cur), + DetectionResponseEnum.DETECTION_BUSY.getMessage()); + } + + /** 释放锁(用户主动终止)。 */ + private void releaseLockSelf(String reason) { + DetectionLockManager.getInstance().releaseIfHeldBy(RequestUtil.getUserId(), reason); + } + + /** 解析展示给前端的用户名(昵称优先,loginName 兜底,避免 BUSY 弹窗显示 "unknown user")。 */ + private String resolveDisplayName(String userId) { + if (StrUtil.isBlank(userId)) { + return ""; + } + try { + SysUser user = sysUserService.getById(userId); + if (user != null && StrUtil.isNotBlank(user.getName())) { + return user.getName(); + } + if (user != null && StrUtil.isNotBlank(user.getLoginName())) { + return user.getLoginName(); + } + } catch (Exception e) { + log.warn("解析检测锁持有者昵称失败,userId={}", userId, e); + } + // 最终兜底:用 token 里的 loginName,不要返回 "unknown user" + String loginName = RequestUtil.getLoginNameByToken(); + return StrUtil.isNotBlank(loginName) ? loginName : userId; + } } diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java index a1c366b7..6573d66f 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java @@ -6,6 +6,7 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.njcn.gather.detection.lock.DetectionLockManager; import com.njcn.gather.detection.pojo.dto.DevXiNumData; import com.njcn.gather.detection.pojo.enums.DetectionCodeEnum; import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; @@ -1379,6 +1380,9 @@ public class SocketDevResponseService { iPqDevService.updateResult(param.getDevIds(), valueType, param.getCode(), param.getUserId(), param.getTemperature(), param.getHumidity(), true); CnSocketUtil.quitSend(param); + // 数模式检测全部小项完成 → 释放锁,避免用户必须点"停止"才能让出 + DetectionLockManager.getInstance() + .releaseIfMatchPage(param.getUserPageId(), "DEV_TEST_FINISHED"); } successComm.clear(); FormalTestManager.realDataXiList.clear(); diff --git a/detection/src/main/java/com/njcn/gather/detection/lock/DetectionLock.java b/detection/src/main/java/com/njcn/gather/detection/lock/DetectionLock.java new file mode 100644 index 00000000..2467d6a9 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/lock/DetectionLock.java @@ -0,0 +1,29 @@ +package com.njcn.gather.detection.lock; + +/** + * 检测互斥锁对象(不可变)。 + * 字段含义见 docs/superpowers/specs/2026-05-28-单用户检测互斥-design.md §2.1 + */ +public final class DetectionLock { + + private final String userId; + private final String userName; + private final String userPageId; + private final long acquireTime; + private final long expireAt; + + public DetectionLock(String userId, String userName, String userPageId, + long acquireTime, long expireAt) { + this.userId = userId; + this.userName = userName; + this.userPageId = userPageId; + this.acquireTime = acquireTime; + this.expireAt = expireAt; + } + + public String getUserId() { return userId; } + public String getUserName() { return userName; } + public String getUserPageId() { return userPageId; } + public long getAcquireTime() { return acquireTime; } + public long getExpireAt() { return expireAt; } +} diff --git a/detection/src/main/java/com/njcn/gather/detection/lock/DetectionLockManager.java b/detection/src/main/java/com/njcn/gather/detection/lock/DetectionLockManager.java new file mode 100644 index 00000000..a6cce9b1 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/lock/DetectionLockManager.java @@ -0,0 +1,134 @@ +package com.njcn.gather.detection.lock; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Date; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import com.njcn.gather.detection.pojo.vo.DetectionLockHolderVO; + +/** + * 检测互斥锁管理器(进程内单例)。 + * 详细设计:docs/superpowers/specs/2026-05-28-单用户检测互斥-design.md + */ +@Slf4j +public final class DetectionLockManager { + + private static final long LOCK_MAX_HOLD_MS = TimeUnit.HOURS.toMillis(4); + + private static final DetectionLockManager INSTANCE = new DetectionLockManager(); + + public static DetectionLockManager getInstance() { + return INSTANCE; + } + + private final AtomicReference current = new AtomicReference<>(null); + + private DetectionLockManager() {} + + /** 抢锁。同账号视为重入(刷新 page/expireAt)。 */ + public AcquireResult tryAcquire(String userId, String userName, String userPageId) { + for (int attempt = 0; attempt < 2; attempt++) { + DetectionLock cur = current.get(); + long now = System.currentTimeMillis(); + // 空闲 或 绝对超时已过 → 直接抢 + if (cur == null || now > cur.getExpireAt()) { + DetectionLock fresh = new DetectionLock(userId, userName, userPageId, now, now + LOCK_MAX_HOLD_MS); + if (current.compareAndSet(cur, fresh)) { + log.info("DetectionLock acquired by userId={}, userName={}, userPageId={}", userId, userName, userPageId); + return AcquireResult.ok(); + } + continue; // CAS 失败重试 + } + // 同账号重入 → 刷新 + if (userId.equals(cur.getUserId())) { + DetectionLock refreshed = new DetectionLock(userId, userName, userPageId, now, now + LOCK_MAX_HOLD_MS); + if (current.compareAndSet(cur, refreshed)) { + log.debug("DetectionLock reentered by userId={}, new userPageId={}", userId, userPageId); + return AcquireResult.ok(); + } + continue; + } + // 被他人持有 + return AcquireResult.busy(toHolderVO(cur)); + } + // 两次 CAS 都失败属于罕见高并发场景:绝不返回 ok()(那样会让调用方误以为持锁)。 + // 返回 busy,data 可能为 null(锁刚被释放);调用方/前端按"请重试"处理。 + DetectionLock cur = current.get(); + return AcquireResult.busy(cur == null ? null : toHolderVO(cur)); + } + + /** 仅当 holder.userId == userId 才释放(幂等)。 + * 循环终止性:每轮 CAS 失败意味着 current 被其他线程改写; + * 下一轮 get 后 cur 可能变成 null 或不再匹配 userId,命中前置 return 退出。 + * 唯一可能继续的情况是另一线程把它换成了同 userId 的新 lock,下一轮 CAS 会再次尝试; + * 最坏情况下 CAS 成功,仍然终止。 */ + public void releaseIfHeldBy(String userId, String reason) { + while (true) { + DetectionLock cur = current.get(); + if (cur == null || !cur.getUserId().equals(userId)) return; + if (current.compareAndSet(cur, null)) { + log.info("DetectionLock released, reason={}, userId={}", reason, userId); + return; + } + } + } + + /** 仅当 holder.userPageId == userPageId 才释放(幂等)。终止性同 releaseIfHeldBy。 */ + public void releaseIfMatchPage(String userPageId, String reason) { + while (true) { + DetectionLock cur = current.get(); + if (cur == null || !cur.getUserPageId().equals(userPageId)) return; + if (current.compareAndSet(cur, null)) { + log.info("DetectionLock released, reason={}, userPageId={}", reason, userPageId); + return; + } + } + } + + /** 管理员强制释放,不校验 holder。 */ + public void forceRelease(String operatorUserId, String reason) { + DetectionLock cur = current.getAndSet(null); + if (cur != null) { + log.warn("DetectionLock force-released by operator={}, victim userId={}, reason={}", + operatorUserId, cur.getUserId(), reason); + } + } + + /** 返回当前 holder 快照;返回 null 表示空闲。 */ + public DetectionLock getCurrent() { + DetectionLock cur = current.get(); + // 顺手做惰性超时回收(spec R5) + if (cur != null && System.currentTimeMillis() > cur.getExpireAt()) { + current.compareAndSet(cur, null); + return null; + } + return cur; + } + + /** 把 DetectionLock 转成给前端的 VO。 */ + public static DetectionLockHolderVO toHolderVO(DetectionLock lock) { + DetectionLockHolderVO vo = new DetectionLockHolderVO(); + vo.setHolderUserId(lock.getUserId()); + vo.setHolderUserName(lock.getUserName()); + vo.setAcquireTime(new Date(lock.getAcquireTime())); + vo.setExpireAt(new Date(lock.getExpireAt())); + return vo; + } + + /** 抢锁结果。 */ + public static final class AcquireResult { + private final boolean ok; + private final DetectionLockHolderVO holder; + + private AcquireResult(boolean ok, DetectionLockHolderVO holder) { + this.ok = ok; + this.holder = holder; + } + public static AcquireResult ok() { return new AcquireResult(true, null); } + public static AcquireResult busy(DetectionLockHolderVO holder) { return new AcquireResult(false, holder); } + public boolean isOk() { return ok; } + public DetectionLockHolderVO getHolder() { return holder; } + } +} diff --git a/detection/src/main/java/com/njcn/gather/detection/pojo/enums/DetectionResponseEnum.java b/detection/src/main/java/com/njcn/gather/detection/pojo/enums/DetectionResponseEnum.java index 64b4ceb9..c9a8995f 100644 --- a/detection/src/main/java/com/njcn/gather/detection/pojo/enums/DetectionResponseEnum.java +++ b/detection/src/main/java/com/njcn/gather/detection/pojo/enums/DetectionResponseEnum.java @@ -19,7 +19,8 @@ public enum DetectionResponseEnum { SCRIPT_CHECK_DATA_NOT_EXIST("A020040","测试脚本项暂无配置" ), - EXCEED_MAX_TIME("A020041","检测次数超出最大限制!" ); + EXCEED_MAX_TIME("A020041","检测次数超出最大限制!" ), + DETECTION_BUSY("A020042", "检测进行中"); private final String code; private final String message; diff --git a/detection/src/main/java/com/njcn/gather/detection/pojo/vo/DetectionLockHolderVO.java b/detection/src/main/java/com/njcn/gather/detection/pojo/vo/DetectionLockHolderVO.java new file mode 100644 index 00000000..b31a3771 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/pojo/vo/DetectionLockHolderVO.java @@ -0,0 +1,22 @@ +package com.njcn.gather.detection.pojo.vo; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.util.Date; + +/** + * 检测锁持有者信息,用于在抢锁失败响应中返回给前端。 + */ +@Data +public class DetectionLockHolderVO { + + private String holderUserId; + private String holderUserName; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date acquireTime; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date expireAt; +} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java index eb74ccb3..107f303b 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON; import com.njcn.gather.detection.handler.SocketContrastResponseService; import com.njcn.gather.detection.handler.SocketDevResponseService; import com.njcn.gather.detection.handler.SocketSourceResponseService; +import com.njcn.gather.detection.lock.DetectionLockManager; import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum; import com.njcn.gather.detection.pojo.param.ContrastDetectionParam; import com.njcn.gather.detection.pojo.param.PreDetectionParam; @@ -26,6 +27,8 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** @@ -42,6 +45,20 @@ import java.util.concurrent.TimeUnit; @Component public class NettyClient { + // ========== TODO TEST-ONLY: 联调 BUSY 弹窗用,测试完成后整段删除 ========== + /** + * 测试期:连接源/设备失败时,延迟若干秒再释放检测锁,方便手动测试 BUSY 弹窗。 + * 正式上线时把 RELEASE_DELAY_FOR_TEST_SECONDS 改回 0 或直接删除调度逻辑。 + */ + private static final long RELEASE_DELAY_FOR_TEST_SECONDS = 300L; + private static final ScheduledExecutorService DELAYED_RELEASER = + Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "lock-release-delay-test"); + t.setDaemon(true); + return t; + }); + // ========== /TODO TEST-ONLY ========== + @Resource private SocketSourceResponseService socketSourceResponseService; @@ -349,7 +366,7 @@ public class NettyClient { NioEventLoopGroup group, String msg) { channelFuture.addListener((ChannelFutureListener) ch -> { if (!ch.isSuccess()) { - onConnectionFailure(handler, group); + onConnectionFailure(param, handler, group); } else { onConnectionSuccess(channelFuture, param, handler, group, msg); } @@ -358,15 +375,30 @@ public class NettyClient { /** * 连接失败处理 - * 输出失败信息并优雅关闭事件循环组 + * 输出失败信息、优雅关闭事件循环组、通知前端、释放检测锁 * + * @param param 检测参数,用于定位锁与前端通知 * @param handler 业务处理器,用于区分设备类型 * @param group 事件循环组 */ - private static void onConnectionFailure(SimpleChannelInboundHandler handler, NioEventLoopGroup group) { + private static void onConnectionFailure(PreDetectionParam param, + SimpleChannelInboundHandler handler, NioEventLoopGroup group) { String deviceType = getDeviceType(handler); log.info("连接{}服务端失败...", deviceType); group.shutdownGracefully(); + // 异步建连失败时前端原本静默,补一次通知避免用户黑屏等待 + notifyFrontendError(param, handler); + // 释放检测锁:抢锁后由 controller 异步发起的建连若失败,无法走 controller 兜底 + // TODO TEST-ONLY: 测试 BUSY 弹窗期间延迟释放,正式部署改回立即释放 + if (param != null && param.getUserPageId() != null) { + final String userPageId = param.getUserPageId(); + DELAYED_RELEASER.schedule( + () -> DetectionLockManager.getInstance() + .releaseIfMatchPage(userPageId, "ASYNC_CONNECT_FAILED_DELAYED"), + RELEASE_DELAY_FOR_TEST_SECONDS, TimeUnit.SECONDS); + log.warn("[TEST-ONLY] 检测锁将在 {}s 后释放(连接失败延迟释放,便于测试 BUSY 弹窗),userPageId={}", + RELEASE_DELAY_FOR_TEST_SECONDS, userPageId); + } } /** @@ -455,6 +487,18 @@ public class NettyClient { // 通过WebSocket通知前端页面 notifyFrontendError(param, handler); + + // 释放检测锁,理由同 onConnectionFailure + // TODO TEST-ONLY: 测试 BUSY 弹窗期间延迟释放,正式部署改回立即释放 + if (param != null && param.getUserPageId() != null) { + final String userPageId = param.getUserPageId(); + DELAYED_RELEASER.schedule( + () -> DetectionLockManager.getInstance() + .releaseIfMatchPage(userPageId, "ASYNC_CONNECT_EXCEPTION_DELAYED"), + RELEASE_DELAY_FOR_TEST_SECONDS, TimeUnit.SECONDS); + log.warn("[TEST-ONLY] 检测锁将在 {}s 后释放(连接异常延迟释放,便于测试 BUSY 弹窗),userPageId={}", + RELEASE_DELAY_FOR_TEST_SECONDS, userPageId); + } } /** diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyContrastClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyContrastClientHandler.java index f7f1dac1..f2ff6ca7 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyContrastClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyContrastClientHandler.java @@ -2,6 +2,7 @@ package com.njcn.gather.detection.util.socket.cilent; import cn.hutool.core.util.ObjectUtil; import com.njcn.gather.detection.handler.SocketContrastResponseService; +import com.njcn.gather.detection.lock.DetectionLockManager; import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum; import com.njcn.gather.detection.pojo.param.PreDetectionParam; @@ -68,6 +69,11 @@ public class NettyContrastClientHandler extends SimpleChannelInboundHandler { ctx.close(); SocketManager.removeUser(param.getUserPageId() + CnSocketUtil.DEV_TAG); CnSocketUtil.quitSendSource(param); + // 设备主动断开 → 本次检测视为结束,释放检测锁 + DetectionLockManager.getInstance() + .releaseIfMatchPage(param.getUserPageId(), "DEV_CHANNEL_INACTIVE"); } /** @@ -132,6 +137,9 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { } catch (Exception e) { log.error("处理服务端消息异常", e); CnSocketUtil.quitSend(param); + // 业务消息处理异常 → 退出并释放检测锁 + DetectionLockManager.getInstance() + .releaseIfMatchPage(param.getUserPageId(), "DEV_READ_EXCEPTION"); } } @@ -187,6 +195,9 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { CnSocketUtil.quitSendSource(param); socketResponseService.backCheckState(param); ctx.close(); + // 通道异常 → 释放检测锁 + DetectionLockManager.getInstance() + .releaseIfMatchPage(param.getUserPageId(), "DEV_EXCEPTION"); } /** @@ -231,6 +242,9 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { CnSocketUtil.quitSend(param); WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.SOCKET_TIMEOUT); socketResponseService.backCheckState(param); + // 常规步骤读超时兜底 → 释放检测锁 + DetectionLockManager.getInstance() + .releaseIfMatchPage(param.getUserPageId(), "DEV_READ_TIMEOUT"); } } @@ -244,6 +258,14 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { if (FormalTestManager.stopTime > STOP_TIMEOUT) { CnSocketUtil.quitSend(param); WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.FORMAL_REAL.getValue(), SourceOperateCodeEnum.STOP_TIMEOUT); + // R4 释放:暂停 10 分钟超时视同放弃本次检测 + DetectionLock cur = DetectionLockManager.getInstance().getCurrent(); + if (cur != null) { + DetectionLockManager.getInstance().releaseIfHeldBy(cur.getUserId(), "PAUSE_TIMEOUT"); + } + // 重置 FormalTestManager 状态,避免下次进入误判 + FormalTestManager.stopTime = 0; + FormalTestManager.hasStopFlag = false; } } @@ -302,6 +324,9 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { CnSocketUtil.quitSend(param); timeoutSend(sourceIssue); socketResponseService.backCheckState(param); + // 单项检测超时本质等于整轮中止(已 quitSend),释放检测锁 + DetectionLockManager.getInstance() + .releaseIfMatchPage(param.getUserPageId(), "DEV_ITEM_TIMEOUT"); } /** diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java index 75c88a0c..66479aea 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java @@ -1,6 +1,7 @@ package com.njcn.gather.detection.util.socket.cilent; import com.njcn.gather.detection.handler.SocketSourceResponseService; +import com.njcn.gather.detection.lock.DetectionLockManager; import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.util.socket.CnSocketUtil; @@ -70,6 +71,9 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler覆盖业务回调里所有走 {@code sendDetectionErrorMessage} 的失败路径, + * 等价于在 detection/handler 全目录的错误终态点显式释放。与各 Netty handler + * 内的显式释放幂等叠加,形成双保险。

+ * + *

注:业务"正常完成"路径不走此方法(数模式 formalDeal 已在 Phase 1 显式释放; + * 比对模式正常完成走 sendMsg 推 ERROR_FLOW_END,依赖 WS 断开后心跳超时兜底)。

+ */ + private static void releaseLockOnTerminal(String userId, SourceOperateCodeEnum errorType) { + if (userId == null || userId.isEmpty()) { + return; + } + DetectionLockManager.getInstance() + .releaseIfMatchPage(userId, "WS_ERROR_PUSH:" + errorType.name()); } } \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java index a7ed7dcb..ff472c39 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java @@ -1,6 +1,7 @@ package com.njcn.gather.detection.util.socket.websocket; import cn.hutool.core.util.ObjectUtil; +import com.njcn.gather.detection.lock.DetectionLockManager; import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.util.socket.CnSocketUtil; @@ -414,6 +415,11 @@ public class WebSocketHandler extends SimpleChannelInboundHandler