Files
CN_Gather/CN_Gather_Detection_Netty架构详细分析文档.md
hongawen 8caaf95427 ADD: 添加项目配置文档和开发指南
- 新增 CLAUDE.md 项目架构和开发指导文档
- 添加 Gitea本地协作开发服务器配置指南
- 完善检测模块架构分析文档
- 增加报告生成和Word文档处理工具指南
- 添加动态表格和结果服务测试用例
- 更新应用配置和VS Code开发环境设置

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-24 16:49:40 +08:00

55 KiB
Raw Blame History

CN_Gather Detection模块 Netty通信架构详细分析文档

目录

  1. 架构概览
  2. 智能Socket通信机制
  3. Netty客户端组件详解
  4. Netty服务端组件详解
  5. WebSocket通信组件详解
  6. Socket响应处理器详解
  7. Socket管理与工具类详解
  8. 通信数据对象详解
  9. 通信流程分析
  10. 关键技术特性
  11. 配置与部署

1. 架构概览

CN_Gather Detection模块采用智能Socket + WebSocket的混合通信架构通过全新的智能发送机制和Spring组件化设计支持电能质量设备检测的复杂业务场景。

1.1 整体架构图

[前端页面] ←→ WebSocket(7777) ←→ [Detection应用] ←→ 智能Socket管理器 ←→ [源设备(62000)/被检设备(61000)]
                                        ↑                    ↑
                                   Spring容器管理      自动连接建立
                                        ↑                    ↑
                                   配置统一管理         智能发送机制

1.2 核心组件层次结构

detection/
├── util/socket/
│   ├── cilent/                           # Netty客户端组件
│   │   ├── NettyClient.java             # 智能客户端(Spring组件)
│   │   ├── NettySourceClientHandler.java    # 源设备处理器
│   │   ├── NettyDevClientHandler.java       # 被检设备处理器
│   │   ├── NettyContrastClientHandler.java  # 比对设备处理器
│   │   └── HeartbeatHandler.java        # 心跳处理器
│   ├── service/                          # Netty服务端组件
│   │   ├── NettyServer.java             # 服务器核心
│   │   ├── DevNettyServerHandler.java   # 设备服务端处理器
│   │   └── SourceNettyServerHandler.java # 源服务端处理器
│   ├── websocket/                        # WebSocket通信组件
│   │   ├── WebSocketService.java        # WebSocket服务
│   │   ├── WebSocketHandler.java        # 消息处理器
│   │   ├── WebSocketInitializer.java    # 初始化器
│   │   └── WebServiceManager.java       # 会话管理器
│   ├── config/                           # 配置管理组件(新增)
│   │   └── SocketConnectionConfig.java  # Socket连接配置
│   ├── SocketManager.java               # 智能Socket管理器(Spring组件)
│   ├── CnSocketUtil.java                # Socket工具类
│   ├── FormalTestManager.java           # 检测管理器
│   └── XiNumberManager.java             # 系数管理器
└── handler/                              # 业务响应处理器
    ├── SocketSourceResponseService.java # 源响应处理
    ├── SocketDevResponseService.java    # 设备响应处理
    └── SocketContrastResponseService.java # 比对响应处理

1.3 核心架构改进

1.3.1 智能发送机制

  • 自动连接管理: 根据requestId自动判断是否需要建立连接
  • 透明化操作: 开发者只需关心业务逻辑,连接管理完全透明
  • 配置驱动: 通过配置文件统一管理需要建立连接的requestId

1.3.2 Spring组件化

  • 全面Spring管理: NettyClient和SocketManager完全交给Spring容器管理
  • 依赖注入: 通过构造函数注入实现松耦合设计
  • 生命周期管理: 利用Spring的@PostConstruct和@PreDestroy管理组件生命周期

1.3.3 配置统一管理

  • 集中配置: 所有Socket相关配置统一在application.yml中管理
  • 环境隔离: 支持不同环境使用不同的IP和端口配置
  • 配置热更新: 支持配置的动态刷新

2. 智能Socket通信机制

2.1 智能发送机制核心设计

设计理念:

  • 开发者友好: 开发者只需调用发送方法,无需关心连接管理
  • 自动化管理: 系统自动判断是否需要建立连接
  • 配置驱动: 通过配置决定哪些requestId需要建立连接

核心组件关系图:

graph TB
    A[业务层] --> B[SocketManager]
    B --> C[SocketConnectionConfig]
    B --> D[NettyClient]
    C --> E[application.yml]
    D --> F[NettySourceClientHandler]
    D --> G[NettyDevClientHandler]
    
    subgraph "Spring容器管理"
        B
        C
        D
    end
    
    subgraph "配置管理"
        E
        H[requestId配置]
        I[IP/PORT配置]
    end

2.2 SocketConnectionConfig.java - 智能配置管理器

功能职责:

  • 管理需要建立连接的requestId配置
  • 统一管理Socket的IP和PORT配置
  • 提供配置的动态读取和验证

关键代码分析:

@Component
@ConfigurationProperties(prefix = "socket")
public class SocketConnectionConfig {

    /**
     * 程控源设备配置
     */
    private SourceConfig source = new SourceConfig();

    /**
     * 被检设备配置
     */
    private DeviceConfig device = new DeviceConfig();

    @Data
    public static class SourceConfig {
        private String ip = "127.0.0.1";
        private Integer port = 62000;
    }

    @Data
    public static class DeviceConfig {
        private String ip = "127.0.0.1";
        private Integer port = 61000;
    }

    /**
     * 需要建立程控源通道的requestId集合
     */
    private static final Set<String> SOURCE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList(
            "yjc_ytxjy"     // 源通讯检测
    ));

    /**
     * 需要建立被检设备通道的requestId集合
     */
    private static final Set<String> DEVICE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList(
            "yjc_sbtxjy",   // 连接建立
            "FTP_SEND$01"   // ftp文件传送指令
    ));

    /**
     * 检查指定的requestId是否需要建立程控源连接
     */
    public static boolean needsSourceConnection(String requestId) {
        return SOURCE_CONNECTION_REQUEST_IDS.contains(requestId);
    }

    /**
     * 检查指定的requestId是否需要建立被检设备连接
     */
    public static boolean needsDeviceConnection(String requestId) {
        return DEVICE_CONNECTION_REQUEST_IDS.contains(requestId);
    }
}

2.3 SocketManager.java - 智能Socket管理器

功能职责:

  • 提供智能发送API自动管理连接建立
  • 统一管理Socket会话和EventLoopGroup
  • 支持多种发送模式和连接状态检查

关键代码分析:

@Slf4j
@Component
public class SocketManager {

    @Autowired
    private SocketConnectionConfig socketConnectionConfig;

    /**
     * key为userIdxxx_Source、xxx_Devvalue为channel
     */
    private static final Map<String, Channel> socketSessions = new ConcurrentHashMap<>();

    /**
     * key为userIdxxx_Source、xxx_Devvalue为group
     */
    private static final Map<String, NioEventLoopGroup> socketGroup = new ConcurrentHashMap<>();

    /**
     * 智能发送消息到程控源设备
     * 自动从配置文件读取IP和PORT开发者无需关心网络配置
     * 如果连接不存在且requestId需要建立连接会自动建立连接后发送
     */
    public void smartSendToSource(PreDetectionParam param, String msg) {
        String ip = socketConnectionConfig.getSource().getIp();
        Integer port = socketConnectionConfig.getSource().getPort();
        String requestId = extractRequestId(msg);
        String userId = param.getUserPageId() + CnSocketUtil.SOURCE_TAG;
        
        // 检查是否需要建立连接
        if (SocketConnectionConfig.needsSourceConnection(requestId)) {
            // 检查连接是否存在且活跃
            if (!isChannelActive(userId)) {
                log.info("程控源连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId);
                // 异步建立程控源连接并发送消息
                CompletableFuture.runAsync(() -> {
                    NettyClient.connectToSourceStatic(ip, port, param, msg);
                });
                return;
            }
        }
        
        // 连接已存在或不需要建立连接,直接发送消息
        log.info("直接发送消息到程控源: userId={}, requestId={}", userId, requestId);
        sendMsg(userId, msg);
    }

    /**
     * 智能发送消息到被检设备
     * 自动从配置文件读取IP和PORT开发者无需关心网络配置
     * 如果连接不存在且requestId需要建立连接会自动建立连接后发送
     */
    public void smartSendToDevice(PreDetectionParam param, String msg) {
        String requestId = extractRequestId(msg);
        String userId = param.getUserPageId() + CnSocketUtil.DEV_TAG;
        
        // 检查是否需要建立连接
        if (SocketConnectionConfig.needsDeviceConnection(requestId)) {
            String ip = socketConnectionConfig.getDevice().getIp();
            Integer port = socketConnectionConfig.getDevice().getPort();
            // 检查连接是否存在且活跃
            if (!isChannelActive(userId)) {
                log.info("被检设备连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId);
                // 异步建立被检设备连接并发送消息
                CompletableFuture.runAsync(() -> {
                    NettyClient.connectToDeviceStatic(ip, port, param, msg);
                });
                return;
            }
        }
        
        // 连接已存在或不需要建立连接,直接发送消息
        log.info("直接发送消息到被检设备: userId={}, requestId={}", userId, requestId);
        sendMsg(userId, msg);
    }

    /**
     * 从消息中提取requestId
     * 支持JSON格式的消息解析
     */
    private static String extractRequestId(String msg) {
        try {
            if (StrUtil.isNotBlank(msg)) {
                // 尝试解析JSON格式消息
                JSONObject jsonObject = JSON.parseObject(msg);
                String requestId = jsonObject.getString("requestId");
                if (StrUtil.isNotBlank(requestId)) {
                    return requestId;
                }
                
                // 如果没有requestId字段尝试解析request_id字段
                requestId = jsonObject.getString("request_id");
                if (StrUtil.isNotBlank(requestId)) {
                    return requestId;
                }
            }
        } catch (Exception e) {
            log.warn("解析消息中的requestId失败: msg={}, error={}", msg, e.getMessage());
        }
        
        return "unknown";
    }

    /**
     * 检查指定用户的Channel是否活跃
     */
    private static boolean isChannelActive(String userId) {
        Channel channel = getChannelByUserId(userId);
        return ObjectUtil.isNotNull(channel) && channel.isActive();
    }
}

2.4 使用示例

2.4.1 业务层调用方式

@Service
@RequiredArgsConstructor
public class PreDetectionServiceImpl implements PreDetectionService {
    
    private final SocketManager socketManager;

    @Override
    public void sourceCommunicationCheck(PreDetectionParam param) {
        // 组装检测消息
        SocketMsg<String> msg = new SocketMsg<>();
        msg.setRequestId("yjc_ytxjy");
        msg.setOperateCode("INIT_GATHER");
        msg.setData(JSON.toJSONString(sourceParam));
        
        // 智能发送 - 系统自动判断是否需要建立连接
        socketManager.smartSendToSource(param, JSON.toJSONString(msg));
    }
}

2.4.2 配置文件示例

# application.yml
socket:
  source:
    ip: 192.168.1.124
    port: 62000
  device:
    ip: 192.168.1.124
    port: 61000

3. Netty客户端组件详解

3.1 NettyClient.java - Spring管理的智能客户端

功能职责:

  • 作为Spring组件提供连接服务
  • 支持源设备和被检设备的智能连接
  • 自动处理Handler的实例化和依赖注入

关键代码分析:

@Component
@Slf4j
public class NettyClient {
    
    @Autowired
    private SocketSourceResponseService socketSourceResponseService;
    
    @Autowired
    private SocketDevResponseService socketDevResponseService;
    
    private static NettyClient instance;

    @PostConstruct
    public void init() {
        instance = this;
    }

    /**
     * 连接到程控源设备
     * Spring管理的实例方法支持依赖注入
     */
    public void connectToSource(String ip, Integer port, PreDetectionParam param, String msg) {
        NettySourceClientHandler handler = createSourceHandler(param);
        executeSocketConnection(ip, port, param, msg, handler);
    }

    /**
     * 连接到被检设备
     * Spring管理的实例方法支持依赖注入
     */
    public void connectToDevice(String ip, Integer port, PreDetectionParam param, String msg) {
        NettyDevClientHandler handler = createDeviceHandler(param);
        executeSocketConnection(ip, port, param, msg, handler);
    }

    /**
     * 静态方法入口 - 保持向后兼容
     */
    public static void connectToSourceStatic(String ip, Integer port, PreDetectionParam param, String msg) {
        if (instance != null) {
            instance.connectToSource(ip, port, param, msg);
        } else {
            log.error("NettyClient未初始化无法创建程控源连接");
        }
    }

    /**
     * 静态方法入口 - 保持向后兼容
     */
    public static void connectToDeviceStatic(String ip, Integer port, PreDetectionParam param, String msg) {
        if (instance != null) {
            instance.connectToDevice(ip, port, param, msg);
        } else {
            log.error("NettyClient未初始化无法创建被检设备连接");
        }
    }

    /**
     * 创建源设备处理器
     * 利用Spring注入的Service实例
     */
    private NettySourceClientHandler createSourceHandler(PreDetectionParam param) {
        return new NettySourceClientHandler(param, socketSourceResponseService);
    }

    /**
     * 创建被检设备处理器
     * 利用Spring注入的Service实例
     */
    private NettyDevClientHandler createDeviceHandler(PreDetectionParam param) {
        return new NettyDevClientHandler(param, socketDevResponseService);
    }

    /**
     * 执行Socket连接建立流程重构后的核心实现
     */
    private static void executeSocketConnection(String ip, Integer port,
                                                PreDetectionParam param, String msg, SimpleChannelInboundHandler<String> handler) {
        // 创建NIO事件循环组
        NioEventLoopGroup group = createEventLoopGroup();

        try {
            // 配置客户端启动器
            Bootstrap bootstrap = configureBootstrap(group);
            // 创建管道初始化器
            ChannelInitializer<NioSocketChannel> initializer = createChannelInitializer(param, handler);
            bootstrap.handler(initializer);
            // 同步连接到目标服务器
            ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
            // 处理连接结果
            handleConnectionResult(channelFuture, param, handler, group, msg);
        } catch (Exception e) {
            // 处理连接异常
            handleConnectionException(e, param, handler, group);
        }
    }

    /**
     * 创建NIO事件循环组
     */
    private static NioEventLoopGroup createEventLoopGroup() {
        return new NioEventLoopGroup();
    }

    /**
     * 配置Bootstrap启动器
     */
    private static Bootstrap configureBootstrap(NioEventLoopGroup group) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .channel(NioSocketChannel.class);
        return bootstrap;
    }

    /**
     * 创建通道初始化器
     * 根据处理器类型配置不同的Pipeline
     */
    private static ChannelInitializer<NioSocketChannel> createChannelInitializer(
            PreDetectionParam param, SimpleChannelInboundHandler<String> handler) {
        return new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) {
                if (handler instanceof NettySourceClientHandler) {
                    configureSourcePipeline(ch, param, handler);
                } else {
                    configureDevicePipeline(ch, param, handler);
                }
            }
        };
    }

    /**
     * 配置程控源设备的Pipeline
     */
    private static void configureSourcePipeline(NioSocketChannel ch, PreDetectionParam param,
                                               SimpleChannelInboundHandler<String> handler) {
        ch.pipeline()
                .addLast("frame-decoder", new LineBasedFrameDecoder(10240))
                .addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8))
                .addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8))
                .addLast("heartbeat", new HeartbeatHandler(param, CnSocketUtil.SOURCE_TAG))
                .addLast("source-handler", handler);
    }

    /**
     * 配置被检设备的Pipeline
     */
    private static void configureDevicePipeline(NioSocketChannel ch, PreDetectionParam param,
                                              SimpleChannelInboundHandler<String> handler) {
        ch.pipeline()
                .addLast("frame-decoder", new LineBasedFrameDecoder(10240))
                .addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8))
                .addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8))
                .addLast("heartbeat", new HeartbeatHandler(param, CnSocketUtil.DEV_TAG))
                .addLast("idle-detector", new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS))
                .addLast("device-handler", handler);
    }

    /**
     * 处理连接建立结果
     */
    private static void handleConnectionResult(ChannelFuture channelFuture, PreDetectionParam param,
                                             SimpleChannelInboundHandler<String> handler, NioEventLoopGroup group, String msg) {
        if (channelFuture.isSuccess()) {
            log.info("Socket连接建立成功: {}", channelFuture.channel().remoteAddress());
            
            // 注册会话和EventLoopGroup到管理器
            String userId = getConnectionUserId(param, handler);
            SocketManager.addUser(userId, channelFuture.channel());
            SocketManager.addGroup(userId, group);
            
            // 发送初始消息
            if (StrUtil.isNotBlank(msg)) {
                channelFuture.channel().writeAndFlush(msg + "\n");
                log.info("发送初始消息: {}", msg);
            }
        } else {
            log.error("Socket连接建立失败");
            handleConnectionFailure(param, group);
        }
    }

    /**
     * 获取连接的用户ID
     */
    private static String getConnectionUserId(PreDetectionParam param, SimpleChannelInboundHandler<String> handler) {
        String tag = (handler instanceof NettySourceClientHandler) ? CnSocketUtil.SOURCE_TAG : CnSocketUtil.DEV_TAG;
        return param.getUserPageId() + tag;
    }

    /**
     * 处理连接异常
     */
    private static void handleConnectionException(Exception e, PreDetectionParam param,
                                                SimpleChannelInboundHandler<String> handler, NioEventLoopGroup group) {
        log.error("Socket连接过程中发生异常: {}", e.getMessage(), e);
        
        // 清理资源
        if (group != null) {
            group.shutdownGracefully();
        }
        
        // 发送错误消息到前端
        try {
            CnSocketUtil.quitSendSource(param);
            CnSocketUtil.quitSend(param);
        } catch (Exception ex) {
            log.error("发送错误消息失败", ex);
        }
    }

    /**
     * 处理连接失败情况
     */
    private static void handleConnectionFailure(PreDetectionParam param, NioEventLoopGroup group) {
        // 清理EventLoopGroup资源
        if (group != null) {
            group.shutdownGracefully();
        }
        
        // 通知业务层连接失败
        try {
            CnSocketUtil.quitSendSource(param);
            CnSocketUtil.quitSend(param);
        } catch (Exception e) {
            log.error("处理连接失败通知时发生异常", e);
        }
    }
}

3.2 Handler组件改进

3.2.1 NettySourceClientHandler.java - 源设备处理器

功能改进:

  • 简化异常处理逻辑,移除冗余注释
  • 使用slf4j日志替代System.out.println
  • 优化连接状态管理
@RequiredArgsConstructor
@Slf4j
public class NettySourceClientHandler extends SimpleChannelInboundHandler<String> {
    private final PreDetectionParam webUser;
    private final SocketSourceResponseService sourceResponseService;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("程控源客户端通道已建立: {}", ctx.channel().id());
        SocketManager.addUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG, ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException {
        log.debug("接收源设备数据: {}", msg);
        try {
            sourceResponseService.deal(webUser, msg);
        } catch (Exception e) {
            log.error("处理源设备响应异常", e);
            CnSocketUtil.quitSend(webUser);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("程控源客户端连接断开");
        ctx.close();
        SocketManager.removeUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
                log.debug("程控源设备空闲状态触发");
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("程控源通信异常", cause);
        if (cause instanceof ConnectException) {
            log.warn("程控源连接异常");
        } else if (cause instanceof IOException) {
            WebServiceManager.sendDetectionErrorMessage(webUser.getUserPageId(), SourceOperateCodeEnum.SERVER_ERROR);
        } else if (cause instanceof TimeoutException) {
            log.warn("程控源通信超时");
        }
        ctx.close();
    }
}

3.2.2 NettyDevClientHandler.java - 被检设备处理器

功能改进:

  • 精简超时检测逻辑,移除大段注释代码
  • 优化异常处理机制
  • 统一日志输出格式
@RequiredArgsConstructor
@Slf4j
public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
    private final PreDetectionParam param;
    private final SocketDevResponseService socketDevResponseService;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("被检设备客户端通道已建立: {}", ctx.channel().id());
        SocketManager.addUser(param.getUserPageId() + CnSocketUtil.DEV_TAG, ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.debug("接收被检设备数据: {}", msg);
        try {
            socketDevResponseService.deal(param, msg);
        } catch (Exception e) {
            log.error("处理被检设备响应异常", e);
            CnSocketUtil.quitSend(param);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("被检设备客户端连接断开");
        ctx.close();
        SocketManager.removeUser(param.getUserPageId() + CnSocketUtil.DEV_TAG);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                log.warn("被检设备读超时触发");
                handleReadTimeout(ctx);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * 处理读超时事件
     */
    private void handleReadTimeout(ChannelHandlerContext ctx) {
        if (!FormalTestManager.hasStopFlag) {
            if (CollUtil.isNotEmpty(SocketManager.getSourceList())) {
                SourceIssue sourceIssue = SocketManager.getSourceList().get(0);
                // 更新超时计时器
                updateTimeoutCounter(sourceIssue);
                // 检查是否需要触发超时处理
                if (shouldTriggerTimeout(sourceIssue)) {
                    log.warn("检测项超时: {}", sourceIssue.getType());
                    CnSocketUtil.quitSend(param);
                    timeoutSend(sourceIssue);
                }
            }
        }
    }

    /**
     * 更新超时计时器
     */
    private void updateTimeoutCounter(SourceIssue sourceIssue) {
        SocketManager.clockMap.put(sourceIssue.getIndex(),
                SocketManager.clockMap.getOrDefault(sourceIssue.getIndex(), 0L) + 60L);
    }

    /**
     * 判断是否应该触发超时处理
     */
    private boolean shouldTriggerTimeout(SourceIssue sourceIssue) {
        Long currentTime = SocketManager.clockMap.get(sourceIssue.getIndex());
        if (currentTime == null) return false;

        // 根据检测类型设置不同的超时时间
        if (sourceIssue.getType().equals(DicDataEnum.F.getCode())) {
            return currentTime > 1300; // 闪变: 20分钟超时
        } else if (sourceIssue.getType().equals(DicDataEnum.VOLTAGE.getCode()) ||
                   sourceIssue.getType().equals(DicDataEnum.HP.getCode())) {
            return currentTime > 180;  // 统计数据: 3分钟超时
        } else {
            return currentTime > 60;   // 实时数据: 1分钟超时
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("被检设备通信异常", cause);
        if (cause instanceof ConnectException) {
            log.warn("被检设备连接异常");
        } else if (cause instanceof IOException) {
            WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR);
        } else if (cause instanceof TimeoutException) {
            log.warn("被检设备通信超时");
        }
        
        // 清理资源并断开连接
        CnSocketUtil.quitSend(param);
        CnSocketUtil.quitSendSource(param);
        ctx.close();
    }
}

4. Netty服务端组件详解

4.1 NettyServer.java - 服务器核心

功能职责:

  • 提供Socket服务端功能用于测试和开发
  • 支持源通信服务和设备通信服务
  • 模拟外部设备的响应行为

关键代码分析:

public class NettyServer {
    public static final int port = 8574;

    private void runSource() {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap().group(boss, work);
            bootstrap.channel(NioServerSocketChannel.class)
                    .handler(new ChannelInitializer<ServerSocketChannel>() {
                        @Override
                        protected void initChannel(ServerSocketChannel ch) {
                            System.out.println("源通讯服务正在启动中......");
                        }
                    })
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            ch.pipeline()
                                .addLast(new LineBasedFrameDecoder(10240))
                                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                                .addLast(new DevNettyServerHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(port).sync();
            future.addListener(f -> {
                if (future.isSuccess()) {
                    System.out.println("源通讯服务启动成功");
                } else {
                    System.out.println("源通讯服务启动失败");
                }
            });
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

5. WebSocket通信组件详解

5.1 WebSocketService.java - WebSocket服务核心

功能职责:

  • 启动基于Netty的WebSocket服务器
  • 管理服务器生命周期(启动/关闭)
  • 提供高性能的WebSocket通信支持

关键代码分析:

@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketService implements ApplicationRunner {

    @Value("${webSocket.port:7777}")
    int port;

    EventLoopGroup bossGroup;
    EventLoopGroup workerGroup;
    private Channel serverChannel;
    private CompletableFuture<Void> serverFuture;

    @Override
    public void run(ApplicationArguments args) {
        // 使用CompletableFuture异步启动WebSocket服务避免阻塞Spring Boot主线程
        serverFuture = CompletableFuture.runAsync(this::startWebSocketServer)
                .exceptionally(throwable -> {
                    log.error("WebSocket服务启动异常", throwable);
                    return null;
                });
    }

    private void startWebSocketServer() {
        try {
            bossGroup = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup();
            
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler())
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new WebSocketInitializer());
            
            ChannelFuture future = serverBootstrap.bind(port).sync();
            serverChannel = future.channel();
            
            future.addListener(f -> {
                if (future.isSuccess()) {
                    log.info("webSocket服务启动成功端口{}", port);
                } else {
                    log.error("webSocket服务启动失败端口{}", port);
                }
            });
            
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("WebSocket服务启动过程中被中断", e);
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("WebSocket服务启动失败", e);
            throw new RuntimeException("WebSocket服务启动失败", e);
        } finally {
            shutdownGracefully();
        }
    }

    @PreDestroy
    public void destroy() throws InterruptedException {
        log.info("正在关闭WebSocket服务...");
        
        if (serverChannel != null) {
            try {
                serverChannel.close().awaitUninterruptibly(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                log.warn("关闭服务器通道时发生异常", e);
            }
        }
        
        if (bossGroup != null) {
            bossGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync();
        }
        
        if (serverFuture != null && !serverFuture.isDone()) {
            boolean cancelled = serverFuture.cancel(true);
        }

        log.info("webSocket服务已销毁");
    }
}

6. Socket响应处理器详解

6.1 响应处理器改进

主要改进:

  • 支持SocketManager的依赖注入
  • 移除硬编码的IP/PORT配置
  • 使用智能发送机制简化代码

6.1.1 SocketSourceResponseService.java

@Service
@RequiredArgsConstructor
public class SocketSourceResponseService {

    private final SocketDevResponseService socketDevResponseService;
    private final IPqDevService iPqDevService;
    private final SocketManager socketManager;

    public void deal(PreDetectionParam param, String msg) throws Exception {
        SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg);
        String[] tem = socketDataMsg.getRequestId().split(CnSocketUtil.STEP_TAG);
        SourceOperateCodeEnum enumByCode = SourceOperateCodeEnum.getDictDataEnumByCode(tem[0]);
        
        if (ObjectUtil.isNotNull(enumByCode)) {
            switch (enumByCode) {
                case YJC_YTXJY:
                    if (ObjectUtil.isNotNull(param.getPlanId())) {
                        detectionDev(param, socketDataMsg);
                    } else {
                        handleYtxjySimulate(param, socketDataMsg);
                    }
                    break;
                case YJC_XUJY:
                    phaseSequenceDev(param, socketDataMsg);
                    break;
                case FORMAL_REAL:
                    if (ObjectUtil.isNotNull(param.getPlanId())) {
                        senParamToDev(param, socketDataMsg);
                    } else {
                        handleSimulateTest(param, socketDataMsg);
                    }
                    break;
                case Coefficient_Check:
                    coefficient(param, socketDataMsg);
                    break;
            }
        }
    }

    // 装置检测 - 使用智能发送机制
    private void detectionDev(PreDetectionParam param, SocketDataMsg socketDataMsg) {
        SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode());
        if (ObjectUtil.isNotNull(dictDataEnumByCode)) {
            SocketMsg<String> socketMsg = new SocketMsg<>();
            switch (dictDataEnumByCode) {
                case SUCCESS:
                    WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
                    
                    Map<String, List<PreDetection>> map = new HashMap<>(1);
                    map.put("deviceList", FormalTestManager.devList);
                    String jsonString = JSON.toJSONString(map);
                    socketMsg.setRequestId(SourceOperateCodeEnum.YJC_SBTXJY.getValue());
                    socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_01.getValue());
                    socketMsg.setData(jsonString);
                    String json = JSON.toJSONString(socketMsg);
                    
                    // 使用智能发送工具类,自动管理设备连接
                    socketManager.smartSendToDevice(param, json);
                    break;
                case UNPROCESSED_BUSINESS:
                    WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
                    break;
                default:
                    CnSocketUtil.quitSendSource(param);
                    WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
                    break;
            }
        }
    }
}

7. Socket管理与工具类详解

7.1 SocketManager.java - 智能Socket管理器

核心功能:

  1. 智能发送机制: 自动判断连接需求,透明管理连接建立
  2. Spring组件管理: 完全交给Spring容器管理支持依赖注入
  3. 会话管理: 统一管理Socket连接会话和EventLoopGroup
  4. 检测任务管理: 管理检测相关的状态信息和配置

关键数据结构:

@Component
@Slf4j
public class SocketManager {

    @Autowired
    private SocketConnectionConfig socketConnectionConfig;

    // Socket会话管理
    private static final Map<String, Channel> socketSessions = new ConcurrentHashMap<>();
    private static final Map<String, NioEventLoopGroup> socketGroup = new ConcurrentHashMap<>();

    // 检测任务管理
    private static Map<String, Long> targetMap = new ConcurrentHashMap<>();
    private static List<SourceIssue> sourceIssueList = new CopyOnWriteArrayList<>();
    public static Map<String, String> valueTypeMap = new HashMap<>();
    public static volatile Map<Integer, Long> clockMap = new ConcurrentHashMap<>();
    public static volatile Map<DataSourceEnum, Long> contrastClockMap = new ConcurrentHashMap<>();

    // 基础连接管理方法
    public static void addUser(String userId, Channel channel) {
        socketSessions.put(userId, channel);
    }

    public static void addGroup(String userId, NioEventLoopGroup group) {
        socketGroup.put(userId, group);
    }

    public static void removeUser(String userId) {
        Channel channel = socketSessions.get(userId);
        if (ObjectUtil.isNotNull(channel)) {
            try {
                channel.close().sync();
            } catch (InterruptedException e) {
                log.error("关闭通道异常", e);
            }
            NioEventLoopGroup eventExecutors = socketGroup.get(userId);
            if (ObjectUtil.isNotNull(eventExecutors)) {
                eventExecutors.shutdownGracefully();
                log.info("{}__{}关闭了客户端", userId, channel.id());
            }
        }
        socketSessions.remove(userId);
    }

    public static void sendMsg(String userId, String msg) {
        Channel channel = socketSessions.get(userId);
        if (ObjectUtil.isNotNull(channel)) {
            channel.writeAndFlush(msg + '\n');
            log.info("{}__{}往{}发送数据:{}", userId, channel.id(), channel.remoteAddress(), msg);
        } else {
            log.warn("{}__发送数据失败通道不存在{}", userId, msg);
        }
    }

    // 检测任务管理方法
    public static void addSourceList(List<SourceIssue> sList) {
        sourceIssueList = sList;
    }

    public static List<SourceIssue> getSourceList() {
        return sourceIssueList;
    }

    public static void delSource(Integer index) {
        sourceIssueList.removeIf(s -> index.equals(s.getIndex()));
    }

    public static void delSourceTarget(String sourceTag) {
        targetMap.remove(sourceTag);
    }

    public static void initMap(Map<String, Long> map) {
        targetMap = map;
    }

    public static void addTargetMap(String scriptType, Long count) {
        targetMap.put(scriptType, count);
    }

    public static Long getSourceTarget(String scriptType) {
        return targetMap.get(scriptType);
    }
}

7.2 CnSocketUtil.java - Socket工具类

功能职责:

  • 提供Socket连接的控制功能
  • 封装WebSocket消息推送
  • 定义通信相关常量

关键代码:

public class CnSocketUtil {

    public final static String DEV_TAG = "_Dev";
    public final static String SOURCE_TAG = "_Source";
    public final static String START_TAG = "_Start";
    public final static String END_TAG = "_End";
    public final static String STEP_TAG = "&&";
    public final static String SPLIT_TAG = "_";

    // 退出检测
    public static void quitSend(PreDetectionParam param) {
        SocketMsg<String> socketMsg = new SocketMsg<>();
        socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue());
        socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue());
        SocketManager.sendMsg(param.getUserPageId() + DEV_TAG, JSON.toJSONString(socketMsg));
        WebServiceManager.removePreDetectionParam();
    }

    // 关闭源连接
    public static void quitSendSource(PreDetectionParam param) {
        SocketMsg<String> socketMsg = new SocketMsg<>();
        socketMsg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue());
        socketMsg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue());
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("sourceId", param.getSourceId());
        socketMsg.setData(jsonObject.toJSONString());
        SocketManager.sendMsg(param.getUserPageId() + SOURCE_TAG, JSON.toJSONString(socketMsg));
        WebServiceManager.removePreDetectionParam();
    }

    // 推送webSocket数据
    public static void sendToWebSocket(String userId, String requestId, String operatorType, 
                                     Object data, String desc) {
        WebSocketVO<Object> webSocketVO = new WebSocketVO<>();
        webSocketVO.setRequestId(requestId);
        webSocketVO.setOperateCode(operatorType);
        webSocketVO.setData(data);
        webSocketVO.setDesc(desc);
        WebServiceManager.sendMessage(userId, webSocketVO);
    }

    // 比对式-退出检测
    public static void contrastSendquit(String userId) {
        System.out.println("比对式-发送关闭备通讯模块指令");
        SocketMsg<String> socketMsg = new SocketMsg<>();
        socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue());
        socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue());
        SocketManager.sendMsg(userId + DEV_TAG, JSON.toJSONString(socketMsg));
        WebServiceManager.removePreDetectionParam();
    }
}

8. 通信数据对象详解

8.1 数据对象结构

8.1.1 SocketMsg.java - Socket消息对象

public class SocketMsg<T> {
    private String requestId;      // 请求ID用于标识消息类型和流程
    private String operateCode;    // 操作代码,标识具体的操作类型
    private T data;               // 数据载荷,支持泛型
    private String desc;          // 描述信息
    private Long timestamp;       // 时间戳
}

8.1.2 SocketDataMsg.java - Socket数据消息对象

public class SocketDataMsg {
    private String requestId;      // 请求ID
    private String operateCode;    // 操作代码
    private String data;          // 响应数据JSON字符串
    private Integer code;         // 响应状态码
    private String message;       // 响应消息
    private String type;          // 消息类型
}

8.1.3 WebSocketVO.java - WebSocket数据对象

public class WebSocketVO<T> {
    private String requestId;      // 请求ID
    private String operateCode;    // 操作代码
    private T data;               // 数据载荷
    private String desc;          // 描述信息
    private Integer status;       // 状态码
    private Long timestamp;       // 时间戳
    private String userId;        // 用户ID
}

9. 通信流程分析

9.1 智能发送流程

sequenceDiagram
    participant Business as 业务层
    participant SocketManager as SocketManager
    participant Config as SocketConnectionConfig
    participant NettyClient as NettyClient
    participant Device as 外部设备

    Business->>SocketManager: smartSendToSource(param, msg)
    SocketManager->>SocketManager: extractRequestId(msg)
    SocketManager->>Config: needsSourceConnection(requestId)
    
    alt 需要建立连接
        Config-->>SocketManager: true
        SocketManager->>SocketManager: isChannelActive(userId)
        
        alt 连接不存在
            SocketManager->>Config: getSource().getIp/Port()
            Config-->>SocketManager: IP/PORT配置
            SocketManager->>NettyClient: connectToSourceStatic(ip, port, param, msg)
            NettyClient->>Device: 建立连接并发送消息
        else 连接已存在
            SocketManager->>SocketManager: sendMsg(userId, msg)
        end
    else 不需要建立连接
        Config-->>SocketManager: false
        SocketManager->>SocketManager: sendMsg(userId, msg)
    end

9.2 Spring组件生命周期流程

graph TB
    A[Spring容器启动] --> B[SocketConnectionConfig初始化]
    B --> C[@ConfigurationProperties绑定配置]
    C --> D[NettyClient注入依赖]
    D --> E[SocketManager注入配置]
    E --> F[业务层注入SocketManager]
    F --> G[智能发送服务就绪]
    
    G --> H[接收发送请求]
    H --> I[检查连接需求]
    I --> J[自动建立连接]
    J --> K[发送消息]
    
    K --> L[Spring容器关闭]
    L --> M[@PreDestroy清理资源]
    M --> N[关闭所有连接]

9.3 配置管理流程

flowchart TD
    A[application.yml] --> B[Spring Boot配置绑定]
    B --> C[SocketConnectionConfig]
    
    C --> D[Source配置]
    C --> E[Device配置]
    C --> F[RequestId配置]
    
    D --> G[程控源IP/PORT]
    E --> H[被检设备IP/PORT]
    F --> I[连接需求判断]
    
    G --> J[SocketManager智能发送]
    H --> J
    I --> J
    
    J --> K[自动连接管理]
    K --> L[透明化发送]

10. 关键技术特性

10.1 智能发送机制特性

10.1.1 自动连接管理

  • 智能判断: 根据requestId自动判断是否需要建立连接
  • 透明操作: 开发者无需关心连接建立过程
  • 配置驱动: 通过简单配置控制连接行为

10.1.2 连接状态检测

private static boolean isChannelActive(String userId) {
    Channel channel = getChannelByUserId(userId);
    return ObjectUtil.isNotNull(channel) && channel.isActive();
}

10.1.3 异步连接建立

CompletableFuture.runAsync(() -> {
    NettyClient.connectToSourceStatic(ip, port, param, msg);
});

10.2 Spring组件化特性

10.2.1 依赖注入管理

@Component
public class SocketManager {
    @Autowired
    private SocketConnectionConfig socketConnectionConfig;
}

@Service
@RequiredArgsConstructor
public class PreDetectionServiceImpl {
    private final SocketManager socketManager;
}

10.2.2 配置属性绑定

@Component
@ConfigurationProperties(prefix = "socket")
public class SocketConnectionConfig {
    private SourceConfig source = new SourceConfig();
    private DeviceConfig device = new DeviceConfig();
}

10.3 配置统一管理特性

10.3.1 统一配置文件

socket:
  source:
    ip: 192.168.1.124
    port: 62000
  device:
    ip: 192.168.1.124
    port: 61000

10.3.2 动态配置支持

  • 环境隔离: 支持不同环境使用不同配置
  • 热更新: 支持配置的动态刷新
  • 默认值: 提供合理的默认配置值

10.4 异常处理和资源管理特性

10.4.1 优化的异常处理

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    log.error("通信异常", cause);
    if (cause instanceof ConnectException) {
        log.warn("连接异常");
    } else if (cause instanceof IOException) {
        WebServiceManager.sendDetectionErrorMessage(userId, errorCode);
    }
    ctx.close();
}

10.4.2 完善的资源清理

public static void removeUser(String userId) {
    Channel channel = socketSessions.get(userId);
    if (ObjectUtil.isNotNull(channel)) {
        try {
            channel.close().sync();
        } catch (InterruptedException e) {
            log.error("关闭通道异常", e);
        }
        NioEventLoopGroup eventExecutors = socketGroup.get(userId);
        if (ObjectUtil.isNotNull(eventExecutors)) {
            eventExecutors.shutdownGracefully();
        }
    }
    socketSessions.remove(userId);
}

10.5 并发安全特性

10.5.1 线程安全设计

  • ConcurrentHashMap: 用于会话管理
  • CopyOnWriteArrayList: 用于检测项列表
  • volatile关键字: 用于状态标志
  • CompletableFuture: 用于异步处理

10.5.2 日志统一管理

@Slf4j
public class NettyClient {
    log.info("Socket连接建立成功: {}", channelFuture.channel().remoteAddress());
    log.error("Socket连接过程中发生异常: {}", e.getMessage(), e);
    log.debug("发送初始消息: {}", msg);
}

11. 配置与部署

11.1 应用配置

11.1.1 核心配置文件

# application.yml
webSocket:
  port: 7777                    # WebSocket服务端口

socket:
  source:
    ip: 192.168.1.124          # 程控源设备IP
    port: 62000                # 程控源设备端口
  device:
    ip: 192.168.1.124          # 被检设备IP  
    port: 61000                # 被检设备端口

netty:
  server:
    port: 8574                 # Netty服务端端口测试用

# 日志配置
logging:
  level:
    com.njcn.gather.detection.util.socket: INFO
    com.njcn.gather.detection.handler: INFO
    io.netty: WARN
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"

11.1.2 环境配置示例

# application-dev.yml (开发环境)
socket:
  source:
    ip: 127.0.0.1
    port: 62000
  device:
    ip: 127.0.0.1
    port: 61000

# application-prod.yml (生产环境)  
socket:
  source:
    ip: 192.168.1.124
    port: 62000
  device:
    ip: 192.168.1.124
    port: 61000

11.2 Maven依赖配置

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.3.12.RELEASE</version>
    </dependency>
    
    <!-- Netty依赖 -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.76.Final</version>
    </dependency>
    
    <!-- FastJSON依赖 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    
    <!-- HuTool工具类 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.10</version>
    </dependency>
    
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

11.3 Spring Boot集成

11.3.1 自动配置启用

@SpringBootApplication
@EnableConfigurationProperties(SocketConnectionConfig.class)
public class DetectionApplication {
    public static void main(String[] args) {
        SpringApplication.run(DetectionApplication.class, args);
    }
}

11.3.2 组件扫描配置

@ComponentScan(basePackages = {
    "com.njcn.gather.detection.util.socket",
    "com.njcn.gather.detection.handler",
    "com.njcn.gather.detection.service"
})

11.4 性能调优参数

11.4.1 Netty性能参数

// 服务端性能调优
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)              // 连接队列大小
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时时间
    .childOption(ChannelOption.SO_KEEPALIVE, true)      // 启用TCP keepalive
    .childOption(ChannelOption.TCP_NODELAY, true)       // 禁用Nagle算法
    .childOption(ChannelOption.SO_RCVBUF, 32 * 1024)    // 接收缓冲区大小
    .childOption(ChannelOption.SO_SNDBUF, 32 * 1024);   // 发送缓冲区大小

// 客户端性能调优
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)  // 连接超时
    .option(ChannelOption.SO_KEEPALIVE, true)            // keepalive
    .option(ChannelOption.TCP_NODELAY, true)             // 立即发送
    .channel(NioSocketChannel.class);

11.4.2 线程池配置

# application.yml
spring:
  task:
    execution:
      pool:
        core-size: 8
        max-size: 16
        queue-capacity: 100
        thread-name-prefix: "detection-"

11.5 监控和诊断

11.5.1 健康检查配置

@Component
public class SocketHealthIndicator implements HealthIndicator {
    
    @Autowired
    private SocketManager socketManager;
    
    @Override
    public Health health() {
        // 检查Socket连接状态
        if (hasActiveConnections()) {
            return Health.up()
                .withDetail("activeConnections", getActiveConnectionCount())
                .build();
        } else {
            return Health.down()
                .withDetail("reason", "No active socket connections")
                .build();
        }
    }
}

11.5.2 指标监控

@Component
public class SocketMetrics {
    
    private final MeterRegistry meterRegistry;
    private final Counter connectionCounter;
    private final Timer messageProcessingTimer;
    
    public SocketMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.connectionCounter = Counter.builder("socket.connections.total")
                .description("Total socket connections")
                .register(meterRegistry);
        this.messageProcessingTimer = Timer.builder("socket.message.processing.time")
                .description("Message processing time")
                .register(meterRegistry);
    }
}

总结

CN_Gather Detection模块的全新Netty通信架构通过智能Socket管理机制全面Spring组件化的设计,实现了电能质量设备检测系统的现代化通信解决方案。

核心架构优势

  1. 智能化程度高

    • 自动连接管理,开发者无需关心连接细节
    • 配置驱动的连接策略,灵活可控
    • 透明化的发送机制,简化业务代码
  2. Spring生态集成

    • 完全Spring组件化管理遵循IoC原则
    • 统一的配置管理,支持多环境部署
    • 完善的依赖注入,松耦合设计
  3. 代码质量提升

    • 移除大量冗余和无用代码
    • 统一日志管理,便于调试和监控
    • 优化异常处理,提高系统稳定性
  4. 可维护性增强

    • 模块化设计,职责边界清晰
    • 配置集中管理,降低维护成本
    • 完善的资源管理,避免内存泄漏
  5. 开发体验优化

    • 简化的API设计降低使用门槛
    • 智能化的连接管理,减少样板代码
    • 统一的错误处理,提高开发效率

技术特色

  • 智能发送机制: 业界领先的自动连接管理技术
  • 配置统一管理: 现代化的配置管理模式
  • Spring深度集成: 充分利用Spring生态优势
  • 高并发支持: 基于Netty NIO的高性能通信
  • 完善监控: 全方位的监控和诊断能力

该架构为CN_Gather系统提供了稳定、高效、易维护的通信基础确保了电能质量检测业务的可靠运行同时为未来的功能扩展和性能优化奠定了坚实基础。