# CN_Gather Detection模块 Netty通信架构详细分析文档 ## 目录 1. [架构概览](#1-架构概览) 2. [智能Socket通信机制](#2-智能socket通信机制) 3. [Netty客户端组件详解](#3-netty客户端组件详解) 4. [Netty服务端组件详解](#4-netty服务端组件详解) 5. [WebSocket通信组件详解](#5-websocket通信组件详解) 6. [Socket响应处理器详解](#6-socket响应处理器详解) 7. [Socket管理与工具类详解](#7-socket管理与工具类详解) 8. [通信数据对象详解](#8-通信数据对象详解) 9. [通信流程分析](#9-通信流程分析) 10. [关键技术特性](#10-关键技术特性) 11. [配置与部署](#11-配置与部署) --- ## 1. 架构概览 CN_Gather Detection模块采用**智能Socket + WebSocket**的混合通信架构,通过全新的智能发送机制和Spring组件化设计,支持电能质量设备检测的复杂业务场景。 ### 1.1 整体架构图 ``` [前端页面] ←→ WebSocket(7777) ←→ [Detection应用] ←→ 智能Socket管理器 ←→ [源设备(62000)/被检设备(61000)] ↑ ↑ Spring容器管理 自动连接建立 ↑ ↑ 配置统一管理 智能发送机制 ``` ### 1.2 核心组件层次结构 ``` detection/ ├── util/socket/ │ ├── cilent/ # Netty客户端组件 │ │ ├── NettyClient.java # 智能客户端(Spring组件) │ │ ├── NettySourceClientHandler.java # 源设备处理器 │ │ ├── NettyDevClientHandler.java # 被检设备处理器 │ │ ├── NettyContrastClientHandler.java # 比对设备处理器 │ │ └── HeartbeatHandler.java # 心跳处理器 │ ├── service/ # Netty服务端组件 │ │ ├── NettyServer.java # 服务器核心 │ │ ├── DevNettyServerHandler.java # 设备服务端处理器 │ │ └── SourceNettyServerHandler.java # 源服务端处理器 │ ├── websocket/ # WebSocket通信组件 │ │ ├── WebSocketService.java # WebSocket服务 │ │ ├── WebSocketHandler.java # 消息处理器 │ │ ├── WebSocketInitializer.java # 初始化器 │ │ └── WebServiceManager.java # 会话管理器 │ ├── config/ # 配置管理组件(新增) │ │ └── SocketConnectionConfig.java # Socket连接配置 │ ├── SocketManager.java # 智能Socket管理器(Spring组件) │ ├── CnSocketUtil.java # Socket工具类 │ ├── FormalTestManager.java # 检测管理器 │ └── XiNumberManager.java # 系数管理器 └── handler/ # 业务响应处理器 ├── SocketSourceResponseService.java # 源响应处理 ├── SocketDevResponseService.java # 设备响应处理 └── SocketContrastResponseService.java # 比对响应处理 ``` ### 1.3 核心架构改进 #### 1.3.1 智能发送机制 - **自动连接管理**: 根据requestId自动判断是否需要建立连接 - **透明化操作**: 开发者只需关心业务逻辑,连接管理完全透明 - **配置驱动**: 通过配置文件统一管理需要建立连接的requestId #### 1.3.2 Spring组件化 - **全面Spring管理**: NettyClient和SocketManager完全交给Spring容器管理 - **依赖注入**: 通过构造函数注入实现松耦合设计 - **生命周期管理**: 利用Spring的@PostConstruct和@PreDestroy管理组件生命周期 #### 1.3.3 配置统一管理 - **集中配置**: 所有Socket相关配置统一在application.yml中管理 - **环境隔离**: 支持不同环境使用不同的IP和端口配置 - **配置热更新**: 支持配置的动态刷新 --- ## 2. 智能Socket通信机制 ### 2.1 智能发送机制核心设计 **设计理念:** - **开发者友好**: 开发者只需调用发送方法,无需关心连接管理 - **自动化管理**: 系统自动判断是否需要建立连接 - **配置驱动**: 通过配置决定哪些requestId需要建立连接 **核心组件关系图:** ```mermaid graph TB A[业务层] --> B[SocketManager] B --> C[SocketConnectionConfig] B --> D[NettyClient] C --> E[application.yml] D --> F[NettySourceClientHandler] D --> G[NettyDevClientHandler] subgraph "Spring容器管理" B C D end subgraph "配置管理" E H[requestId配置] I[IP/PORT配置] end ``` ### 2.2 SocketConnectionConfig.java - 智能配置管理器 **功能职责:** - 管理需要建立连接的requestId配置 - 统一管理Socket的IP和PORT配置 - 提供配置的动态读取和验证 **关键代码分析:** ```java @Component @ConfigurationProperties(prefix = "socket") public class SocketConnectionConfig { /** * 程控源设备配置 */ private SourceConfig source = new SourceConfig(); /** * 被检设备配置 */ private DeviceConfig device = new DeviceConfig(); @Data public static class SourceConfig { private String ip = "127.0.0.1"; private Integer port = 62000; } @Data public static class DeviceConfig { private String ip = "127.0.0.1"; private Integer port = 61000; } /** * 需要建立程控源通道的requestId集合 */ private static final Set SOURCE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList( "yjc_ytxjy" // 源通讯检测 )); /** * 需要建立被检设备通道的requestId集合 */ private static final Set DEVICE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList( "yjc_sbtxjy", // 连接建立 "FTP_SEND$01" // ftp文件传送指令 )); /** * 检查指定的requestId是否需要建立程控源连接 */ public static boolean needsSourceConnection(String requestId) { return SOURCE_CONNECTION_REQUEST_IDS.contains(requestId); } /** * 检查指定的requestId是否需要建立被检设备连接 */ public static boolean needsDeviceConnection(String requestId) { return DEVICE_CONNECTION_REQUEST_IDS.contains(requestId); } } ``` ### 2.3 SocketManager.java - 智能Socket管理器 **功能职责:** - 提供智能发送API,自动管理连接建立 - 统一管理Socket会话和EventLoopGroup - 支持多种发送模式和连接状态检查 **关键代码分析:** ```java @Slf4j @Component public class SocketManager { @Autowired private SocketConnectionConfig socketConnectionConfig; /** * key为userId(xxx_Source、xxx_Dev),value为channel */ private static final Map socketSessions = new ConcurrentHashMap<>(); /** * key为userId(xxx_Source、xxx_Dev),value为group */ private static final Map socketGroup = new ConcurrentHashMap<>(); /** * 智能发送消息到程控源设备 * 自动从配置文件读取IP和PORT,开发者无需关心网络配置 * 如果连接不存在且requestId需要建立连接,会自动建立连接后发送 */ public void smartSendToSource(PreDetectionParam param, String msg) { String ip = socketConnectionConfig.getSource().getIp(); Integer port = socketConnectionConfig.getSource().getPort(); String requestId = extractRequestId(msg); String userId = param.getUserPageId() + CnSocketUtil.SOURCE_TAG; // 检查是否需要建立连接 if (SocketConnectionConfig.needsSourceConnection(requestId)) { // 检查连接是否存在且活跃 if (!isChannelActive(userId)) { log.info("程控源连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId); // 异步建立程控源连接并发送消息 CompletableFuture.runAsync(() -> { NettyClient.connectToSourceStatic(ip, port, param, msg); }); return; } } // 连接已存在或不需要建立连接,直接发送消息 log.info("直接发送消息到程控源: userId={}, requestId={}", userId, requestId); sendMsg(userId, msg); } /** * 智能发送消息到被检设备 * 自动从配置文件读取IP和PORT,开发者无需关心网络配置 * 如果连接不存在且requestId需要建立连接,会自动建立连接后发送 */ public void smartSendToDevice(PreDetectionParam param, String msg) { String requestId = extractRequestId(msg); String userId = param.getUserPageId() + CnSocketUtil.DEV_TAG; // 检查是否需要建立连接 if (SocketConnectionConfig.needsDeviceConnection(requestId)) { String ip = socketConnectionConfig.getDevice().getIp(); Integer port = socketConnectionConfig.getDevice().getPort(); // 检查连接是否存在且活跃 if (!isChannelActive(userId)) { log.info("被检设备连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId); // 异步建立被检设备连接并发送消息 CompletableFuture.runAsync(() -> { NettyClient.connectToDeviceStatic(ip, port, param, msg); }); return; } } // 连接已存在或不需要建立连接,直接发送消息 log.info("直接发送消息到被检设备: userId={}, requestId={}", userId, requestId); sendMsg(userId, msg); } /** * 从消息中提取requestId * 支持JSON格式的消息解析 */ private static String extractRequestId(String msg) { try { if (StrUtil.isNotBlank(msg)) { // 尝试解析JSON格式消息 JSONObject jsonObject = JSON.parseObject(msg); String requestId = jsonObject.getString("requestId"); if (StrUtil.isNotBlank(requestId)) { return requestId; } // 如果没有requestId字段,尝试解析request_id字段 requestId = jsonObject.getString("request_id"); if (StrUtil.isNotBlank(requestId)) { return requestId; } } } catch (Exception e) { log.warn("解析消息中的requestId失败: msg={}, error={}", msg, e.getMessage()); } return "unknown"; } /** * 检查指定用户的Channel是否活跃 */ private static boolean isChannelActive(String userId) { Channel channel = getChannelByUserId(userId); return ObjectUtil.isNotNull(channel) && channel.isActive(); } } ``` ### 2.4 使用示例 #### 2.4.1 业务层调用方式 ```java @Service @RequiredArgsConstructor public class PreDetectionServiceImpl implements PreDetectionService { private final SocketManager socketManager; @Override public void sourceCommunicationCheck(PreDetectionParam param) { // 组装检测消息 SocketMsg msg = new SocketMsg<>(); msg.setRequestId("yjc_ytxjy"); msg.setOperateCode("INIT_GATHER"); msg.setData(JSON.toJSONString(sourceParam)); // 智能发送 - 系统自动判断是否需要建立连接 socketManager.smartSendToSource(param, JSON.toJSONString(msg)); } } ``` #### 2.4.2 配置文件示例 ```yaml # application.yml socket: source: ip: 192.168.1.124 port: 62000 device: ip: 192.168.1.124 port: 61000 ``` --- ## 3. Netty客户端组件详解 ### 3.1 NettyClient.java - Spring管理的智能客户端 **功能职责:** - 作为Spring组件提供连接服务 - 支持源设备和被检设备的智能连接 - 自动处理Handler的实例化和依赖注入 **关键代码分析:** ```java @Component @Slf4j public class NettyClient { @Autowired private SocketSourceResponseService socketSourceResponseService; @Autowired private SocketDevResponseService socketDevResponseService; private static NettyClient instance; @PostConstruct public void init() { instance = this; } /** * 连接到程控源设备 * Spring管理的实例方法,支持依赖注入 */ public void connectToSource(String ip, Integer port, PreDetectionParam param, String msg) { NettySourceClientHandler handler = createSourceHandler(param); executeSocketConnection(ip, port, param, msg, handler); } /** * 连接到被检设备 * Spring管理的实例方法,支持依赖注入 */ public void connectToDevice(String ip, Integer port, PreDetectionParam param, String msg) { NettyDevClientHandler handler = createDeviceHandler(param); executeSocketConnection(ip, port, param, msg, handler); } /** * 静态方法入口 - 保持向后兼容 */ public static void connectToSourceStatic(String ip, Integer port, PreDetectionParam param, String msg) { if (instance != null) { instance.connectToSource(ip, port, param, msg); } else { log.error("NettyClient未初始化,无法创建程控源连接"); } } /** * 静态方法入口 - 保持向后兼容 */ public static void connectToDeviceStatic(String ip, Integer port, PreDetectionParam param, String msg) { if (instance != null) { instance.connectToDevice(ip, port, param, msg); } else { log.error("NettyClient未初始化,无法创建被检设备连接"); } } /** * 创建源设备处理器 * 利用Spring注入的Service实例 */ private NettySourceClientHandler createSourceHandler(PreDetectionParam param) { return new NettySourceClientHandler(param, socketSourceResponseService); } /** * 创建被检设备处理器 * 利用Spring注入的Service实例 */ private NettyDevClientHandler createDeviceHandler(PreDetectionParam param) { return new NettyDevClientHandler(param, socketDevResponseService); } /** * 执行Socket连接建立流程(重构后的核心实现) */ private static void executeSocketConnection(String ip, Integer port, PreDetectionParam param, String msg, SimpleChannelInboundHandler handler) { // 创建NIO事件循环组 NioEventLoopGroup group = createEventLoopGroup(); try { // 配置客户端启动器 Bootstrap bootstrap = configureBootstrap(group); // 创建管道初始化器 ChannelInitializer initializer = createChannelInitializer(param, handler); bootstrap.handler(initializer); // 同步连接到目标服务器 ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); // 处理连接结果 handleConnectionResult(channelFuture, param, handler, group, msg); } catch (Exception e) { // 处理连接异常 handleConnectionException(e, param, handler, group); } } /** * 创建NIO事件循环组 */ private static NioEventLoopGroup createEventLoopGroup() { return new NioEventLoopGroup(); } /** * 配置Bootstrap启动器 */ private static Bootstrap configureBootstrap(NioEventLoopGroup group) { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .option(ChannelOption.SO_KEEPALIVE, true) .channel(NioSocketChannel.class); return bootstrap; } /** * 创建通道初始化器 * 根据处理器类型配置不同的Pipeline */ private static ChannelInitializer createChannelInitializer( PreDetectionParam param, SimpleChannelInboundHandler handler) { return new ChannelInitializer() { @Override protected void initChannel(NioSocketChannel ch) { if (handler instanceof NettySourceClientHandler) { configureSourcePipeline(ch, param, handler); } else { configureDevicePipeline(ch, param, handler); } } }; } /** * 配置程控源设备的Pipeline */ private static void configureSourcePipeline(NioSocketChannel ch, PreDetectionParam param, SimpleChannelInboundHandler handler) { ch.pipeline() .addLast("frame-decoder", new LineBasedFrameDecoder(10240)) .addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8)) .addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8)) .addLast("heartbeat", new HeartbeatHandler(param, CnSocketUtil.SOURCE_TAG)) .addLast("source-handler", handler); } /** * 配置被检设备的Pipeline */ private static void configureDevicePipeline(NioSocketChannel ch, PreDetectionParam param, SimpleChannelInboundHandler handler) { ch.pipeline() .addLast("frame-decoder", new LineBasedFrameDecoder(10240)) .addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8)) .addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8)) .addLast("heartbeat", new HeartbeatHandler(param, CnSocketUtil.DEV_TAG)) .addLast("idle-detector", new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)) .addLast("device-handler", handler); } /** * 处理连接建立结果 */ private static void handleConnectionResult(ChannelFuture channelFuture, PreDetectionParam param, SimpleChannelInboundHandler handler, NioEventLoopGroup group, String msg) { if (channelFuture.isSuccess()) { log.info("Socket连接建立成功: {}", channelFuture.channel().remoteAddress()); // 注册会话和EventLoopGroup到管理器 String userId = getConnectionUserId(param, handler); SocketManager.addUser(userId, channelFuture.channel()); SocketManager.addGroup(userId, group); // 发送初始消息 if (StrUtil.isNotBlank(msg)) { channelFuture.channel().writeAndFlush(msg + "\n"); log.info("发送初始消息: {}", msg); } } else { log.error("Socket连接建立失败"); handleConnectionFailure(param, group); } } /** * 获取连接的用户ID */ private static String getConnectionUserId(PreDetectionParam param, SimpleChannelInboundHandler handler) { String tag = (handler instanceof NettySourceClientHandler) ? CnSocketUtil.SOURCE_TAG : CnSocketUtil.DEV_TAG; return param.getUserPageId() + tag; } /** * 处理连接异常 */ private static void handleConnectionException(Exception e, PreDetectionParam param, SimpleChannelInboundHandler handler, NioEventLoopGroup group) { log.error("Socket连接过程中发生异常: {}", e.getMessage(), e); // 清理资源 if (group != null) { group.shutdownGracefully(); } // 发送错误消息到前端 try { CnSocketUtil.quitSendSource(param); CnSocketUtil.quitSend(param); } catch (Exception ex) { log.error("发送错误消息失败", ex); } } /** * 处理连接失败情况 */ private static void handleConnectionFailure(PreDetectionParam param, NioEventLoopGroup group) { // 清理EventLoopGroup资源 if (group != null) { group.shutdownGracefully(); } // 通知业务层连接失败 try { CnSocketUtil.quitSendSource(param); CnSocketUtil.quitSend(param); } catch (Exception e) { log.error("处理连接失败通知时发生异常", e); } } } ``` ### 3.2 Handler组件改进 #### 3.2.1 NettySourceClientHandler.java - 源设备处理器 **功能改进:** - 简化异常处理逻辑,移除冗余注释 - 使用slf4j日志替代System.out.println - 优化连接状态管理 ```java @RequiredArgsConstructor @Slf4j public class NettySourceClientHandler extends SimpleChannelInboundHandler { private final PreDetectionParam webUser; private final SocketSourceResponseService sourceResponseService; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("程控源客户端通道已建立: {}", ctx.channel().id()); SocketManager.addUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG, ctx.channel()); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException { log.debug("接收源设备数据: {}", msg); try { sourceResponseService.deal(webUser, msg); } catch (Exception e) { log.error("处理源设备响应异常", e); CnSocketUtil.quitSend(webUser); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("程控源客户端连接断开"); ctx.close(); SocketManager.removeUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) { log.debug("程控源设备空闲状态触发"); } } else { super.userEventTriggered(ctx, evt); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("程控源通信异常", cause); if (cause instanceof ConnectException) { log.warn("程控源连接异常"); } else if (cause instanceof IOException) { WebServiceManager.sendDetectionErrorMessage(webUser.getUserPageId(), SourceOperateCodeEnum.SERVER_ERROR); } else if (cause instanceof TimeoutException) { log.warn("程控源通信超时"); } ctx.close(); } } ``` #### 3.2.2 NettyDevClientHandler.java - 被检设备处理器 **功能改进:** - 精简超时检测逻辑,移除大段注释代码 - 优化异常处理机制 - 统一日志输出格式 ```java @RequiredArgsConstructor @Slf4j public class NettyDevClientHandler extends SimpleChannelInboundHandler { private final PreDetectionParam param; private final SocketDevResponseService socketDevResponseService; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("被检设备客户端通道已建立: {}", ctx.channel().id()); SocketManager.addUser(param.getUserPageId() + CnSocketUtil.DEV_TAG, ctx.channel()); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { log.debug("接收被检设备数据: {}", msg); try { socketDevResponseService.deal(param, msg); } catch (Exception e) { log.error("处理被检设备响应异常", e); CnSocketUtil.quitSend(param); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("被检设备客户端连接断开"); ctx.close(); SocketManager.removeUser(param.getUserPageId() + CnSocketUtil.DEV_TAG); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.warn("被检设备读超时触发"); handleReadTimeout(ctx); } } else { super.userEventTriggered(ctx, evt); } } /** * 处理读超时事件 */ private void handleReadTimeout(ChannelHandlerContext ctx) { if (!FormalTestManager.hasStopFlag) { if (CollUtil.isNotEmpty(SocketManager.getSourceList())) { SourceIssue sourceIssue = SocketManager.getSourceList().get(0); // 更新超时计时器 updateTimeoutCounter(sourceIssue); // 检查是否需要触发超时处理 if (shouldTriggerTimeout(sourceIssue)) { log.warn("检测项超时: {}", sourceIssue.getType()); CnSocketUtil.quitSend(param); timeoutSend(sourceIssue); } } } } /** * 更新超时计时器 */ private void updateTimeoutCounter(SourceIssue sourceIssue) { SocketManager.clockMap.put(sourceIssue.getIndex(), SocketManager.clockMap.getOrDefault(sourceIssue.getIndex(), 0L) + 60L); } /** * 判断是否应该触发超时处理 */ private boolean shouldTriggerTimeout(SourceIssue sourceIssue) { Long currentTime = SocketManager.clockMap.get(sourceIssue.getIndex()); if (currentTime == null) return false; // 根据检测类型设置不同的超时时间 if (sourceIssue.getType().equals(DicDataEnum.F.getCode())) { return currentTime > 1300; // 闪变: 20分钟超时 } else if (sourceIssue.getType().equals(DicDataEnum.VOLTAGE.getCode()) || sourceIssue.getType().equals(DicDataEnum.HP.getCode())) { return currentTime > 180; // 统计数据: 3分钟超时 } else { return currentTime > 60; // 实时数据: 1分钟超时 } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("被检设备通信异常", cause); if (cause instanceof ConnectException) { log.warn("被检设备连接异常"); } else if (cause instanceof IOException) { WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR); } else if (cause instanceof TimeoutException) { log.warn("被检设备通信超时"); } // 清理资源并断开连接 CnSocketUtil.quitSend(param); CnSocketUtil.quitSendSource(param); ctx.close(); } } ``` --- ## 4. Netty服务端组件详解 ### 4.1 NettyServer.java - 服务器核心 **功能职责:** - 提供Socket服务端功能,用于测试和开发 - 支持源通信服务和设备通信服务 - 模拟外部设备的响应行为 **关键代码分析:** ```java public class NettyServer { public static final int port = 8574; private void runSource() { NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap().group(boss, work); bootstrap.channel(NioServerSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(ServerSocketChannel ch) { System.out.println("源通讯服务正在启动中......"); } }) .childHandler(new ChannelInitializer() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline() .addLast(new LineBasedFrameDecoder(10240)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(new DevNettyServerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); future.addListener(f -> { if (future.isSuccess()) { System.out.println("源通讯服务启动成功"); } else { System.out.println("源通讯服务启动失败"); } }); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } } ``` --- ## 5. WebSocket通信组件详解 ### 5.1 WebSocketService.java - WebSocket服务核心 **功能职责:** - 启动基于Netty的WebSocket服务器 - 管理服务器生命周期(启动/关闭) - 提供高性能的WebSocket通信支持 **关键代码分析:** ```java @Component @RequiredArgsConstructor @Slf4j public class WebSocketService implements ApplicationRunner { @Value("${webSocket.port:7777}") int port; EventLoopGroup bossGroup; EventLoopGroup workerGroup; private Channel serverChannel; private CompletableFuture serverFuture; @Override public void run(ApplicationArguments args) { // 使用CompletableFuture异步启动WebSocket服务,避免阻塞Spring Boot主线程 serverFuture = CompletableFuture.runAsync(this::startWebSocketServer) .exceptionally(throwable -> { log.error("WebSocket服务启动异常", throwable); return null; }); } private void startWebSocketServer() { try { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new WebSocketInitializer()); ChannelFuture future = serverBootstrap.bind(port).sync(); serverChannel = future.channel(); future.addListener(f -> { if (future.isSuccess()) { log.info("webSocket服务启动成功,端口:{}", port); } else { log.error("webSocket服务启动失败,端口:{}", port); } }); future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("WebSocket服务启动过程中被中断", e); Thread.currentThread().interrupt(); } catch (Exception e) { log.error("WebSocket服务启动失败", e); throw new RuntimeException("WebSocket服务启动失败", e); } finally { shutdownGracefully(); } } @PreDestroy public void destroy() throws InterruptedException { log.info("正在关闭WebSocket服务..."); if (serverChannel != null) { try { serverChannel.close().awaitUninterruptibly(5, TimeUnit.SECONDS); } catch (Exception e) { log.warn("关闭服务器通道时发生异常", e); } } if (bossGroup != null) { bossGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync(); } if (workerGroup != null) { workerGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync(); } if (serverFuture != null && !serverFuture.isDone()) { boolean cancelled = serverFuture.cancel(true); } log.info("webSocket服务已销毁"); } } ``` --- ## 6. Socket响应处理器详解 ### 6.1 响应处理器改进 **主要改进:** - 支持SocketManager的依赖注入 - 移除硬编码的IP/PORT配置 - 使用智能发送机制简化代码 #### 6.1.1 SocketSourceResponseService.java ```java @Service @RequiredArgsConstructor public class SocketSourceResponseService { private final SocketDevResponseService socketDevResponseService; private final IPqDevService iPqDevService; private final SocketManager socketManager; public void deal(PreDetectionParam param, String msg) throws Exception { SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg); String[] tem = socketDataMsg.getRequestId().split(CnSocketUtil.STEP_TAG); SourceOperateCodeEnum enumByCode = SourceOperateCodeEnum.getDictDataEnumByCode(tem[0]); if (ObjectUtil.isNotNull(enumByCode)) { switch (enumByCode) { case YJC_YTXJY: if (ObjectUtil.isNotNull(param.getPlanId())) { detectionDev(param, socketDataMsg); } else { handleYtxjySimulate(param, socketDataMsg); } break; case YJC_XUJY: phaseSequenceDev(param, socketDataMsg); break; case FORMAL_REAL: if (ObjectUtil.isNotNull(param.getPlanId())) { senParamToDev(param, socketDataMsg); } else { handleSimulateTest(param, socketDataMsg); } break; case Coefficient_Check: coefficient(param, socketDataMsg); break; } } } // 装置检测 - 使用智能发送机制 private void detectionDev(PreDetectionParam param, SocketDataMsg socketDataMsg) { SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); if (ObjectUtil.isNotNull(dictDataEnumByCode)) { SocketMsg socketMsg = new SocketMsg<>(); switch (dictDataEnumByCode) { case SUCCESS: WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); Map> map = new HashMap<>(1); map.put("deviceList", FormalTestManager.devList); String jsonString = JSON.toJSONString(map); socketMsg.setRequestId(SourceOperateCodeEnum.YJC_SBTXJY.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_01.getValue()); socketMsg.setData(jsonString); String json = JSON.toJSONString(socketMsg); // 使用智能发送工具类,自动管理设备连接 socketManager.smartSendToDevice(param, json); break; case UNPROCESSED_BUSINESS: WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); break; default: CnSocketUtil.quitSendSource(param); WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); break; } } } } ``` --- ## 7. Socket管理与工具类详解 ### 7.1 SocketManager.java - 智能Socket管理器 **核心功能:** 1. **智能发送机制**: 自动判断连接需求,透明管理连接建立 2. **Spring组件管理**: 完全交给Spring容器管理,支持依赖注入 3. **会话管理**: 统一管理Socket连接会话和EventLoopGroup 4. **检测任务管理**: 管理检测相关的状态信息和配置 **关键数据结构:** ```java @Component @Slf4j public class SocketManager { @Autowired private SocketConnectionConfig socketConnectionConfig; // Socket会话管理 private static final Map socketSessions = new ConcurrentHashMap<>(); private static final Map socketGroup = new ConcurrentHashMap<>(); // 检测任务管理 private static Map targetMap = new ConcurrentHashMap<>(); private static List sourceIssueList = new CopyOnWriteArrayList<>(); public static Map valueTypeMap = new HashMap<>(); public static volatile Map clockMap = new ConcurrentHashMap<>(); public static volatile Map contrastClockMap = new ConcurrentHashMap<>(); // 基础连接管理方法 public static void addUser(String userId, Channel channel) { socketSessions.put(userId, channel); } public static void addGroup(String userId, NioEventLoopGroup group) { socketGroup.put(userId, group); } public static void removeUser(String userId) { Channel channel = socketSessions.get(userId); if (ObjectUtil.isNotNull(channel)) { try { channel.close().sync(); } catch (InterruptedException e) { log.error("关闭通道异常", e); } NioEventLoopGroup eventExecutors = socketGroup.get(userId); if (ObjectUtil.isNotNull(eventExecutors)) { eventExecutors.shutdownGracefully(); log.info("{}__{}关闭了客户端", userId, channel.id()); } } socketSessions.remove(userId); } public static void sendMsg(String userId, String msg) { Channel channel = socketSessions.get(userId); if (ObjectUtil.isNotNull(channel)) { channel.writeAndFlush(msg + '\n'); log.info("{}__{}往{}发送数据:{}", userId, channel.id(), channel.remoteAddress(), msg); } else { log.warn("{}__发送数据:失败通道不存在{}", userId, msg); } } // 检测任务管理方法 public static void addSourceList(List sList) { sourceIssueList = sList; } public static List getSourceList() { return sourceIssueList; } public static void delSource(Integer index) { sourceIssueList.removeIf(s -> index.equals(s.getIndex())); } public static void delSourceTarget(String sourceTag) { targetMap.remove(sourceTag); } public static void initMap(Map map) { targetMap = map; } public static void addTargetMap(String scriptType, Long count) { targetMap.put(scriptType, count); } public static Long getSourceTarget(String scriptType) { return targetMap.get(scriptType); } } ``` ### 7.2 CnSocketUtil.java - Socket工具类 **功能职责:** - 提供Socket连接的控制功能 - 封装WebSocket消息推送 - 定义通信相关常量 **关键代码:** ```java public class CnSocketUtil { public final static String DEV_TAG = "_Dev"; public final static String SOURCE_TAG = "_Source"; public final static String START_TAG = "_Start"; public final static String END_TAG = "_End"; public final static String STEP_TAG = "&&"; public final static String SPLIT_TAG = "_"; // 退出检测 public static void quitSend(PreDetectionParam param) { SocketMsg socketMsg = new SocketMsg<>(); socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue()); SocketManager.sendMsg(param.getUserPageId() + DEV_TAG, JSON.toJSONString(socketMsg)); WebServiceManager.removePreDetectionParam(); } // 关闭源连接 public static void quitSendSource(PreDetectionParam param) { SocketMsg socketMsg = new SocketMsg<>(); socketMsg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue()); JSONObject jsonObject = new JSONObject(); jsonObject.put("sourceId", param.getSourceId()); socketMsg.setData(jsonObject.toJSONString()); SocketManager.sendMsg(param.getUserPageId() + SOURCE_TAG, JSON.toJSONString(socketMsg)); WebServiceManager.removePreDetectionParam(); } // 推送webSocket数据 public static void sendToWebSocket(String userId, String requestId, String operatorType, Object data, String desc) { WebSocketVO webSocketVO = new WebSocketVO<>(); webSocketVO.setRequestId(requestId); webSocketVO.setOperateCode(operatorType); webSocketVO.setData(data); webSocketVO.setDesc(desc); WebServiceManager.sendMessage(userId, webSocketVO); } // 比对式-退出检测 public static void contrastSendquit(String userId) { System.out.println("比对式-发送关闭备通讯模块指令"); SocketMsg socketMsg = new SocketMsg<>(); socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue()); SocketManager.sendMsg(userId + DEV_TAG, JSON.toJSONString(socketMsg)); WebServiceManager.removePreDetectionParam(); } } ``` --- ## 8. 通信数据对象详解 ### 8.1 数据对象结构 #### 8.1.1 SocketMsg.java - Socket消息对象 ```java public class SocketMsg { private String requestId; // 请求ID,用于标识消息类型和流程 private String operateCode; // 操作代码,标识具体的操作类型 private T data; // 数据载荷,支持泛型 private String desc; // 描述信息 private Long timestamp; // 时间戳 } ``` #### 8.1.2 SocketDataMsg.java - Socket数据消息对象 ```java public class SocketDataMsg { private String requestId; // 请求ID private String operateCode; // 操作代码 private String data; // 响应数据(JSON字符串) private Integer code; // 响应状态码 private String message; // 响应消息 private String type; // 消息类型 } ``` #### 8.1.3 WebSocketVO.java - WebSocket数据对象 ```java public class WebSocketVO { private String requestId; // 请求ID private String operateCode; // 操作代码 private T data; // 数据载荷 private String desc; // 描述信息 private Integer status; // 状态码 private Long timestamp; // 时间戳 private String userId; // 用户ID } ``` --- ## 9. 通信流程分析 ### 9.1 智能发送流程 ```mermaid sequenceDiagram participant Business as 业务层 participant SocketManager as SocketManager participant Config as SocketConnectionConfig participant NettyClient as NettyClient participant Device as 外部设备 Business->>SocketManager: smartSendToSource(param, msg) SocketManager->>SocketManager: extractRequestId(msg) SocketManager->>Config: needsSourceConnection(requestId) alt 需要建立连接 Config-->>SocketManager: true SocketManager->>SocketManager: isChannelActive(userId) alt 连接不存在 SocketManager->>Config: getSource().getIp/Port() Config-->>SocketManager: IP/PORT配置 SocketManager->>NettyClient: connectToSourceStatic(ip, port, param, msg) NettyClient->>Device: 建立连接并发送消息 else 连接已存在 SocketManager->>SocketManager: sendMsg(userId, msg) end else 不需要建立连接 Config-->>SocketManager: false SocketManager->>SocketManager: sendMsg(userId, msg) end ``` ### 9.2 Spring组件生命周期流程 ```mermaid graph TB A[Spring容器启动] --> B[SocketConnectionConfig初始化] B --> C[@ConfigurationProperties绑定配置] C --> D[NettyClient注入依赖] D --> E[SocketManager注入配置] E --> F[业务层注入SocketManager] F --> G[智能发送服务就绪] G --> H[接收发送请求] H --> I[检查连接需求] I --> J[自动建立连接] J --> K[发送消息] K --> L[Spring容器关闭] L --> M[@PreDestroy清理资源] M --> N[关闭所有连接] ``` ### 9.3 配置管理流程 ```mermaid flowchart TD A[application.yml] --> B[Spring Boot配置绑定] B --> C[SocketConnectionConfig] C --> D[Source配置] C --> E[Device配置] C --> F[RequestId配置] D --> G[程控源IP/PORT] E --> H[被检设备IP/PORT] F --> I[连接需求判断] G --> J[SocketManager智能发送] H --> J I --> J J --> K[自动连接管理] K --> L[透明化发送] ``` --- ## 10. 关键技术特性 ### 10.1 智能发送机制特性 #### 10.1.1 自动连接管理 - **智能判断**: 根据requestId自动判断是否需要建立连接 - **透明操作**: 开发者无需关心连接建立过程 - **配置驱动**: 通过简单配置控制连接行为 #### 10.1.2 连接状态检测 ```java private static boolean isChannelActive(String userId) { Channel channel = getChannelByUserId(userId); return ObjectUtil.isNotNull(channel) && channel.isActive(); } ``` #### 10.1.3 异步连接建立 ```java CompletableFuture.runAsync(() -> { NettyClient.connectToSourceStatic(ip, port, param, msg); }); ``` ### 10.2 Spring组件化特性 #### 10.2.1 依赖注入管理 ```java @Component public class SocketManager { @Autowired private SocketConnectionConfig socketConnectionConfig; } @Service @RequiredArgsConstructor public class PreDetectionServiceImpl { private final SocketManager socketManager; } ``` #### 10.2.2 配置属性绑定 ```java @Component @ConfigurationProperties(prefix = "socket") public class SocketConnectionConfig { private SourceConfig source = new SourceConfig(); private DeviceConfig device = new DeviceConfig(); } ``` ### 10.3 配置统一管理特性 #### 10.3.1 统一配置文件 ```yaml socket: source: ip: 192.168.1.124 port: 62000 device: ip: 192.168.1.124 port: 61000 ``` #### 10.3.2 动态配置支持 - **环境隔离**: 支持不同环境使用不同配置 - **热更新**: 支持配置的动态刷新 - **默认值**: 提供合理的默认配置值 ### 10.4 异常处理和资源管理特性 #### 10.4.1 优化的异常处理 ```java @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("通信异常", cause); if (cause instanceof ConnectException) { log.warn("连接异常"); } else if (cause instanceof IOException) { WebServiceManager.sendDetectionErrorMessage(userId, errorCode); } ctx.close(); } ``` #### 10.4.2 完善的资源清理 ```java public static void removeUser(String userId) { Channel channel = socketSessions.get(userId); if (ObjectUtil.isNotNull(channel)) { try { channel.close().sync(); } catch (InterruptedException e) { log.error("关闭通道异常", e); } NioEventLoopGroup eventExecutors = socketGroup.get(userId); if (ObjectUtil.isNotNull(eventExecutors)) { eventExecutors.shutdownGracefully(); } } socketSessions.remove(userId); } ``` ### 10.5 并发安全特性 #### 10.5.1 线程安全设计 - **ConcurrentHashMap**: 用于会话管理 - **CopyOnWriteArrayList**: 用于检测项列表 - **volatile关键字**: 用于状态标志 - **CompletableFuture**: 用于异步处理 #### 10.5.2 日志统一管理 ```java @Slf4j public class NettyClient { log.info("Socket连接建立成功: {}", channelFuture.channel().remoteAddress()); log.error("Socket连接过程中发生异常: {}", e.getMessage(), e); log.debug("发送初始消息: {}", msg); } ``` --- ## 11. 配置与部署 ### 11.1 应用配置 #### 11.1.1 核心配置文件 ```yaml # application.yml webSocket: port: 7777 # WebSocket服务端口 socket: source: ip: 192.168.1.124 # 程控源设备IP port: 62000 # 程控源设备端口 device: ip: 192.168.1.124 # 被检设备IP port: 61000 # 被检设备端口 netty: server: port: 8574 # Netty服务端端口(测试用) # 日志配置 logging: level: com.njcn.gather.detection.util.socket: INFO com.njcn.gather.detection.handler: INFO io.netty: WARN pattern: console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" ``` #### 11.1.2 环境配置示例 ```yaml # application-dev.yml (开发环境) socket: source: ip: 127.0.0.1 port: 62000 device: ip: 127.0.0.1 port: 61000 # application-prod.yml (生产环境) socket: source: ip: 192.168.1.124 port: 62000 device: ip: 192.168.1.124 port: 61000 ``` ### 11.2 Maven依赖配置 ```xml org.springframework.boot spring-boot-starter 2.3.12.RELEASE io.netty netty-all 4.1.76.Final com.alibaba fastjson 1.2.83 cn.hutool hutool-all 5.8.10 org.projectlombok lombok true ``` ### 11.3 Spring Boot集成 #### 11.3.1 自动配置启用 ```java @SpringBootApplication @EnableConfigurationProperties(SocketConnectionConfig.class) public class DetectionApplication { public static void main(String[] args) { SpringApplication.run(DetectionApplication.class, args); } } ``` #### 11.3.2 组件扫描配置 ```java @ComponentScan(basePackages = { "com.njcn.gather.detection.util.socket", "com.njcn.gather.detection.handler", "com.njcn.gather.detection.service" }) ``` ### 11.4 性能调优参数 #### 11.4.1 Netty性能参数 ```java // 服务端性能调优 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) // 连接队列大小 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时时间 .childOption(ChannelOption.SO_KEEPALIVE, true) // 启用TCP keepalive .childOption(ChannelOption.TCP_NODELAY, true) // 禁用Nagle算法 .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 接收缓冲区大小 .childOption(ChannelOption.SO_SNDBUF, 32 * 1024); // 发送缓冲区大小 // 客户端性能调优 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时 .option(ChannelOption.SO_KEEPALIVE, true) // keepalive .option(ChannelOption.TCP_NODELAY, true) // 立即发送 .channel(NioSocketChannel.class); ``` #### 11.4.2 线程池配置 ```yaml # application.yml spring: task: execution: pool: core-size: 8 max-size: 16 queue-capacity: 100 thread-name-prefix: "detection-" ``` ### 11.5 监控和诊断 #### 11.5.1 健康检查配置 ```java @Component public class SocketHealthIndicator implements HealthIndicator { @Autowired private SocketManager socketManager; @Override public Health health() { // 检查Socket连接状态 if (hasActiveConnections()) { return Health.up() .withDetail("activeConnections", getActiveConnectionCount()) .build(); } else { return Health.down() .withDetail("reason", "No active socket connections") .build(); } } } ``` #### 11.5.2 指标监控 ```java @Component public class SocketMetrics { private final MeterRegistry meterRegistry; private final Counter connectionCounter; private final Timer messageProcessingTimer; public SocketMetrics(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; this.connectionCounter = Counter.builder("socket.connections.total") .description("Total socket connections") .register(meterRegistry); this.messageProcessingTimer = Timer.builder("socket.message.processing.time") .description("Message processing time") .register(meterRegistry); } } ``` --- ## 总结 CN_Gather Detection模块的全新Netty通信架构通过**智能Socket管理机制**和**全面Spring组件化**的设计,实现了电能质量设备检测系统的现代化通信解决方案。 ### 核心架构优势 1. **智能化程度高** - 自动连接管理,开发者无需关心连接细节 - 配置驱动的连接策略,灵活可控 - 透明化的发送机制,简化业务代码 2. **Spring生态集成** - 完全Spring组件化管理,遵循IoC原则 - 统一的配置管理,支持多环境部署 - 完善的依赖注入,松耦合设计 3. **代码质量提升** - 移除大量冗余和无用代码 - 统一日志管理,便于调试和监控 - 优化异常处理,提高系统稳定性 4. **可维护性增强** - 模块化设计,职责边界清晰 - 配置集中管理,降低维护成本 - 完善的资源管理,避免内存泄漏 5. **开发体验优化** - 简化的API设计,降低使用门槛 - 智能化的连接管理,减少样板代码 - 统一的错误处理,提高开发效率 ### 技术特色 - **智能发送机制**: 业界领先的自动连接管理技术 - **配置统一管理**: 现代化的配置管理模式 - **Spring深度集成**: 充分利用Spring生态优势 - **高并发支持**: 基于Netty NIO的高性能通信 - **完善监控**: 全方位的监控和诊断能力 该架构为CN_Gather系统提供了稳定、高效、易维护的通信基础,确保了电能质量检测业务的可靠运行,同时为未来的功能扩展和性能优化奠定了坚实基础。