- 新增 CLAUDE.md 项目架构和开发指导文档 - 添加 Gitea本地协作开发服务器配置指南 - 完善检测模块架构分析文档 - 增加报告生成和Word文档处理工具指南 - 添加动态表格和结果服务测试用例 - 更新应用配置和VS Code开发环境设置 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
55 KiB
55 KiB
CN_Gather Detection模块 Netty通信架构详细分析文档
目录
- 架构概览
- 智能Socket通信机制
- Netty客户端组件详解
- Netty服务端组件详解
- WebSocket通信组件详解
- Socket响应处理器详解
- Socket管理与工具类详解
- 通信数据对象详解
- 通信流程分析
- 关键技术特性
- 配置与部署
1. 架构概览
CN_Gather Detection模块采用智能Socket + WebSocket的混合通信架构,通过全新的智能发送机制和Spring组件化设计,支持电能质量设备检测的复杂业务场景。
1.1 整体架构图
[前端页面] ←→ WebSocket(7777) ←→ [Detection应用] ←→ 智能Socket管理器 ←→ [源设备(62000)/被检设备(61000)]
↑ ↑
Spring容器管理 自动连接建立
↑ ↑
配置统一管理 智能发送机制
1.2 核心组件层次结构
detection/
├── util/socket/
│ ├── cilent/ # Netty客户端组件
│ │ ├── NettyClient.java # 智能客户端(Spring组件)
│ │ ├── NettySourceClientHandler.java # 源设备处理器
│ │ ├── NettyDevClientHandler.java # 被检设备处理器
│ │ ├── NettyContrastClientHandler.java # 比对设备处理器
│ │ └── HeartbeatHandler.java # 心跳处理器
│ ├── service/ # Netty服务端组件
│ │ ├── NettyServer.java # 服务器核心
│ │ ├── DevNettyServerHandler.java # 设备服务端处理器
│ │ └── SourceNettyServerHandler.java # 源服务端处理器
│ ├── websocket/ # WebSocket通信组件
│ │ ├── WebSocketService.java # WebSocket服务
│ │ ├── WebSocketHandler.java # 消息处理器
│ │ ├── WebSocketInitializer.java # 初始化器
│ │ └── WebServiceManager.java # 会话管理器
│ ├── config/ # 配置管理组件(新增)
│ │ └── SocketConnectionConfig.java # Socket连接配置
│ ├── SocketManager.java # 智能Socket管理器(Spring组件)
│ ├── CnSocketUtil.java # Socket工具类
│ ├── FormalTestManager.java # 检测管理器
│ └── XiNumberManager.java # 系数管理器
└── handler/ # 业务响应处理器
├── SocketSourceResponseService.java # 源响应处理
├── SocketDevResponseService.java # 设备响应处理
└── SocketContrastResponseService.java # 比对响应处理
1.3 核心架构改进
1.3.1 智能发送机制
- 自动连接管理: 根据requestId自动判断是否需要建立连接
- 透明化操作: 开发者只需关心业务逻辑,连接管理完全透明
- 配置驱动: 通过配置文件统一管理需要建立连接的requestId
1.3.2 Spring组件化
- 全面Spring管理: NettyClient和SocketManager完全交给Spring容器管理
- 依赖注入: 通过构造函数注入实现松耦合设计
- 生命周期管理: 利用Spring的@PostConstruct和@PreDestroy管理组件生命周期
1.3.3 配置统一管理
- 集中配置: 所有Socket相关配置统一在application.yml中管理
- 环境隔离: 支持不同环境使用不同的IP和端口配置
- 配置热更新: 支持配置的动态刷新
2. 智能Socket通信机制
2.1 智能发送机制核心设计
设计理念:
- 开发者友好: 开发者只需调用发送方法,无需关心连接管理
- 自动化管理: 系统自动判断是否需要建立连接
- 配置驱动: 通过配置决定哪些requestId需要建立连接
核心组件关系图:
graph TB
A[业务层] --> B[SocketManager]
B --> C[SocketConnectionConfig]
B --> D[NettyClient]
C --> E[application.yml]
D --> F[NettySourceClientHandler]
D --> G[NettyDevClientHandler]
subgraph "Spring容器管理"
B
C
D
end
subgraph "配置管理"
E
H[requestId配置]
I[IP/PORT配置]
end
2.2 SocketConnectionConfig.java - 智能配置管理器
功能职责:
- 管理需要建立连接的requestId配置
- 统一管理Socket的IP和PORT配置
- 提供配置的动态读取和验证
关键代码分析:
@Component
@ConfigurationProperties(prefix = "socket")
public class SocketConnectionConfig {
/**
* 程控源设备配置
*/
private SourceConfig source = new SourceConfig();
/**
* 被检设备配置
*/
private DeviceConfig device = new DeviceConfig();
@Data
public static class SourceConfig {
private String ip = "127.0.0.1";
private Integer port = 62000;
}
@Data
public static class DeviceConfig {
private String ip = "127.0.0.1";
private Integer port = 61000;
}
/**
* 需要建立程控源通道的requestId集合
*/
private static final Set<String> SOURCE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList(
"yjc_ytxjy" // 源通讯检测
));
/**
* 需要建立被检设备通道的requestId集合
*/
private static final Set<String> DEVICE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList(
"yjc_sbtxjy", // 连接建立
"FTP_SEND$01" // ftp文件传送指令
));
/**
* 检查指定的requestId是否需要建立程控源连接
*/
public static boolean needsSourceConnection(String requestId) {
return SOURCE_CONNECTION_REQUEST_IDS.contains(requestId);
}
/**
* 检查指定的requestId是否需要建立被检设备连接
*/
public static boolean needsDeviceConnection(String requestId) {
return DEVICE_CONNECTION_REQUEST_IDS.contains(requestId);
}
}
2.3 SocketManager.java - 智能Socket管理器
功能职责:
- 提供智能发送API,自动管理连接建立
- 统一管理Socket会话和EventLoopGroup
- 支持多种发送模式和连接状态检查
关键代码分析:
@Slf4j
@Component
public class SocketManager {
@Autowired
private SocketConnectionConfig socketConnectionConfig;
/**
* key为userId(xxx_Source、xxx_Dev),value为channel
*/
private static final Map<String, Channel> socketSessions = new ConcurrentHashMap<>();
/**
* key为userId(xxx_Source、xxx_Dev),value为group
*/
private static final Map<String, NioEventLoopGroup> socketGroup = new ConcurrentHashMap<>();
/**
* 智能发送消息到程控源设备
* 自动从配置文件读取IP和PORT,开发者无需关心网络配置
* 如果连接不存在且requestId需要建立连接,会自动建立连接后发送
*/
public void smartSendToSource(PreDetectionParam param, String msg) {
String ip = socketConnectionConfig.getSource().getIp();
Integer port = socketConnectionConfig.getSource().getPort();
String requestId = extractRequestId(msg);
String userId = param.getUserPageId() + CnSocketUtil.SOURCE_TAG;
// 检查是否需要建立连接
if (SocketConnectionConfig.needsSourceConnection(requestId)) {
// 检查连接是否存在且活跃
if (!isChannelActive(userId)) {
log.info("程控源连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId);
// 异步建立程控源连接并发送消息
CompletableFuture.runAsync(() -> {
NettyClient.connectToSourceStatic(ip, port, param, msg);
});
return;
}
}
// 连接已存在或不需要建立连接,直接发送消息
log.info("直接发送消息到程控源: userId={}, requestId={}", userId, requestId);
sendMsg(userId, msg);
}
/**
* 智能发送消息到被检设备
* 自动从配置文件读取IP和PORT,开发者无需关心网络配置
* 如果连接不存在且requestId需要建立连接,会自动建立连接后发送
*/
public void smartSendToDevice(PreDetectionParam param, String msg) {
String requestId = extractRequestId(msg);
String userId = param.getUserPageId() + CnSocketUtil.DEV_TAG;
// 检查是否需要建立连接
if (SocketConnectionConfig.needsDeviceConnection(requestId)) {
String ip = socketConnectionConfig.getDevice().getIp();
Integer port = socketConnectionConfig.getDevice().getPort();
// 检查连接是否存在且活跃
if (!isChannelActive(userId)) {
log.info("被检设备连接不存在,自动建立连接: userId={}, requestId={}", userId, requestId);
// 异步建立被检设备连接并发送消息
CompletableFuture.runAsync(() -> {
NettyClient.connectToDeviceStatic(ip, port, param, msg);
});
return;
}
}
// 连接已存在或不需要建立连接,直接发送消息
log.info("直接发送消息到被检设备: userId={}, requestId={}", userId, requestId);
sendMsg(userId, msg);
}
/**
* 从消息中提取requestId
* 支持JSON格式的消息解析
*/
private static String extractRequestId(String msg) {
try {
if (StrUtil.isNotBlank(msg)) {
// 尝试解析JSON格式消息
JSONObject jsonObject = JSON.parseObject(msg);
String requestId = jsonObject.getString("requestId");
if (StrUtil.isNotBlank(requestId)) {
return requestId;
}
// 如果没有requestId字段,尝试解析request_id字段
requestId = jsonObject.getString("request_id");
if (StrUtil.isNotBlank(requestId)) {
return requestId;
}
}
} catch (Exception e) {
log.warn("解析消息中的requestId失败: msg={}, error={}", msg, e.getMessage());
}
return "unknown";
}
/**
* 检查指定用户的Channel是否活跃
*/
private static boolean isChannelActive(String userId) {
Channel channel = getChannelByUserId(userId);
return ObjectUtil.isNotNull(channel) && channel.isActive();
}
}
2.4 使用示例
2.4.1 业务层调用方式
@Service
@RequiredArgsConstructor
public class PreDetectionServiceImpl implements PreDetectionService {
private final SocketManager socketManager;
@Override
public void sourceCommunicationCheck(PreDetectionParam param) {
// 组装检测消息
SocketMsg<String> msg = new SocketMsg<>();
msg.setRequestId("yjc_ytxjy");
msg.setOperateCode("INIT_GATHER");
msg.setData(JSON.toJSONString(sourceParam));
// 智能发送 - 系统自动判断是否需要建立连接
socketManager.smartSendToSource(param, JSON.toJSONString(msg));
}
}
2.4.2 配置文件示例
# application.yml
socket:
source:
ip: 192.168.1.124
port: 62000
device:
ip: 192.168.1.124
port: 61000
3. Netty客户端组件详解
3.1 NettyClient.java - Spring管理的智能客户端
功能职责:
- 作为Spring组件提供连接服务
- 支持源设备和被检设备的智能连接
- 自动处理Handler的实例化和依赖注入
关键代码分析:
@Component
@Slf4j
public class NettyClient {
@Autowired
private SocketSourceResponseService socketSourceResponseService;
@Autowired
private SocketDevResponseService socketDevResponseService;
private static NettyClient instance;
@PostConstruct
public void init() {
instance = this;
}
/**
* 连接到程控源设备
* Spring管理的实例方法,支持依赖注入
*/
public void connectToSource(String ip, Integer port, PreDetectionParam param, String msg) {
NettySourceClientHandler handler = createSourceHandler(param);
executeSocketConnection(ip, port, param, msg, handler);
}
/**
* 连接到被检设备
* Spring管理的实例方法,支持依赖注入
*/
public void connectToDevice(String ip, Integer port, PreDetectionParam param, String msg) {
NettyDevClientHandler handler = createDeviceHandler(param);
executeSocketConnection(ip, port, param, msg, handler);
}
/**
* 静态方法入口 - 保持向后兼容
*/
public static void connectToSourceStatic(String ip, Integer port, PreDetectionParam param, String msg) {
if (instance != null) {
instance.connectToSource(ip, port, param, msg);
} else {
log.error("NettyClient未初始化,无法创建程控源连接");
}
}
/**
* 静态方法入口 - 保持向后兼容
*/
public static void connectToDeviceStatic(String ip, Integer port, PreDetectionParam param, String msg) {
if (instance != null) {
instance.connectToDevice(ip, port, param, msg);
} else {
log.error("NettyClient未初始化,无法创建被检设备连接");
}
}
/**
* 创建源设备处理器
* 利用Spring注入的Service实例
*/
private NettySourceClientHandler createSourceHandler(PreDetectionParam param) {
return new NettySourceClientHandler(param, socketSourceResponseService);
}
/**
* 创建被检设备处理器
* 利用Spring注入的Service实例
*/
private NettyDevClientHandler createDeviceHandler(PreDetectionParam param) {
return new NettyDevClientHandler(param, socketDevResponseService);
}
/**
* 执行Socket连接建立流程(重构后的核心实现)
*/
private static void executeSocketConnection(String ip, Integer port,
PreDetectionParam param, String msg, SimpleChannelInboundHandler<String> handler) {
// 创建NIO事件循环组
NioEventLoopGroup group = createEventLoopGroup();
try {
// 配置客户端启动器
Bootstrap bootstrap = configureBootstrap(group);
// 创建管道初始化器
ChannelInitializer<NioSocketChannel> initializer = createChannelInitializer(param, handler);
bootstrap.handler(initializer);
// 同步连接到目标服务器
ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
// 处理连接结果
handleConnectionResult(channelFuture, param, handler, group, msg);
} catch (Exception e) {
// 处理连接异常
handleConnectionException(e, param, handler, group);
}
}
/**
* 创建NIO事件循环组
*/
private static NioEventLoopGroup createEventLoopGroup() {
return new NioEventLoopGroup();
}
/**
* 配置Bootstrap启动器
*/
private static Bootstrap configureBootstrap(NioEventLoopGroup group) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.SO_KEEPALIVE, true)
.channel(NioSocketChannel.class);
return bootstrap;
}
/**
* 创建通道初始化器
* 根据处理器类型配置不同的Pipeline
*/
private static ChannelInitializer<NioSocketChannel> createChannelInitializer(
PreDetectionParam param, SimpleChannelInboundHandler<String> handler) {
return new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
if (handler instanceof NettySourceClientHandler) {
configureSourcePipeline(ch, param, handler);
} else {
configureDevicePipeline(ch, param, handler);
}
}
};
}
/**
* 配置程控源设备的Pipeline
*/
private static void configureSourcePipeline(NioSocketChannel ch, PreDetectionParam param,
SimpleChannelInboundHandler<String> handler) {
ch.pipeline()
.addLast("frame-decoder", new LineBasedFrameDecoder(10240))
.addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8))
.addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8))
.addLast("heartbeat", new HeartbeatHandler(param, CnSocketUtil.SOURCE_TAG))
.addLast("source-handler", handler);
}
/**
* 配置被检设备的Pipeline
*/
private static void configureDevicePipeline(NioSocketChannel ch, PreDetectionParam param,
SimpleChannelInboundHandler<String> handler) {
ch.pipeline()
.addLast("frame-decoder", new LineBasedFrameDecoder(10240))
.addLast("string-decoder", new StringDecoder(CharsetUtil.UTF_8))
.addLast("string-encoder", new StringEncoder(CharsetUtil.UTF_8))
.addLast("heartbeat", new HeartbeatHandler(param, CnSocketUtil.DEV_TAG))
.addLast("idle-detector", new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS))
.addLast("device-handler", handler);
}
/**
* 处理连接建立结果
*/
private static void handleConnectionResult(ChannelFuture channelFuture, PreDetectionParam param,
SimpleChannelInboundHandler<String> handler, NioEventLoopGroup group, String msg) {
if (channelFuture.isSuccess()) {
log.info("Socket连接建立成功: {}", channelFuture.channel().remoteAddress());
// 注册会话和EventLoopGroup到管理器
String userId = getConnectionUserId(param, handler);
SocketManager.addUser(userId, channelFuture.channel());
SocketManager.addGroup(userId, group);
// 发送初始消息
if (StrUtil.isNotBlank(msg)) {
channelFuture.channel().writeAndFlush(msg + "\n");
log.info("发送初始消息: {}", msg);
}
} else {
log.error("Socket连接建立失败");
handleConnectionFailure(param, group);
}
}
/**
* 获取连接的用户ID
*/
private static String getConnectionUserId(PreDetectionParam param, SimpleChannelInboundHandler<String> handler) {
String tag = (handler instanceof NettySourceClientHandler) ? CnSocketUtil.SOURCE_TAG : CnSocketUtil.DEV_TAG;
return param.getUserPageId() + tag;
}
/**
* 处理连接异常
*/
private static void handleConnectionException(Exception e, PreDetectionParam param,
SimpleChannelInboundHandler<String> handler, NioEventLoopGroup group) {
log.error("Socket连接过程中发生异常: {}", e.getMessage(), e);
// 清理资源
if (group != null) {
group.shutdownGracefully();
}
// 发送错误消息到前端
try {
CnSocketUtil.quitSendSource(param);
CnSocketUtil.quitSend(param);
} catch (Exception ex) {
log.error("发送错误消息失败", ex);
}
}
/**
* 处理连接失败情况
*/
private static void handleConnectionFailure(PreDetectionParam param, NioEventLoopGroup group) {
// 清理EventLoopGroup资源
if (group != null) {
group.shutdownGracefully();
}
// 通知业务层连接失败
try {
CnSocketUtil.quitSendSource(param);
CnSocketUtil.quitSend(param);
} catch (Exception e) {
log.error("处理连接失败通知时发生异常", e);
}
}
}
3.2 Handler组件改进
3.2.1 NettySourceClientHandler.java - 源设备处理器
功能改进:
- 简化异常处理逻辑,移除冗余注释
- 使用slf4j日志替代System.out.println
- 优化连接状态管理
@RequiredArgsConstructor
@Slf4j
public class NettySourceClientHandler extends SimpleChannelInboundHandler<String> {
private final PreDetectionParam webUser;
private final SocketSourceResponseService sourceResponseService;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("程控源客户端通道已建立: {}", ctx.channel().id());
SocketManager.addUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG, ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException {
log.debug("接收源设备数据: {}", msg);
try {
sourceResponseService.deal(webUser, msg);
} catch (Exception e) {
log.error("处理源设备响应异常", e);
CnSocketUtil.quitSend(webUser);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("程控源客户端连接断开");
ctx.close();
SocketManager.removeUser(webUser.getUserPageId() + CnSocketUtil.SOURCE_TAG);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
log.debug("程控源设备空闲状态触发");
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("程控源通信异常", cause);
if (cause instanceof ConnectException) {
log.warn("程控源连接异常");
} else if (cause instanceof IOException) {
WebServiceManager.sendDetectionErrorMessage(webUser.getUserPageId(), SourceOperateCodeEnum.SERVER_ERROR);
} else if (cause instanceof TimeoutException) {
log.warn("程控源通信超时");
}
ctx.close();
}
}
3.2.2 NettyDevClientHandler.java - 被检设备处理器
功能改进:
- 精简超时检测逻辑,移除大段注释代码
- 优化异常处理机制
- 统一日志输出格式
@RequiredArgsConstructor
@Slf4j
public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
private final PreDetectionParam param;
private final SocketDevResponseService socketDevResponseService;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("被检设备客户端通道已建立: {}", ctx.channel().id());
SocketManager.addUser(param.getUserPageId() + CnSocketUtil.DEV_TAG, ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.debug("接收被检设备数据: {}", msg);
try {
socketDevResponseService.deal(param, msg);
} catch (Exception e) {
log.error("处理被检设备响应异常", e);
CnSocketUtil.quitSend(param);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("被检设备客户端连接断开");
ctx.close();
SocketManager.removeUser(param.getUserPageId() + CnSocketUtil.DEV_TAG);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.warn("被检设备读超时触发");
handleReadTimeout(ctx);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
/**
* 处理读超时事件
*/
private void handleReadTimeout(ChannelHandlerContext ctx) {
if (!FormalTestManager.hasStopFlag) {
if (CollUtil.isNotEmpty(SocketManager.getSourceList())) {
SourceIssue sourceIssue = SocketManager.getSourceList().get(0);
// 更新超时计时器
updateTimeoutCounter(sourceIssue);
// 检查是否需要触发超时处理
if (shouldTriggerTimeout(sourceIssue)) {
log.warn("检测项超时: {}", sourceIssue.getType());
CnSocketUtil.quitSend(param);
timeoutSend(sourceIssue);
}
}
}
}
/**
* 更新超时计时器
*/
private void updateTimeoutCounter(SourceIssue sourceIssue) {
SocketManager.clockMap.put(sourceIssue.getIndex(),
SocketManager.clockMap.getOrDefault(sourceIssue.getIndex(), 0L) + 60L);
}
/**
* 判断是否应该触发超时处理
*/
private boolean shouldTriggerTimeout(SourceIssue sourceIssue) {
Long currentTime = SocketManager.clockMap.get(sourceIssue.getIndex());
if (currentTime == null) return false;
// 根据检测类型设置不同的超时时间
if (sourceIssue.getType().equals(DicDataEnum.F.getCode())) {
return currentTime > 1300; // 闪变: 20分钟超时
} else if (sourceIssue.getType().equals(DicDataEnum.VOLTAGE.getCode()) ||
sourceIssue.getType().equals(DicDataEnum.HP.getCode())) {
return currentTime > 180; // 统计数据: 3分钟超时
} else {
return currentTime > 60; // 实时数据: 1分钟超时
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("被检设备通信异常", cause);
if (cause instanceof ConnectException) {
log.warn("被检设备连接异常");
} else if (cause instanceof IOException) {
WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.DEVICE_ERROR);
} else if (cause instanceof TimeoutException) {
log.warn("被检设备通信超时");
}
// 清理资源并断开连接
CnSocketUtil.quitSend(param);
CnSocketUtil.quitSendSource(param);
ctx.close();
}
}
4. Netty服务端组件详解
4.1 NettyServer.java - 服务器核心
功能职责:
- 提供Socket服务端功能,用于测试和开发
- 支持源通信服务和设备通信服务
- 模拟外部设备的响应行为
关键代码分析:
public class NettyServer {
public static final int port = 8574;
private void runSource() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap().group(boss, work);
bootstrap.channel(NioServerSocketChannel.class)
.handler(new ChannelInitializer<ServerSocketChannel>() {
@Override
protected void initChannel(ServerSocketChannel ch) {
System.out.println("源通讯服务正在启动中......");
}
})
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline()
.addLast(new LineBasedFrameDecoder(10240))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new DevNettyServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.addListener(f -> {
if (future.isSuccess()) {
System.out.println("源通讯服务启动成功");
} else {
System.out.println("源通讯服务启动失败");
}
});
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
5. WebSocket通信组件详解
5.1 WebSocketService.java - WebSocket服务核心
功能职责:
- 启动基于Netty的WebSocket服务器
- 管理服务器生命周期(启动/关闭)
- 提供高性能的WebSocket通信支持
关键代码分析:
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketService implements ApplicationRunner {
@Value("${webSocket.port:7777}")
int port;
EventLoopGroup bossGroup;
EventLoopGroup workerGroup;
private Channel serverChannel;
private CompletableFuture<Void> serverFuture;
@Override
public void run(ApplicationArguments args) {
// 使用CompletableFuture异步启动WebSocket服务,避免阻塞Spring Boot主线程
serverFuture = CompletableFuture.runAsync(this::startWebSocketServer)
.exceptionally(throwable -> {
log.error("WebSocket服务启动异常", throwable);
return null;
});
}
private void startWebSocketServer() {
try {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new WebSocketInitializer());
ChannelFuture future = serverBootstrap.bind(port).sync();
serverChannel = future.channel();
future.addListener(f -> {
if (future.isSuccess()) {
log.info("webSocket服务启动成功,端口:{}", port);
} else {
log.error("webSocket服务启动失败,端口:{}", port);
}
});
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("WebSocket服务启动过程中被中断", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("WebSocket服务启动失败", e);
throw new RuntimeException("WebSocket服务启动失败", e);
} finally {
shutdownGracefully();
}
}
@PreDestroy
public void destroy() throws InterruptedException {
log.info("正在关闭WebSocket服务...");
if (serverChannel != null) {
try {
serverChannel.close().awaitUninterruptibly(5, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("关闭服务器通道时发生异常", e);
}
}
if (bossGroup != null) {
bossGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync();
}
if (serverFuture != null && !serverFuture.isDone()) {
boolean cancelled = serverFuture.cancel(true);
}
log.info("webSocket服务已销毁");
}
}
6. Socket响应处理器详解
6.1 响应处理器改进
主要改进:
- 支持SocketManager的依赖注入
- 移除硬编码的IP/PORT配置
- 使用智能发送机制简化代码
6.1.1 SocketSourceResponseService.java
@Service
@RequiredArgsConstructor
public class SocketSourceResponseService {
private final SocketDevResponseService socketDevResponseService;
private final IPqDevService iPqDevService;
private final SocketManager socketManager;
public void deal(PreDetectionParam param, String msg) throws Exception {
SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg);
String[] tem = socketDataMsg.getRequestId().split(CnSocketUtil.STEP_TAG);
SourceOperateCodeEnum enumByCode = SourceOperateCodeEnum.getDictDataEnumByCode(tem[0]);
if (ObjectUtil.isNotNull(enumByCode)) {
switch (enumByCode) {
case YJC_YTXJY:
if (ObjectUtil.isNotNull(param.getPlanId())) {
detectionDev(param, socketDataMsg);
} else {
handleYtxjySimulate(param, socketDataMsg);
}
break;
case YJC_XUJY:
phaseSequenceDev(param, socketDataMsg);
break;
case FORMAL_REAL:
if (ObjectUtil.isNotNull(param.getPlanId())) {
senParamToDev(param, socketDataMsg);
} else {
handleSimulateTest(param, socketDataMsg);
}
break;
case Coefficient_Check:
coefficient(param, socketDataMsg);
break;
}
}
}
// 装置检测 - 使用智能发送机制
private void detectionDev(PreDetectionParam param, SocketDataMsg socketDataMsg) {
SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode());
if (ObjectUtil.isNotNull(dictDataEnumByCode)) {
SocketMsg<String> socketMsg = new SocketMsg<>();
switch (dictDataEnumByCode) {
case SUCCESS:
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
Map<String, List<PreDetection>> map = new HashMap<>(1);
map.put("deviceList", FormalTestManager.devList);
String jsonString = JSON.toJSONString(map);
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_SBTXJY.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_01.getValue());
socketMsg.setData(jsonString);
String json = JSON.toJSONString(socketMsg);
// 使用智能发送工具类,自动管理设备连接
socketManager.smartSendToDevice(param, json);
break;
case UNPROCESSED_BUSINESS:
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
break;
default:
CnSocketUtil.quitSendSource(param);
WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
break;
}
}
}
}
7. Socket管理与工具类详解
7.1 SocketManager.java - 智能Socket管理器
核心功能:
- 智能发送机制: 自动判断连接需求,透明管理连接建立
- Spring组件管理: 完全交给Spring容器管理,支持依赖注入
- 会话管理: 统一管理Socket连接会话和EventLoopGroup
- 检测任务管理: 管理检测相关的状态信息和配置
关键数据结构:
@Component
@Slf4j
public class SocketManager {
@Autowired
private SocketConnectionConfig socketConnectionConfig;
// Socket会话管理
private static final Map<String, Channel> socketSessions = new ConcurrentHashMap<>();
private static final Map<String, NioEventLoopGroup> socketGroup = new ConcurrentHashMap<>();
// 检测任务管理
private static Map<String, Long> targetMap = new ConcurrentHashMap<>();
private static List<SourceIssue> sourceIssueList = new CopyOnWriteArrayList<>();
public static Map<String, String> valueTypeMap = new HashMap<>();
public static volatile Map<Integer, Long> clockMap = new ConcurrentHashMap<>();
public static volatile Map<DataSourceEnum, Long> contrastClockMap = new ConcurrentHashMap<>();
// 基础连接管理方法
public static void addUser(String userId, Channel channel) {
socketSessions.put(userId, channel);
}
public static void addGroup(String userId, NioEventLoopGroup group) {
socketGroup.put(userId, group);
}
public static void removeUser(String userId) {
Channel channel = socketSessions.get(userId);
if (ObjectUtil.isNotNull(channel)) {
try {
channel.close().sync();
} catch (InterruptedException e) {
log.error("关闭通道异常", e);
}
NioEventLoopGroup eventExecutors = socketGroup.get(userId);
if (ObjectUtil.isNotNull(eventExecutors)) {
eventExecutors.shutdownGracefully();
log.info("{}__{}关闭了客户端", userId, channel.id());
}
}
socketSessions.remove(userId);
}
public static void sendMsg(String userId, String msg) {
Channel channel = socketSessions.get(userId);
if (ObjectUtil.isNotNull(channel)) {
channel.writeAndFlush(msg + '\n');
log.info("{}__{}往{}发送数据:{}", userId, channel.id(), channel.remoteAddress(), msg);
} else {
log.warn("{}__发送数据:失败通道不存在{}", userId, msg);
}
}
// 检测任务管理方法
public static void addSourceList(List<SourceIssue> sList) {
sourceIssueList = sList;
}
public static List<SourceIssue> getSourceList() {
return sourceIssueList;
}
public static void delSource(Integer index) {
sourceIssueList.removeIf(s -> index.equals(s.getIndex()));
}
public static void delSourceTarget(String sourceTag) {
targetMap.remove(sourceTag);
}
public static void initMap(Map<String, Long> map) {
targetMap = map;
}
public static void addTargetMap(String scriptType, Long count) {
targetMap.put(scriptType, count);
}
public static Long getSourceTarget(String scriptType) {
return targetMap.get(scriptType);
}
}
7.2 CnSocketUtil.java - Socket工具类
功能职责:
- 提供Socket连接的控制功能
- 封装WebSocket消息推送
- 定义通信相关常量
关键代码:
public class CnSocketUtil {
public final static String DEV_TAG = "_Dev";
public final static String SOURCE_TAG = "_Source";
public final static String START_TAG = "_Start";
public final static String END_TAG = "_End";
public final static String STEP_TAG = "&&";
public final static String SPLIT_TAG = "_";
// 退出检测
public static void quitSend(PreDetectionParam param) {
SocketMsg<String> socketMsg = new SocketMsg<>();
socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue());
SocketManager.sendMsg(param.getUserPageId() + DEV_TAG, JSON.toJSONString(socketMsg));
WebServiceManager.removePreDetectionParam();
}
// 关闭源连接
public static void quitSendSource(PreDetectionParam param) {
SocketMsg<String> socketMsg = new SocketMsg<>();
socketMsg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue());
JSONObject jsonObject = new JSONObject();
jsonObject.put("sourceId", param.getSourceId());
socketMsg.setData(jsonObject.toJSONString());
SocketManager.sendMsg(param.getUserPageId() + SOURCE_TAG, JSON.toJSONString(socketMsg));
WebServiceManager.removePreDetectionParam();
}
// 推送webSocket数据
public static void sendToWebSocket(String userId, String requestId, String operatorType,
Object data, String desc) {
WebSocketVO<Object> webSocketVO = new WebSocketVO<>();
webSocketVO.setRequestId(requestId);
webSocketVO.setOperateCode(operatorType);
webSocketVO.setData(data);
webSocketVO.setDesc(desc);
WebServiceManager.sendMessage(userId, webSocketVO);
}
// 比对式-退出检测
public static void contrastSendquit(String userId) {
System.out.println("比对式-发送关闭备通讯模块指令");
SocketMsg<String> socketMsg = new SocketMsg<>();
socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue());
SocketManager.sendMsg(userId + DEV_TAG, JSON.toJSONString(socketMsg));
WebServiceManager.removePreDetectionParam();
}
}
8. 通信数据对象详解
8.1 数据对象结构
8.1.1 SocketMsg.java - Socket消息对象
public class SocketMsg<T> {
private String requestId; // 请求ID,用于标识消息类型和流程
private String operateCode; // 操作代码,标识具体的操作类型
private T data; // 数据载荷,支持泛型
private String desc; // 描述信息
private Long timestamp; // 时间戳
}
8.1.2 SocketDataMsg.java - Socket数据消息对象
public class SocketDataMsg {
private String requestId; // 请求ID
private String operateCode; // 操作代码
private String data; // 响应数据(JSON字符串)
private Integer code; // 响应状态码
private String message; // 响应消息
private String type; // 消息类型
}
8.1.3 WebSocketVO.java - WebSocket数据对象
public class WebSocketVO<T> {
private String requestId; // 请求ID
private String operateCode; // 操作代码
private T data; // 数据载荷
private String desc; // 描述信息
private Integer status; // 状态码
private Long timestamp; // 时间戳
private String userId; // 用户ID
}
9. 通信流程分析
9.1 智能发送流程
sequenceDiagram
participant Business as 业务层
participant SocketManager as SocketManager
participant Config as SocketConnectionConfig
participant NettyClient as NettyClient
participant Device as 外部设备
Business->>SocketManager: smartSendToSource(param, msg)
SocketManager->>SocketManager: extractRequestId(msg)
SocketManager->>Config: needsSourceConnection(requestId)
alt 需要建立连接
Config-->>SocketManager: true
SocketManager->>SocketManager: isChannelActive(userId)
alt 连接不存在
SocketManager->>Config: getSource().getIp/Port()
Config-->>SocketManager: IP/PORT配置
SocketManager->>NettyClient: connectToSourceStatic(ip, port, param, msg)
NettyClient->>Device: 建立连接并发送消息
else 连接已存在
SocketManager->>SocketManager: sendMsg(userId, msg)
end
else 不需要建立连接
Config-->>SocketManager: false
SocketManager->>SocketManager: sendMsg(userId, msg)
end
9.2 Spring组件生命周期流程
graph TB
A[Spring容器启动] --> B[SocketConnectionConfig初始化]
B --> C[@ConfigurationProperties绑定配置]
C --> D[NettyClient注入依赖]
D --> E[SocketManager注入配置]
E --> F[业务层注入SocketManager]
F --> G[智能发送服务就绪]
G --> H[接收发送请求]
H --> I[检查连接需求]
I --> J[自动建立连接]
J --> K[发送消息]
K --> L[Spring容器关闭]
L --> M[@PreDestroy清理资源]
M --> N[关闭所有连接]
9.3 配置管理流程
flowchart TD
A[application.yml] --> B[Spring Boot配置绑定]
B --> C[SocketConnectionConfig]
C --> D[Source配置]
C --> E[Device配置]
C --> F[RequestId配置]
D --> G[程控源IP/PORT]
E --> H[被检设备IP/PORT]
F --> I[连接需求判断]
G --> J[SocketManager智能发送]
H --> J
I --> J
J --> K[自动连接管理]
K --> L[透明化发送]
10. 关键技术特性
10.1 智能发送机制特性
10.1.1 自动连接管理
- 智能判断: 根据requestId自动判断是否需要建立连接
- 透明操作: 开发者无需关心连接建立过程
- 配置驱动: 通过简单配置控制连接行为
10.1.2 连接状态检测
private static boolean isChannelActive(String userId) {
Channel channel = getChannelByUserId(userId);
return ObjectUtil.isNotNull(channel) && channel.isActive();
}
10.1.3 异步连接建立
CompletableFuture.runAsync(() -> {
NettyClient.connectToSourceStatic(ip, port, param, msg);
});
10.2 Spring组件化特性
10.2.1 依赖注入管理
@Component
public class SocketManager {
@Autowired
private SocketConnectionConfig socketConnectionConfig;
}
@Service
@RequiredArgsConstructor
public class PreDetectionServiceImpl {
private final SocketManager socketManager;
}
10.2.2 配置属性绑定
@Component
@ConfigurationProperties(prefix = "socket")
public class SocketConnectionConfig {
private SourceConfig source = new SourceConfig();
private DeviceConfig device = new DeviceConfig();
}
10.3 配置统一管理特性
10.3.1 统一配置文件
socket:
source:
ip: 192.168.1.124
port: 62000
device:
ip: 192.168.1.124
port: 61000
10.3.2 动态配置支持
- 环境隔离: 支持不同环境使用不同配置
- 热更新: 支持配置的动态刷新
- 默认值: 提供合理的默认配置值
10.4 异常处理和资源管理特性
10.4.1 优化的异常处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("通信异常", cause);
if (cause instanceof ConnectException) {
log.warn("连接异常");
} else if (cause instanceof IOException) {
WebServiceManager.sendDetectionErrorMessage(userId, errorCode);
}
ctx.close();
}
10.4.2 完善的资源清理
public static void removeUser(String userId) {
Channel channel = socketSessions.get(userId);
if (ObjectUtil.isNotNull(channel)) {
try {
channel.close().sync();
} catch (InterruptedException e) {
log.error("关闭通道异常", e);
}
NioEventLoopGroup eventExecutors = socketGroup.get(userId);
if (ObjectUtil.isNotNull(eventExecutors)) {
eventExecutors.shutdownGracefully();
}
}
socketSessions.remove(userId);
}
10.5 并发安全特性
10.5.1 线程安全设计
- ConcurrentHashMap: 用于会话管理
- CopyOnWriteArrayList: 用于检测项列表
- volatile关键字: 用于状态标志
- CompletableFuture: 用于异步处理
10.5.2 日志统一管理
@Slf4j
public class NettyClient {
log.info("Socket连接建立成功: {}", channelFuture.channel().remoteAddress());
log.error("Socket连接过程中发生异常: {}", e.getMessage(), e);
log.debug("发送初始消息: {}", msg);
}
11. 配置与部署
11.1 应用配置
11.1.1 核心配置文件
# application.yml
webSocket:
port: 7777 # WebSocket服务端口
socket:
source:
ip: 192.168.1.124 # 程控源设备IP
port: 62000 # 程控源设备端口
device:
ip: 192.168.1.124 # 被检设备IP
port: 61000 # 被检设备端口
netty:
server:
port: 8574 # Netty服务端端口(测试用)
# 日志配置
logging:
level:
com.njcn.gather.detection.util.socket: INFO
com.njcn.gather.detection.handler: INFO
io.netty: WARN
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"
11.1.2 环境配置示例
# application-dev.yml (开发环境)
socket:
source:
ip: 127.0.0.1
port: 62000
device:
ip: 127.0.0.1
port: 61000
# application-prod.yml (生产环境)
socket:
source:
ip: 192.168.1.124
port: 62000
device:
ip: 192.168.1.124
port: 61000
11.2 Maven依赖配置
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
<!-- Netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.76.Final</version>
</dependency>
<!-- FastJSON依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- HuTool工具类 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.10</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
11.3 Spring Boot集成
11.3.1 自动配置启用
@SpringBootApplication
@EnableConfigurationProperties(SocketConnectionConfig.class)
public class DetectionApplication {
public static void main(String[] args) {
SpringApplication.run(DetectionApplication.class, args);
}
}
11.3.2 组件扫描配置
@ComponentScan(basePackages = {
"com.njcn.gather.detection.util.socket",
"com.njcn.gather.detection.handler",
"com.njcn.gather.detection.service"
})
11.4 性能调优参数
11.4.1 Netty性能参数
// 服务端性能调优
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128) // 连接队列大小
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时时间
.childOption(ChannelOption.SO_KEEPALIVE, true) // 启用TCP keepalive
.childOption(ChannelOption.TCP_NODELAY, true) // 禁用Nagle算法
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 接收缓冲区大小
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024); // 发送缓冲区大小
// 客户端性能调优
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时
.option(ChannelOption.SO_KEEPALIVE, true) // keepalive
.option(ChannelOption.TCP_NODELAY, true) // 立即发送
.channel(NioSocketChannel.class);
11.4.2 线程池配置
# application.yml
spring:
task:
execution:
pool:
core-size: 8
max-size: 16
queue-capacity: 100
thread-name-prefix: "detection-"
11.5 监控和诊断
11.5.1 健康检查配置
@Component
public class SocketHealthIndicator implements HealthIndicator {
@Autowired
private SocketManager socketManager;
@Override
public Health health() {
// 检查Socket连接状态
if (hasActiveConnections()) {
return Health.up()
.withDetail("activeConnections", getActiveConnectionCount())
.build();
} else {
return Health.down()
.withDetail("reason", "No active socket connections")
.build();
}
}
}
11.5.2 指标监控
@Component
public class SocketMetrics {
private final MeterRegistry meterRegistry;
private final Counter connectionCounter;
private final Timer messageProcessingTimer;
public SocketMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.connectionCounter = Counter.builder("socket.connections.total")
.description("Total socket connections")
.register(meterRegistry);
this.messageProcessingTimer = Timer.builder("socket.message.processing.time")
.description("Message processing time")
.register(meterRegistry);
}
}
总结
CN_Gather Detection模块的全新Netty通信架构通过智能Socket管理机制和全面Spring组件化的设计,实现了电能质量设备检测系统的现代化通信解决方案。
核心架构优势
-
智能化程度高
- 自动连接管理,开发者无需关心连接细节
- 配置驱动的连接策略,灵活可控
- 透明化的发送机制,简化业务代码
-
Spring生态集成
- 完全Spring组件化管理,遵循IoC原则
- 统一的配置管理,支持多环境部署
- 完善的依赖注入,松耦合设计
-
代码质量提升
- 移除大量冗余和无用代码
- 统一日志管理,便于调试和监控
- 优化异常处理,提高系统稳定性
-
可维护性增强
- 模块化设计,职责边界清晰
- 配置集中管理,降低维护成本
- 完善的资源管理,避免内存泄漏
-
开发体验优化
- 简化的API设计,降低使用门槛
- 智能化的连接管理,减少样板代码
- 统一的错误处理,提高开发效率
技术特色
- 智能发送机制: 业界领先的自动连接管理技术
- 配置统一管理: 现代化的配置管理模式
- Spring深度集成: 充分利用Spring生态优势
- 高并发支持: 基于Netty NIO的高性能通信
- 完善监控: 全方位的监控和诊断能力
该架构为CN_Gather系统提供了稳定、高效、易维护的通信基础,确保了电能质量检测业务的可靠运行,同时为未来的功能扩展和性能优化奠定了坚实基础。