项目初始化

This commit is contained in:
2026-04-13 11:50:14 +08:00
commit 8de2fdc8a4
163 changed files with 10815 additions and 0 deletions

9
detection/Readme.md Normal file
View File

@@ -0,0 +1,9 @@
#### 简介
设备模块主要包含以下功能:
* 检测计划管理
* 被检设备管理
* 被检设备下监测点管理
* 误差体系管理
* 检测脚本管理
* 检测源管理
* 检测报告管理

38
detection/pom.xml Normal file
View File

@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.njcn.gather</groupId>
<artifactId>CN_Tool</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>detection</artifactId>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>njcn-common</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>spingboot2.3.12</artifactId>
<version>2.3.12</version>
</dependency>
<!-- Key refactor point: detection now retains only transport-foundation
dependencies required by the generic Netty/WebSocket stack. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,43 @@
package com.njcn.gather.detection.pojo.vo;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
/**
* @author wr
* @description
* @date 2024/12/13 9:09
*/
@Data
public class SocketDataMsg {
/**
* 标识不同业务
*/
private String type = "aaa";
/**
* 请求id确保接收到响应时知晓是针对的哪次请求的应答
*/
@JSONField(ordinal = 1)
private String requestId;
/**
* 源初始化 INIT_GATHER$01 INIT_GATHER采集初始化01 统计采集、02 暂态采集、03 实时采集
*/
@JSONField(ordinal = 2)
private String operateCode;
/**
* 数据体传输前需要将对象、Array等转为String
*/
@JSONField(ordinal = 4)
private String data;
/**
* code码
*/
@JSONField(ordinal = 3)
private Integer code;
}

View File

@@ -0,0 +1,31 @@
package com.njcn.gather.detection.pojo.vo;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
/**
* @author wr
* @description socket 通用发送报文请求
* @date 2024/12/11 15:57
*/
@Data
public class SocketMsg<T> {
/**
* 请求id确保接收到响应时知晓是针对的哪次请求的应答
*/
@JSONField(ordinal = 1)
private String requestId;
/**
* 源初始化 INIT_GATHER$01 INIT_GATHER采集初始化01 统计采集、02 暂态采集、03 实时采集
*/
@JSONField(ordinal = 2)
private String operateCode;
/**
* 数据体传输前需要将对象、Array等转为String
*/
@JSONField(ordinal = 3)
private T data;
}

View File

@@ -0,0 +1,30 @@
package com.njcn.gather.detection.pojo.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Generic WebSocket payload wrapper.
*
* @author chendaofei
* @author hongawen
* @date 2026/04/08
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class WebSocketVO<T> {
private String type = "transport";
private String requestId;
private String operateCode;
private Integer code;
private String desc;
private T data;
}

View File

@@ -0,0 +1,32 @@
package com.njcn.gather.detection.util.socket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.njcn.gather.detection.pojo.vo.SocketDataMsg;
/**
* Generic socket message helper retained by the communication foundation.
* Stage 4-B removes detection-specific text assembly helpers and keeps only
* payload parsing and JSON framing methods used by the base transport layer.
*
* @author wr
* @author hongawen
* @date 2026/04/08
*/
public final class MsgUtil {
private MsgUtil() {
}
public static SocketDataMsg socketDataMsg(String textMsg) {
return JSON.parseObject(textMsg, SocketDataMsg.class);
}
public static String toJsonWithNewLine(Object obj) {
return JSON.toJSONString(obj, SerializerFeature.PrettyFormat) + "\n";
}
public static String toJsonWithNewLinePlain(Object obj) {
return JSON.toJSONString(obj) + "\n";
}
}

View File

@@ -0,0 +1,170 @@
package com.njcn.gather.detection.util.socket;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionContext;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionType;
import com.njcn.gather.detection.util.socket.config.SocketConnectionConfig;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
* Generic socket session manager.
* Stage 4-A removes detection-only caches from this class and keeps only the
* retained transport responsibilities: session registry, auto-connect and
* outbound dispatch.
*
* @author wr
* @author hongawen
* @date 2026/04/07
*/
@Slf4j
@Component
public class SocketManager {
@Resource
private SocketConnectionConfig socketConnectionConfig;
@Resource
private NettyClient nettyClient;
/**
* Key: sessionKey(userId + connection tag), value: active channel.
*/
private static final Map<String, Channel> SOCKET_SESSIONS = new ConcurrentHashMap<>();
/**
* Key: sessionKey(userId + connection tag), value: event loop group.
*/
private static final Map<String, NioEventLoopGroup> SOCKET_GROUPS = new ConcurrentHashMap<>();
public static void addUser(String sessionKey, Channel channel) {
SOCKET_SESSIONS.put(sessionKey, channel);
}
public static void addGroup(String sessionKey, NioEventLoopGroup group) {
SOCKET_GROUPS.put(sessionKey, group);
}
public static void removeUser(String sessionKey) {
Channel channel = SOCKET_SESSIONS.remove(sessionKey);
NioEventLoopGroup eventLoopGroup = SOCKET_GROUPS.remove(sessionKey);
if (ObjectUtil.isNotNull(channel)) {
try {
channel.close().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Close socket channel interrupted: sessionKey={}", sessionKey, e);
}
}
if (ObjectUtil.isNotNull(eventLoopGroup)) {
eventLoopGroup.shutdownGracefully();
log.info("Socket connection closed: sessionKey={}", sessionKey);
}
}
public static Channel getChannelByUserId(String sessionKey) {
return SOCKET_SESSIONS.get(sessionKey);
}
public static NioEventLoopGroup getGroupByUserId(String sessionKey) {
return SOCKET_GROUPS.get(sessionKey);
}
public static boolean isChannelActive(String sessionKey) {
Channel channel = getChannelByUserId(sessionKey);
return ObjectUtil.isNotNull(channel) && channel.isActive();
}
public static void sendMsg(String sessionKey, String msg) {
Channel channel = SOCKET_SESSIONS.get(sessionKey);
if (ObjectUtil.isNull(channel)) {
log.warn("Send socket message failed because channel does not exist: sessionKey={}, message={}",
sessionKey, msg);
return;
}
channel.writeAndFlush(msg + '\n');
log.info("{}__{} -> {} : {}", sessionKey, channel.id(), channel.remoteAddress(), msg);
}
/**
* Key refactor point: auto-connect now depends only on connection context
* and transport callbacks attached to that context.
*/
public void smartSend(ConnectionContext context, String msg) {
if (ObjectUtil.isNull(context) || ObjectUtil.isNull(context.getConnectionType())) {
log.warn("smartSend skipped because connection context is null");
return;
}
String sessionKey = context.getSessionKey();
String requestId = extractRequestId(msg);
if (StrUtil.isBlank(sessionKey)) {
log.warn("smartSend skipped because sessionKey is blank, requestId={}", requestId);
return;
}
if (needsAutoConnect(context.getConnectionType(), requestId) && !isChannelActive(sessionKey)) {
String ip = resolveIp(context.getConnectionType());
Integer port = resolvePort(context.getConnectionType());
log.info("Socket auto connect triggered: type={}, sessionKey={}, requestId={}",
context.getConnectionType(), sessionKey, requestId);
CompletableFuture.runAsync(() -> nettyClient.connect(ip, port, context, msg));
return;
}
sendMsg(sessionKey, msg);
}
private static String extractRequestId(String msg) {
try {
if (StrUtil.isBlank(msg)) {
return "unknown";
}
JSONObject jsonObject = JSON.parseObject(msg);
String requestId = jsonObject.getString("requestId");
if (StrUtil.isNotBlank(requestId)) {
return requestId;
}
requestId = jsonObject.getString("request_id");
if (StrUtil.isNotBlank(requestId)) {
return requestId;
}
} catch (Exception e) {
log.debug("Extract requestId from socket message failed: {}", msg, e);
}
return "unknown";
}
private boolean needsAutoConnect(ConnectionType connectionType, String requestId) {
if (connectionType == ConnectionType.SOURCE) {
return SocketConnectionConfig.needsSourceConnection(requestId);
}
return SocketConnectionConfig.needsDeviceConnection(requestId);
}
private String resolveIp(ConnectionType connectionType) {
if (connectionType == ConnectionType.SOURCE) {
return socketConnectionConfig.getSource().getIp();
}
return socketConnectionConfig.getDevice().getIp();
}
private Integer resolvePort(ConnectionType connectionType) {
if (connectionType == ConnectionType.SOURCE) {
return socketConnectionConfig.getSource().getPort();
}
return socketConnectionConfig.getDevice().getPort();
}
}

View File

@@ -0,0 +1,103 @@
package com.njcn.gather.detection.util.socket.cilent;
import cn.hutool.core.util.StrUtil;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.communication.handler.ConnectionLifecycleHandler;
import com.njcn.gather.detection.util.socket.communication.handler.SocketMessageHandler;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionContext;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
/**
* Shared client handler skeleton for the retained Netty communication layer.
* Stage 4-A centralizes common session registration, message delegation and
* idle cleanup so concrete handlers stay transport-oriented.
*
* @author hongawen
* @date 2026/04/07
*/
@Slf4j
public abstract class AbstractNettyClientHandler extends SimpleChannelInboundHandler<String> {
private final String handlerName;
protected final ConnectionContext connectionContext;
private final SocketMessageHandler socketMessageHandler;
private final ConnectionLifecycleHandler lifecycleHandler;
protected AbstractNettyClientHandler(String handlerName, ConnectionContext connectionContext,
SocketMessageHandler socketMessageHandler,
ConnectionLifecycleHandler lifecycleHandler) {
this.handlerName = handlerName;
this.connectionContext = connectionContext;
this.socketMessageHandler = socketMessageHandler;
this.lifecycleHandler = lifecycleHandler == null ? ConnectionLifecycleHandler.NO_OP : lifecycleHandler;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String sessionKey = resolveSessionKey();
log.info("{} channel active: channelId={}, sessionKey={}", handlerName, ctx.channel().id(), sessionKey);
if (StrUtil.isNotBlank(sessionKey)) {
SocketManager.addUser(sessionKey, ctx.channel());
} else {
log.warn("{} channel active without sessionKey, skip registration", handlerName);
}
lifecycleHandler.onConnected(connectionContext);
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String sessionKey = resolveSessionKey();
log.warn("{} channel inactive: channelId={}, sessionKey={}", handlerName, ctx.channel().id(), sessionKey);
if (StrUtil.isNotBlank(sessionKey)) {
SocketManager.removeUser(sessionKey);
}
lifecycleHandler.onDisconnected(connectionContext);
super.channelInactive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
if (socketMessageHandler == null) {
log.warn("{} receive message but handler is null: sessionKey={}, message={}",
handlerName, resolveSessionKey(), msg);
return;
}
try {
socketMessageHandler.handle(connectionContext, msg);
} catch (Exception e) {
lifecycleHandler.onMessageHandlingError(connectionContext, msg, e);
throw e;
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
log.warn("{} trigger reader idle timeout: sessionKey={}", handlerName, resolveSessionKey());
lifecycleHandler.onIdleTimeout(connectionContext);
ctx.close();
return;
}
super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("{} catch transport exception: sessionKey={}, message={}",
handlerName, resolveSessionKey(), cause.getMessage(), cause);
lifecycleHandler.onException(connectionContext, cause);
ctx.close();
}
protected String resolveSessionKey() {
return connectionContext == null ? null : connectionContext.getSessionKey();
}
}

View File

@@ -0,0 +1,138 @@
package com.njcn.gather.detection.util.socket.cilent;
import cn.hutool.core.util.StrUtil;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.communication.handler.ConnectionLifecycleHandler;
import com.njcn.gather.detection.util.socket.communication.handler.HeartbeatMessageStrategy;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionContext;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Generic Netty heartbeat handler.
* Stage 4-A moves heartbeat framing behind a strategy interface so the
* retained transport layer can keep heartbeat capability without embedding
* detection-specific packet structures.
*
* @author cdf
* @author hongawen
* @date 2026/04/07
*/
@Slf4j
public class HeartbeatHandler extends SimpleChannelInboundHandler<String> {
private static final int MAX_HEARTBEAT_MISSES = 3;
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1);
private final ConnectionContext connectionContext;
private final HeartbeatMessageStrategy heartbeatMessageStrategy;
private final ConnectionLifecycleHandler lifecycleHandler;
private ScheduledFuture<?> heartbeatFuture;
private int consecutiveHeartbeatMisses;
public HeartbeatHandler(ConnectionContext connectionContext, HeartbeatMessageStrategy heartbeatMessageStrategy,
ConnectionLifecycleHandler lifecycleHandler) {
this.connectionContext = connectionContext;
this.heartbeatMessageStrategy = heartbeatMessageStrategy;
this.lifecycleHandler = lifecycleHandler == null ? ConnectionLifecycleHandler.NO_OP : lifecycleHandler;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
scheduleHeartbeat(ctx);
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
shutdownExecutorGracefully();
super.channelInactive(ctx);
}
private void scheduleHeartbeat(ChannelHandlerContext ctx) {
if (heartbeatMessageStrategy == null) {
log.debug("Skip heartbeat scheduling because strategy is null: sessionKey={}", resolveSessionKey());
return;
}
heartbeatFuture = heartbeatExecutor.scheduleAtFixedRate(() -> {
if (!ctx.channel().isActive()) {
return;
}
try {
String heartbeatMessage = heartbeatMessageStrategy.buildHeartbeatMessage(connectionContext);
if (StrUtil.isBlank(heartbeatMessage)) {
return;
}
// The client pipeline still uses line based frames, so the
// generic heartbeat writer normalizes the trailing separator.
if (!heartbeatMessage.endsWith("\n")) {
heartbeatMessage = heartbeatMessage + "\n";
}
ctx.channel().writeAndFlush(heartbeatMessage);
consecutiveHeartbeatMisses++;
log.debug("Send heartbeat packet: sessionKey={}, time={}, misses={}",
resolveSessionKey(), LocalDateTime.now(), consecutiveHeartbeatMisses);
if (consecutiveHeartbeatMisses >= MAX_HEARTBEAT_MISSES) {
handleHeartbeatTimeout(ctx);
}
} catch (Exception e) {
log.error("Send heartbeat packet failed: sessionKey={}", resolveSessionKey(), e);
}
}, 3, 10, TimeUnit.SECONDS);
}
private void handleHeartbeatTimeout(ChannelHandlerContext ctx) {
log.warn("Heartbeat timeout reached: sessionKey={}, misses={}",
resolveSessionKey(), consecutiveHeartbeatMisses);
lifecycleHandler.onIdleTimeout(connectionContext);
String sessionKey = resolveSessionKey();
if (StrUtil.isNotBlank(sessionKey)) {
SocketManager.removeUser(sessionKey);
}
consecutiveHeartbeatMisses = 0;
if (ctx.channel().isActive()) {
ctx.close();
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
if (heartbeatMessageStrategy != null && heartbeatMessageStrategy.isHeartbeatResponse(connectionContext, msg)) {
consecutiveHeartbeatMisses = 0;
log.debug("Receive heartbeat response: sessionKey={}, time={}", resolveSessionKey(), LocalDateTime.now());
return;
}
ctx.fireChannelRead(msg);
}
private String resolveSessionKey() {
return connectionContext == null ? null : connectionContext.getSessionKey();
}
private void shutdownExecutorGracefully() {
try {
if (heartbeatFuture != null && !heartbeatFuture.isCancelled()) {
heartbeatFuture.cancel(false);
}
heartbeatExecutor.shutdown();
if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
heartbeatExecutor.shutdownNow();
}
} catch (InterruptedException e) {
heartbeatExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

View File

@@ -0,0 +1,154 @@
package com.njcn.gather.detection.util.socket.cilent;
import cn.hutool.core.util.ObjectUtil;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.communication.handler.ConnectionLifecycleHandler;
import com.njcn.gather.detection.util.socket.communication.handler.HeartbeatMessageStrategy;
import com.njcn.gather.detection.util.socket.communication.handler.SocketMessageHandler;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionContext;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionType;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* Generic Netty client entry.
* Stage 4-A removes direct dependencies on detection handlers and services.
* Message parsing, heartbeat framing and lifecycle side effects now come from
* callbacks attached to {@link ConnectionContext}.
*
* @author wr
* @author hongawen
* @date 2026/04/07
*/
@Slf4j
@Component
public class NettyClient {
public void connect(String ip, Integer port, ConnectionContext context, String msg) {
if (ObjectUtil.isNull(context) || ObjectUtil.isNull(context.getConnectionType())) {
log.warn("Skip socket connect because connection context is null");
return;
}
SocketMessageHandler messageHandler = context.getMessageHandler();
ConnectionLifecycleHandler lifecycleHandler = resolveLifecycleHandler(context);
HeartbeatMessageStrategy heartbeatMessageStrategy = context.getHeartbeatStrategy();
SimpleChannelInboundHandler<String> handler = createHandler(context, messageHandler, lifecycleHandler);
executeSocketConnection(ip, port, context, msg, handler, lifecycleHandler, heartbeatMessageStrategy);
}
private SimpleChannelInboundHandler<String> createHandler(ConnectionContext context,
SocketMessageHandler messageHandler,
ConnectionLifecycleHandler lifecycleHandler) {
if (context.getConnectionType() == ConnectionType.SOURCE) {
return new NettySourceClientHandler(context, messageHandler, lifecycleHandler);
}
if (context.getConnectionType() == ConnectionType.DEVICE) {
return new NettyDevClientHandler(context, messageHandler, lifecycleHandler);
}
return new NettyContrastClientHandler(context, messageHandler, lifecycleHandler);
}
private ConnectionLifecycleHandler resolveLifecycleHandler(ConnectionContext context) {
ConnectionLifecycleHandler lifecycleHandler = context.getLifecycleHandler();
return lifecycleHandler == null ? ConnectionLifecycleHandler.NO_OP : lifecycleHandler;
}
private void executeSocketConnection(String ip, Integer port, ConnectionContext context, String msg,
SimpleChannelInboundHandler<String> handler,
ConnectionLifecycleHandler lifecycleHandler,
HeartbeatMessageStrategy heartbeatMessageStrategy) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.channel(NioSocketChannel.class)
.handler(createChannelInitializer(context, handler, lifecycleHandler, heartbeatMessageStrategy));
ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
handleConnectionResult(channelFuture, context, group, msg, lifecycleHandler);
} catch (Exception e) {
log.warn("Connect socket server error: type={}, sessionKey={}",
context.getConnectionType(), context.getSessionKey(), e);
group.shutdownGracefully();
lifecycleHandler.onConnectFailed(context, e);
}
}
private ChannelInitializer<NioSocketChannel> createChannelInitializer(ConnectionContext context,
SimpleChannelInboundHandler<String> handler,
ConnectionLifecycleHandler lifecycleHandler,
HeartbeatMessageStrategy heartbeatMessageStrategy) {
return new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
setupPipeline(ch.pipeline(), context, handler, lifecycleHandler, heartbeatMessageStrategy);
}
};
}
/**
* Key refactor point: pipeline extension now comes from generic strategy
* and lifecycle callbacks instead of fixed detection business classes.
*/
private void setupPipeline(ChannelPipeline pipeline, ConnectionContext context,
SimpleChannelInboundHandler<String> handler,
ConnectionLifecycleHandler lifecycleHandler,
HeartbeatMessageStrategy heartbeatMessageStrategy) {
pipeline.addLast(new LineBasedFrameDecoder(10240 * 2))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new HeartbeatHandler(context, heartbeatMessageStrategy, lifecycleHandler));
if (context.getConnectionType().isEnableIdleMonitor()) {
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
}
pipeline.addLast(handler);
}
private void handleConnectionResult(ChannelFuture channelFuture, ConnectionContext context,
NioEventLoopGroup group, String msg,
ConnectionLifecycleHandler lifecycleHandler) {
channelFuture.addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
log.warn("Connect socket server failed: type={}, sessionKey={}",
context.getConnectionType(), context.getSessionKey(), future.cause());
group.shutdownGracefully();
lifecycleHandler.onConnectFailed(context, future.cause());
return;
}
log.info("Connect socket server success: type={}, sessionKey={}",
context.getConnectionType(), context.getSessionKey());
manageSocketConnection(context, group);
SocketManager.addUser(context.getSessionKey(), future.channel());
SocketManager.sendMsg(context.getSessionKey(), msg);
});
}
private void manageSocketConnection(ConnectionContext context, NioEventLoopGroup group) {
String sessionKey = context.getSessionKey();
NioEventLoopGroup existingGroup = SocketManager.getGroupByUserId(sessionKey);
if (ObjectUtil.isNotNull(existingGroup)) {
try {
existingGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
SocketManager.addGroup(sessionKey, group);
}
}

View File

@@ -0,0 +1,20 @@
package com.njcn.gather.detection.util.socket.cilent;
import com.njcn.gather.detection.util.socket.communication.handler.ConnectionLifecycleHandler;
import com.njcn.gather.detection.util.socket.communication.handler.SocketMessageHandler;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionContext;
/**
* Contrast device client transport handler.
*
* @author caozehui
* @author hongawen
* @date 2026/04/07
*/
public class NettyContrastClientHandler extends AbstractNettyClientHandler {
public NettyContrastClientHandler(ConnectionContext connectionContext, SocketMessageHandler socketMessageHandler,
ConnectionLifecycleHandler lifecycleHandler) {
super("contrast-device-client", connectionContext, socketMessageHandler, lifecycleHandler);
}
}

View File

@@ -0,0 +1,20 @@
package com.njcn.gather.detection.util.socket.cilent;
import com.njcn.gather.detection.util.socket.communication.handler.ConnectionLifecycleHandler;
import com.njcn.gather.detection.util.socket.communication.handler.SocketMessageHandler;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionContext;
/**
* Device client transport handler.
*
* @author wr
* @author hongawen
* @date 2026/04/07
*/
public class NettyDevClientHandler extends AbstractNettyClientHandler {
public NettyDevClientHandler(ConnectionContext connectionContext, SocketMessageHandler socketMessageHandler,
ConnectionLifecycleHandler lifecycleHandler) {
super("device-client", connectionContext, socketMessageHandler, lifecycleHandler);
}
}

View File

@@ -0,0 +1,20 @@
package com.njcn.gather.detection.util.socket.cilent;
import com.njcn.gather.detection.util.socket.communication.handler.ConnectionLifecycleHandler;
import com.njcn.gather.detection.util.socket.communication.handler.SocketMessageHandler;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionContext;
/**
* Source client transport handler.
*
* @author wr
* @author hongawen
* @date 2026/04/07
*/
public class NettySourceClientHandler extends AbstractNettyClientHandler {
public NettySourceClientHandler(ConnectionContext connectionContext, SocketMessageHandler socketMessageHandler,
ConnectionLifecycleHandler lifecycleHandler) {
super("source-client", connectionContext, socketMessageHandler, lifecycleHandler);
}
}

View File

@@ -0,0 +1,21 @@
package com.njcn.gather.detection.util.socket.communication.constants;
/**
* Socket transport constants kept by the communication foundation.
* Stage 4-A extracts these values from detection-only helpers so the
* retained transport layer does not depend on business utility classes.
*
* @author hongawen
* @date 2026/04/07
*/
public final class SocketTransportConstants {
public static final String SOURCE_SESSION_TAG = "_Source";
public static final String DEVICE_SESSION_TAG = "_Dev";
public static final String CONTRAST_SESSION_TAG = "_Contrast_Dev";
private SocketTransportConstants() {
}
}

View File

@@ -0,0 +1,35 @@
package com.njcn.gather.detection.util.socket.communication.handler;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionContext;
/**
* Connection lifecycle callback for the retained communication foundation.
* Business modules can attach optional callbacks here without leaking their
* own service types into Netty and WebSocket infrastructure.
*
* @author hongawen
* @date 2026/04/07
*/
public interface ConnectionLifecycleHandler {
ConnectionLifecycleHandler NO_OP = new ConnectionLifecycleHandler() {
};
default void onConnected(ConnectionContext context) {
}
default void onDisconnected(ConnectionContext context) {
}
default void onConnectFailed(ConnectionContext context, Throwable cause) {
}
default void onIdleTimeout(ConnectionContext context) {
}
default void onMessageHandlingError(ConnectionContext context, String message, Throwable cause) {
}
default void onException(ConnectionContext context, Throwable cause) {
}
}

View File

@@ -0,0 +1,25 @@
package com.njcn.gather.detection.util.socket.communication.handler;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionContext;
/**
* Heartbeat protocol strategy for generic client connections.
* The transport layer only knows when to send and detect heartbeat frames;
* the concrete heartbeat payload is provided by the business side.
*
* @author hongawen
* @date 2026/04/07
*/
public interface HeartbeatMessageStrategy {
/**
* Build the outbound heartbeat packet. Return {@code null} or blank to
* disable heartbeat sending for the current connection.
*/
String buildHeartbeatMessage(ConnectionContext context);
/**
* Check whether the inbound message is a heartbeat response frame.
*/
boolean isHeartbeatResponse(ConnectionContext context, String message);
}

View File

@@ -0,0 +1,23 @@
package com.njcn.gather.detection.util.socket.communication.handler;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionContext;
/**
* Socket 消息处理接口。
* 第一阶段先把消息回调从具体业务 Service 中抽离成统一入口,后续可以继续沉淀为独立通讯模块。
*
* @author hongawen
* @date 2026/04/07
*/
@FunctionalInterface
public interface SocketMessageHandler {
/**
* 处理收到的 Socket 消息。
*
* @param context 连接上下文
* @param message 文本消息
* @throws Exception 处理异常
*/
void handle(ConnectionContext context, String message) throws Exception;
}

View File

@@ -0,0 +1,111 @@
package com.njcn.gather.detection.util.socket.communication.model;
import cn.hutool.core.util.StrUtil;
import com.njcn.gather.detection.util.socket.communication.handler.ConnectionLifecycleHandler;
import com.njcn.gather.detection.util.socket.communication.handler.HeartbeatMessageStrategy;
import com.njcn.gather.detection.util.socket.communication.handler.SocketMessageHandler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Generic communication connection context.
* Stage 4-A keeps transport identity and optional callbacks in one place so
* Netty client/server code does not need to know detection business types.
*
* @author hongawen
* @date 2026/04/07
*/
public class ConnectionContext {
public static final String ATTR_PRE_DETECTION_PARAM = "preDetectionParam";
public static final String ATTR_CONTRAST_PARAM = "contrastDetectionParam";
public static final String ATTR_SOCKET_MESSAGE_HANDLER = "socketMessageHandler";
public static final String ATTR_CONNECTION_LIFECYCLE_HANDLER = "connectionLifecycleHandler";
public static final String ATTR_HEARTBEAT_MESSAGE_STRATEGY = "heartbeatMessageStrategy";
private final String userId;
private final ConnectionType connectionType;
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
public ConnectionContext(String userId, ConnectionType connectionType) {
this.userId = userId;
this.connectionType = connectionType;
}
public static ConnectionContext of(String userId, ConnectionType connectionType) {
return new ConnectionContext(userId, connectionType);
}
public String getUserId() {
return userId;
}
public ConnectionType getConnectionType() {
return connectionType;
}
/**
* Key refactor point: the transport foundation now resolves the session
* key from one place instead of reassembling it across multiple classes.
*/
public String getSessionKey() {
if (StrUtil.isBlank(userId) || connectionType == null) {
return userId;
}
return userId + connectionType.getSessionTag();
}
public ConnectionContext addAttribute(String key, Object value) {
if (StrUtil.isNotBlank(key) && value != null) {
attributes.put(key, value);
}
return this;
}
public ConnectionContext addMessageHandler(SocketMessageHandler handler) {
return addAttribute(ATTR_SOCKET_MESSAGE_HANDLER, handler);
}
public SocketMessageHandler getMessageHandler() {
return getAttribute(ATTR_SOCKET_MESSAGE_HANDLER, SocketMessageHandler.class);
}
public ConnectionContext addLifecycleHandler(ConnectionLifecycleHandler lifecycleHandler) {
return addAttribute(ATTR_CONNECTION_LIFECYCLE_HANDLER, lifecycleHandler);
}
public ConnectionLifecycleHandler getLifecycleHandler() {
return getAttribute(ATTR_CONNECTION_LIFECYCLE_HANDLER, ConnectionLifecycleHandler.class);
}
public ConnectionContext addHeartbeatStrategy(HeartbeatMessageStrategy heartbeatMessageStrategy) {
return addAttribute(ATTR_HEARTBEAT_MESSAGE_STRATEGY, heartbeatMessageStrategy);
}
public HeartbeatMessageStrategy getHeartbeatStrategy() {
return getAttribute(ATTR_HEARTBEAT_MESSAGE_STRATEGY, HeartbeatMessageStrategy.class);
}
public Object getAttribute(String key) {
return attributes.get(key);
}
public <T> T getAttribute(String key, Class<T> type) {
Object value = attributes.get(key);
if (type.isInstance(value)) {
return type.cast(value);
}
return null;
}
public Map<String, Object> getAttributes() {
return attributes;
}
}

View File

@@ -0,0 +1,42 @@
package com.njcn.gather.detection.util.socket.communication.model;
import com.njcn.gather.detection.util.socket.communication.constants.SocketTransportConstants;
/**
* Communication connection types retained by the transport foundation.
* Stage 4-A keeps the session tags in a neutral constants class so the
* Netty/WebSocket base layer no longer depends on detection helpers.
*
* @author hongawen
* @date 2026/04/07
*/
public enum ConnectionType {
SOURCE(SocketTransportConstants.SOURCE_SESSION_TAG, "程控源", false),
DEVICE(SocketTransportConstants.DEVICE_SESSION_TAG, "被检设备", true),
CONTRAST(SocketTransportConstants.CONTRAST_SESSION_TAG, "比对被检设备", true);
private final String sessionTag;
private final String displayName;
private final boolean enableIdleMonitor;
ConnectionType(String sessionTag, String displayName, boolean enableIdleMonitor) {
this.sessionTag = sessionTag;
this.displayName = displayName;
this.enableIdleMonitor = enableIdleMonitor;
}
public String getSessionTag() {
return sessionTag;
}
public String getDisplayName() {
return displayName;
}
public boolean isEnableIdleMonitor() {
return enableIdleMonitor;
}
}

View File

@@ -0,0 +1,169 @@
package com.njcn.gather.detection.util.socket.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* Socket连接配置管理类
* 定义哪些requestId需要建立通道连接以及IP/PORT配置
*
* @Author: hongawen
* @Date: 2024/12/10
*/
@Component
@ConfigurationProperties(prefix = "socket")
public class SocketConnectionConfig {
/**
* 程控源设备配置
*/
private SourceConfig source = new SourceConfig();
/**
* 被检设备配置
*/
private DeviceConfig device = new DeviceConfig();
@Data
public static class SourceConfig {
/**
* 程控源IP地址
*/
private String ip;
/**
* 程控源端口号
*/
private Integer port;
}
@Data
public static class DeviceConfig {
/**
* 被检设备IP地址
*/
private String ip;
/**
* 被检设备端口号
*/
private Integer port;
}
/**
* 获取程控源配置
*/
public SourceConfig getSource() {
return source;
}
/**
* 获取被检设备配置
*/
public DeviceConfig getDevice() {
return device;
}
/**
* 需要建立程控源通道的requestId集合
* 这些requestId在发送消息时如果程控源通道不存在会自动建立连接
*/
private static final Set<String> SOURCE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList(
// 源通讯检测
"yjc_ytxjy"
// 可以根据实际业务需求添加更多requestId
));
/**
* 需要建立被检设备通道的requestId集合
* 这些requestId在发送消息时如果被检设备通道不存在会自动建立连接
*/
private static final Set<String> DEVICE_CONNECTION_REQUEST_IDS = new HashSet<>(Arrays.asList(
// 连接建立
"yjc_sbtxjy",
// ftp文件传送指令
"FTP_SEND$01"
// 可以根据实际业务需求添加更多requestId
));
/**
* 检查指定的requestId是否需要建立程控源连接
*
* @param requestId 请求ID
* @return boolean true:需要建立连接, false:不需要建立连接
*/
public static boolean needsSourceConnection(String requestId) {
return SOURCE_CONNECTION_REQUEST_IDS.contains(requestId);
}
/**
* 检查指定的requestId是否需要建立被检设备连接
*
* @param requestId 请求ID
* @return boolean true:需要建立连接, false:不需要建立连接
*/
public static boolean needsDeviceConnection(String requestId) {
return DEVICE_CONNECTION_REQUEST_IDS.contains(requestId);
}
/**
* 添加需要建立程控源连接的requestId
* 支持运行时动态添加
*
* @param requestId 请求ID
*/
public static void addSourceConnectionRequestId(String requestId) {
SOURCE_CONNECTION_REQUEST_IDS.add(requestId);
}
/**
* 添加需要建立被检设备连接的requestId
* 支持运行时动态添加
*
* @param requestId 请求ID
*/
public static void addDeviceConnectionRequestId(String requestId) {
DEVICE_CONNECTION_REQUEST_IDS.add(requestId);
}
/**
* 移除程控源连接requestId
*
* @param requestId 请求ID
*/
public static void removeSourceConnectionRequestId(String requestId) {
SOURCE_CONNECTION_REQUEST_IDS.remove(requestId);
}
/**
* 移除被检设备连接requestId
*
* @param requestId 请求ID
*/
public static void removeDeviceConnectionRequestId(String requestId) {
DEVICE_CONNECTION_REQUEST_IDS.remove(requestId);
}
/**
* 获取所有需要建立程控源连接的requestId集合只读
*
* @return Set<String> requestId集合
*/
public static Set<String> getSourceConnectionRequestIds() {
return new HashSet<>(SOURCE_CONNECTION_REQUEST_IDS);
}
/**
* 获取所有需要建立被检设备连接的requestId集合只读
*
* @return Set<String> requestId集合
*/
public static Set<String> getDeviceConnectionRequestIds() {
return new HashSet<>(DEVICE_CONNECTION_REQUEST_IDS);
}
}

View File

@@ -0,0 +1,87 @@
package com.njcn.gather.detection.util.socket.websocket;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Generic WebSocket session manager.
* Stage 4-A removes detection payload conventions and detection parameter
* caches from this class so it remains a pure WebSocket session registry.
*
* @author wr
* @author hongawen
* @date 2026/04/07
*/
@Slf4j
public class WebServiceManager {
private static final Map<String, Channel> USER_SESSIONS = new ConcurrentHashMap<>();
private WebServiceManager() {
}
public static void addUser(String userId, Channel channel) {
USER_SESSIONS.put(userId, channel);
}
public static Channel removeByUserId(String userId) {
return USER_SESSIONS.remove(userId);
}
@Deprecated
public static void removeChannel(String channelId) {
Iterator<Map.Entry<String, Channel>> iterator = USER_SESSIONS.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Channel> entry = iterator.next();
if (entry.getValue().id().toString().equals(channelId)) {
iterator.remove();
break;
}
}
}
public static void sendMsg(String userId, String msg) {
Channel channel = USER_SESSIONS.get(userId);
if (Objects.nonNull(channel) && channel.isActive()) {
channel.writeAndFlush(new TextWebSocketFrame(msg));
return;
}
log.error("WebSocket push failed because session is offline, time={}, userId={}",
LocalDateTime.now(), userId);
WebSocketHandler.cleanupSocketResources(userId);
}
public static void sendJson(String userId, Object payload) {
Channel channel = USER_SESSIONS.get(userId);
if (Objects.nonNull(channel) && channel.isActive()) {
channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(payload)));
return;
}
log.error("WebSocket json push failed because session is offline, time={}, userId={}",
LocalDateTime.now(), userId);
WebSocketHandler.cleanupSocketResources(userId);
}
public static int getOnlineUserCount() {
return USER_SESSIONS.size();
}
public static boolean isUserOnline(String userId) {
Channel channel = USER_SESSIONS.get(userId);
return Objects.nonNull(channel) && channel.isActive();
}
public static Set<String> getOnlineUserIds() {
return new HashSet<>(USER_SESSIONS.keySet());
}
}

View File

@@ -0,0 +1,49 @@
package com.njcn.gather.detection.util.socket.websocket;
/**
* WebSocket常量管理类
*
* @author wr
* @date 2024/12/10
*/
public final class WebSocketConstants {
/**
* URL参数分隔符
*/
public static final String QUESTION_MARK = "?";
/**
* URL参数等号分隔符
*/
public static final String EQUAL_TO = "=";
/**
* 客户端心跳消息
*/
public static final String HEARTBEAT_PING = "alive";
/**
* 服务端心跳响应
*/
public static final String HEARTBEAT_PONG = "over";
/**
* 心跳超时最大次数
*/
public static final int MAX_HEARTBEAT_MISS_COUNT = 3;
/**
* WebSocket握手失败状态码
*/
public static final int HANDSHAKE_FAILED_STATUS = 4000;
/**
* WebSocket握手失败原因
*/
public static final String HANDSHAKE_FAILED_REASON = "Missing required userId parameter";
private WebSocketConstants() {
// 私有构造函数,防止实例化
}
}

View File

@@ -0,0 +1,154 @@
package com.njcn.gather.detection.util.socket.websocket;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.communication.model.ConnectionType;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import static com.njcn.gather.detection.util.socket.websocket.WebSocketConstants.HEARTBEAT_PING;
import static com.njcn.gather.detection.util.socket.websocket.WebSocketConstants.HEARTBEAT_PONG;
import static com.njcn.gather.detection.util.socket.websocket.WebSocketConstants.MAX_HEARTBEAT_MISS_COUNT;
/**
* Generic WebSocket handler retained by the communication foundation.
* Stage 4-A keeps only handshake, heartbeat, session registry and transport
* cleanup. Detection-specific quit flows are removed from this class.
*
* @author wr
* @author hongawen
* @date 2026/04/07
*/
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final String HEARTBEAT_RESPONSE_TEXT = HEARTBEAT_PONG;
private int times;
private String userId;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("WebSocket channel active: channelId={}", ctx.channel().id());
super.channelActive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
String messageText = msg.text();
if (HEARTBEAT_PING.equals(messageText)) {
handleHeartbeat(ctx);
return;
}
log.debug("Receive WebSocket business message: userId={}, channelId={}, message={}",
userId, ctx.channel().id(), messageText);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
log.info("WebSocket handler added: channelId={}", ctx.channel().id());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
log.info("WebSocket handler removed: channelId={}, userId={}", ctx.channel().id(), userId);
if (userId != null) {
WebServiceManager.removeByUserId(userId);
} else {
WebServiceManager.removeChannel(ctx.channel().id().toString());
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("WebSocket channel inactive: channelId={}, userId={}", ctx.channel().id(), userId);
cleanupSocketResources(userId);
super.channelInactive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete =
(WebSocketServerProtocolHandler.HandshakeComplete) evt;
userId = ctx.channel().attr(AttributeKey.<String>valueOf("userId")).get();
log.info("WebSocket handshake complete: userId={}, channelId={}, requestUri={}",
userId, ctx.channel().id(), handshakeComplete.requestUri());
if (userId != null) {
WebServiceManager.addUser(userId, ctx.channel());
}
sendConnectionSuccessMessage(ctx);
return;
}
if (evt instanceof IdleStateEvent) {
times++;
log.warn("WebSocket heartbeat miss: channelId={}, userId={}, missCount={}",
ctx.channel().id(), userId, times);
if (times > MAX_HEARTBEAT_MISS_COUNT) {
cleanupSocketResources(userId);
ctx.close();
}
return;
}
super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logExceptionByType(ctx.channel().id().toString(), cause);
cleanupSocketResources(userId);
ctx.close();
}
private void sendConnectionSuccessMessage(ChannelHandlerContext ctx) {
String welcomeMessage = String.format(
"{\"type\":\"connection\",\"status\":\"success\",\"message\":\"WebSocket连接建立成功\",\"userId\":\"%s\",\"timestamp\":%d}",
userId, System.currentTimeMillis());
ctx.channel().writeAndFlush(new TextWebSocketFrame(welcomeMessage));
}
private void handleHeartbeat(ChannelHandlerContext ctx) {
times = 0;
ctx.channel().writeAndFlush(new TextWebSocketFrame(HEARTBEAT_RESPONSE_TEXT));
}
private void logExceptionByType(String channelId, Throwable cause) {
if (cause instanceof IOException) {
log.info("WebSocket network exception: channelId={}, message={}", channelId, cause.getMessage());
} else if (cause instanceof WebSocketHandshakeException) {
log.warn("WebSocket handshake exception: channelId={}, message={}", channelId, cause.getMessage());
} else if (cause instanceof DecoderException || cause instanceof CorruptedFrameException) {
log.error("WebSocket decode exception: channelId={}, message={}", channelId, cause.getMessage(), cause);
} else if (cause instanceof IllegalArgumentException) {
log.warn("WebSocket argument exception: channelId={}, message={}", channelId, cause.getMessage());
} else {
log.error("WebSocket unclassified exception: channelId={}, message={}", channelId, cause.getMessage(), cause);
}
}
/**
* Key refactor point: websocket disconnect now performs generic transport
* cleanup only, which makes this layer independent from detection flows.
*/
public static void cleanupSocketResources(String userId) {
if (userId == null || userId.trim().isEmpty()) {
return;
}
WebServiceManager.removeByUserId(userId);
for (ConnectionType connectionType : ConnectionType.values()) {
SocketManager.removeUser(userId + connectionType.getSessionTag());
}
}
}

View File

@@ -0,0 +1,184 @@
package com.njcn.gather.detection.util.socket.websocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* WebSocket服务端管道初始化器
*
* 职责:
* 1. 为每个新的WebSocket连接配置处理器链Pipeline
* 2. 按正确顺序添加各种Handler确保数据流正确处理
* 3. 配置HTTP到WebSocket的协议升级
* 4. 设置心跳检测和异常处理机制
*
* 处理流程:
* HTTP请求 → HTTP编解码 → 分块处理 → 消息聚合 → 协议升级 → 心跳检测 → 业务处理 → 异常处理
*
* @Description: webSocket服务端自定义配置
* @Author: wr
* @Date: 2024/12/10 14:20
*/
@Slf4j
public class WebSocketInitializer extends ChannelInitializer<SocketChannel> {
/**
* WebSocket访问路径
*/
private static final String WEBSOCKET_PATH = "/hello";
/**
* HTTP消息最大聚合大小512KB
* 用于WebSocket握手和消息传输
*/
private static final int MAX_CONTENT_LENGTH = 512 * 1024;
/**
* 心跳检测间隔13秒
* 13秒内没有收到客户端消息则触发空闲事件
*/
private static final int READER_IDLE_TIME_SECONDS = 13;
/**
* 为每个新连接初始化处理器管道
* 注意Handler的添加顺序非常重要决定了数据的处理流向
*
* @param ch 新建立的Socket通道
* @throws Exception 初始化过程中的异常
*/
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 1. HTTP协议处理器
// HttpServerCodec = HttpRequestDecoder + HttpResponseEncoder
// 负责HTTP请求解码和HTTP响应编码
pipeline.addLast("http-codec", new HttpServerCodec());
// 2. 分块写入处理器
// 用于处理大文件的分块传输,防止内存溢出
// 支持ChunkedInput如ChunkedFile、ChunkedNioFile等
pipeline.addLast("chunked-write", new ChunkedWriteHandler());
// 3. HTTP消息聚合器
// 将分片的HTTP消息重新组装成完整的FullHttpRequest或FullHttpResponse
// WebSocket握手需要完整的HTTP请求所以这个Handler必须添加
pipeline.addLast("http-aggregator", new HttpObjectAggregator(MAX_CONTENT_LENGTH));
// 4. WebSocket URL预处理器
// 在WebSocket握手之前处理URL参数验证用户ID
pipeline.addLast("websocket-preprocessor", new WebSocketPreprocessor());
// 5. WebSocket协议升级处理器
// 处理WebSocket握手将HTTP协议升级为WebSocket协议
// 只有访问指定路径(WEBSOCKET_PATH)的请求才会被升级
// 升级后会移除HTTP相关的Handler添加WebSocket相关的Handler
pipeline.addLast("websocket-protocol", new WebSocketServerProtocolHandler(WEBSOCKET_PATH));
// 6. 空闲状态检测器
// 检测连接的空闲状态,用于心跳机制
// readerIdleTime: 读空闲时间writerIdleTime: 写空闲时间allIdleTime: 读写空闲时间
pipeline.addLast("idle-state", new IdleStateHandler(READER_IDLE_TIME_SECONDS, 0, 0, TimeUnit.SECONDS));
// 7. 自定义WebSocket业务处理器
// 处理WebSocket帧实现具体的业务逻辑
// 包括心跳处理、消息路由、连接管理等
pipeline.addLast("websocket-handler", new WebSocketHandler());
// 7. 全局异常处理器
// 处理整个管道中未被捕获的异常,作为最后的异常处理兜底
pipeline.addLast("exception-handler", new GlobalExceptionHandler());
}
/**
* WebSocket预处理器
* 在WebSocket握手之前验证URL参数并清理URL
*/
private static class WebSocketPreprocessor extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
log.debug("WebSocket预处理器收到HTTP请求{}", uri);
// 验证并提取userId
String userId = extractUserId(uri);
if (userId == null || userId.trim().isEmpty()) {
log.warn("WebSocket连接被拒绝缺少userId参数, uri: {}", uri);
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.BAD_REQUEST
);
ctx.writeAndFlush(response).addListener(f -> ctx.close());
return;
}
// 将userId存储到Channel属性中
ctx.channel().attr(AttributeKey.<String>valueOf("userId")).set(userId);
// 清理URL参数
if (uri.contains("?")) {
String cleanUri = uri.substring(0, uri.indexOf("?"));
request.setUri(cleanUri);
log.debug("URL已清理原始: {}, 清理后: {}, userId: {}", uri, cleanUri, userId);
}
}
// 继续传递给下一个Handler
super.channelRead(ctx, msg);
}
private String extractUserId(String uri) {
if (!uri.contains("name=")) {
return null;
}
int start = uri.indexOf("name=") + 5;
int end = uri.indexOf("&", start);
if (end == -1) {
return uri.substring(start);
} else {
return uri.substring(start, end);
}
}
}
/**
* 全局异常处理器
* 作为管道中的最后一个Handler捕获所有未处理的异常
*/
private static class GlobalExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 记录异常详情,便于问题排查
log.error("WebSocket连接发生未处理异常远程地址{},异常信息:{}",
ctx.channel().remoteAddress(), cause.getMessage(), cause);
// 优雅关闭连接
if (ctx.channel().isActive()) {
ctx.close();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("WebSocket连接断开远程地址{}", ctx.channel().remoteAddress());
super.channelInactive(ctx);
}
}
}

View File

@@ -0,0 +1,237 @@
package com.njcn.gather.detection.util.socket.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* WebSocket服务端核心类
*
* 职责:
* 1. 启动基于Netty的WebSocket服务器
* 2. 管理服务器生命周期(启动/关闭)
* 3. 提供高性能的WebSocket通信支持
*
* 特性:
* - 使用ApplicationRunner确保在Spring容器完全启动后再启动WebSocket服务
* - 使用CompletableFuture异步启动避免阻塞Spring Boot主线程
* - 支持优雅关闭,确保资源正确释放
* - 完善的异常处理和日志记录
*
* @Description: websocket服务端
* @Author: wr
* @Date: 2024/12/10 13:59
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketService implements ApplicationRunner {
/**
* WebSocket服务器监听端口
* 默认7777端口可通过配置文件webSocket.port自定义
* 客户端连接地址ws://host:port/hello?name=userId
*/
@Value("${webSocket.port:7777}")
int port;
/**
* Netty Boss线程组
* 专门负责接受新的客户端连接请求
* 通常配置1个线程即可因为接受连接的操作相对简单
*/
EventLoopGroup bossGroup;
/**
* Netty Worker线程组
* 专门负责处理已建立连接的I/O操作和业务逻辑
* 默认线程数 = CPU核心数 * 2用于并发处理多个客户端
*/
EventLoopGroup workerGroup;
/**
* 服务器通道引用
* 保存绑定端口后的Channel用于服务器关闭时释放资源
*/
private Channel serverChannel;
/**
* 异步启动任务的Future对象
* 用于管理WebSocket服务器的异步启动过程
* 可以用来取消启动任务或检查启动状态
*/
private CompletableFuture<Void> serverFuture;
/**
* Spring Boot应用启动完成后自动调用此方法
* 使用ApplicationRunner确保在所有Bean初始化完成后再启动WebSocket服务
*/
@Override
public void run(ApplicationArguments args){
// 使用CompletableFuture异步启动WebSocket服务避免阻塞Spring Boot主线程
// 这样可以让应用快速启动完成WebSocket服务在后台异步启动
serverFuture = CompletableFuture.runAsync(this::startWebSocketServer)
.exceptionally(throwable -> {
// 如果启动过程中发生异常,记录日志但不影响应用启动
log.error("WebSocket服务启动异常", throwable);
return null;
});
}
/**
* 启动WebSocket服务器的核心方法
* 此方法会一直阻塞直到服务器关闭,所以需要在异步线程中执行
*/
private void startWebSocketServer() {
try {
// 1. 创建线程组
// bossGroup: 专门负责接受新的客户端连接请求
// 可以自定义线程的数量这里使用默认值通常为1个线程
bossGroup = new NioEventLoopGroup(1);
// workerGroup: 专门负责处理已建立连接的I/O操作
// 默认创建的线程数量 = CPU 处理器数量 * 2用于处理业务逻辑
workerGroup = new NioEventLoopGroup();
// 2. 配置服务器启动参数
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
// 网络配置参数
.option(ChannelOption.SO_BACKLOG, 128)
// TCP连接建立超时时间5秒
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
// 子通道配置(针对每个客户端连接)
// 启用TCP keepalive机制检测死连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new WebSocketInitializer());
// 3. 绑定端口并启动服务器
ChannelFuture future = serverBootstrap.bind(port).sync();
// 保存服务器通道引用,用于后续关闭操作
serverChannel = future.channel();
// 4. 监听绑定结果并记录日志
future.addListener(f -> {
if (future.isSuccess()) {
log.info("webSocket服务启动成功端口{}", port);
} else {
log.error("webSocket服务启动失败端口{}", port);
}
});
// 5. 等待服务器关闭
// 这里会一直阻塞直到serverChannel被外部关闭
// 这就是为什么需要在异步线程中执行此方法的原因
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
// 如果线程被中断(比如应用关闭),记录日志并恢复中断状态
log.error("WebSocket服务启动过程中被中断", e);
Thread.currentThread().interrupt(); // 恢复中断状态
} catch (Exception e) {
// 捕获其他所有异常,记录日志并抛出运行时异常
log.error("WebSocket服务启动失败", e);
throw new RuntimeException("WebSocket服务启动失败", e);
} finally {
// 无论成功还是失败,都要清理资源
shutdownGracefully();
}
}
/**
* 优雅关闭Netty线程组资源
* 私有方法,用于在服务器启动异常时清理资源
*/
private void shutdownGracefully() {
// 优雅关闭接收连接的线程组
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
// 优雅关闭处理I/O的线程组
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
/**
* Spring容器销毁时自动调用此方法释放资源
* 使用@PreDestroy确保在应用关闭时优雅地关闭WebSocket服务
*/
@PreDestroy
public void destroy() throws InterruptedException {
log.info("正在关闭WebSocket服务...");
// 步骤1: 首先关闭服务器通道,停止接受新的连接请求
// 这样可以确保不会有新的客户端连接进来
if (serverChannel != null) {
try {
// 等待最多5秒让服务器通道关闭
serverChannel.close().awaitUninterruptibly(5, TimeUnit.SECONDS);
log.debug("服务器通道已关闭");
} catch (Exception e) {
log.warn("关闭服务器通道时发生异常", e);
}
}
// 步骤2: 关闭bossGroup线程组
// bossGroup负责接受连接现在可以安全关闭了
if (bossGroup != null) {
try {
// 优雅关闭静默期0秒超时时间5秒
// 静默期0秒意味着立即开始关闭超时5秒后强制关闭
bossGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync();
log.debug("bossGroup线程组已关闭");
} catch (InterruptedException e) {
log.warn("关闭bossGroup时被中断", e);
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
// 步骤3: 关闭workerGroup线程组
// workerGroup负责处理I/O需要等待现有连接处理完成
if (workerGroup != null) {
try {
// 等待现有任务完成但最多等待5秒
workerGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync();
log.debug("workerGroup线程组已关闭");
} catch (InterruptedException e) {
log.warn("关闭workerGroup时被中断", e);
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
// 步骤4: 取消异步启动任务(如果还在运行)
// 这可以避免在应用关闭后还有线程在后台运行
if (serverFuture != null && !serverFuture.isDone()) {
// true表示允许中断正在执行的任务
boolean cancelled = serverFuture.cancel(true);
if (cancelled) {
log.debug("异步启动任务已取消");
}
}
log.info("webSocket服务已销毁");
}
}