websocket优化调整

This commit is contained in:
2025-08-06 13:36:18 +08:00
parent a27eb7e462
commit ca1b1661d1
19 changed files with 1170 additions and 424 deletions

View File

@@ -15,6 +15,7 @@ import com.njcn.gather.detection.pojo.vo.SocketMsg;
import com.njcn.gather.detection.pojo.vo.WebSocketVO;
import com.njcn.gather.detection.util.DetectionUtil;
import com.njcn.gather.detection.util.socket.*;
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
import com.njcn.gather.device.pojo.vo.PreDetection;
import com.njcn.gather.device.service.IPqDevService;
import com.njcn.gather.device.service.IPqStandardDevService;

View File

@@ -1,6 +1,5 @@
package com.njcn.gather.detection.handler;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ObjectUtil;
@@ -19,6 +18,7 @@ import com.njcn.gather.detection.pojo.vo.*;
import com.njcn.gather.detection.service.impl.DetectionServiceImpl;
import com.njcn.gather.detection.util.DetectionUtil;
import com.njcn.gather.detection.util.socket.*;
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
import com.njcn.gather.device.pojo.enums.CommonEnum;
import com.njcn.gather.device.pojo.po.PqDevSub;
import com.njcn.gather.device.pojo.vo.PreDetection;

View File

@@ -1,6 +1,5 @@
package com.njcn.gather.detection.handler;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
@@ -14,6 +13,7 @@ import com.njcn.gather.detection.pojo.vo.WebSocketVO;
import com.njcn.gather.detection.util.socket.*;
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler;
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
import com.njcn.gather.device.pojo.vo.PreDetection;
import com.njcn.gather.device.service.IPqDevService;
import com.njcn.gather.script.pojo.po.SourceIssue;

View File

@@ -22,8 +22,9 @@ import com.njcn.gather.detection.util.business.DetectionCommunicateUtil;
import com.njcn.gather.detection.util.socket.CnSocketUtil;
import com.njcn.gather.detection.util.socket.FormalTestManager;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.WebServiceManager;
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.web.utils.RequestUtil;
import com.njcn.gather.detection.util.socket.cilent.NettyContrastClientHandler;
import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler;
import com.njcn.gather.device.pojo.po.PqDev;
@@ -146,8 +147,28 @@ public class PreDetectionServiceImpl implements PreDetectionService {
}
/**
* 发送源通信校验Socket连接数字式和模拟式检测模式
*
* <p>该方法用于建立与程控源设备的Socket连接进行源通信校验。主要流程</p>
* <ul>
* <li>1. 存储检测参数到全局管理器</li>
* <li>2. 根据计划ID获取计划源信息</li>
* <li>3. 获取源设备初始化参数</li>
* <li>4. 初始化设备和源响应服务列表</li>
* <li>5. 组装Socket请求报文</li>
* <li>6. 建立Netty客户端连接</li>
* </ul>
*
* @param param 预检测参数包含计划ID、用户ID等信息
* @throws BusinessException 当计划源信息不存在或源初始化参数为空时抛出
*
* @see SourceOperateCodeEnum#YJC_YTXJY 源通信校验操作码
* @see SourceOperateCodeEnum#INIT_GATHER 初始化采集操作码
*/
private void sendYtxSocket(PreDetectionParam param) {
WebServiceManager.addPreDetectionParam(param);
String loginName = RequestUtil.getLoginNameByToken();
WebServiceManager.addPreDetectionParam(loginName, param);
AdPlanSource planSource = adPlanSourceService.getOne(new LambdaQueryWrapper<AdPlanSource>().eq(AdPlanSource::getPlanId, param.getPlanId()));
param.setSourceId(planSource.getSourceId());
if (ObjectUtil.isNotNull(planSource)) {
@@ -172,10 +193,28 @@ public class PreDetectionServiceImpl implements PreDetectionService {
}
}
/**
* 发送源通信校验Socket连接仿真模式
*
* <p>该方法专门用于仿真检测模式下的源通信校验。与普通模式的区别:</p>
* <ul>
* <li>直接使用传入的sourceId获取源初始化参数</li>
* <li>不需要通过计划ID查询计划源信息</li>
* <li>适用于独立的源设备通信测试</li>
* </ul>
*
* @param param 预检测参数必须包含sourceId和userPageId
* @throws BusinessException 当源初始化参数为空时抛出
*
* @see #sendYtxSocket(PreDetectionParam) 普通检测模式的源通信校验
* @see SourceOperateCodeEnum#YJC_YTXJY 源通信校验操作码
* @see SourceOperateCodeEnum#INIT_GATHER 初始化采集操作码
*/
private void sendYtxSocketSimulate(PreDetectionParam param) {
SourceInitialize sourceParam = pqSourceService.getSourceInitializeParam(param.getSourceId());
param.setSourceId(sourceParam.getSourceId());
WebServiceManager.addPreDetectionParam(param);
String loginName = RequestUtil.getLoginNameByToken();
WebServiceManager.addPreDetectionParam(loginName, param);
if (ObjectUtil.isNotNull(sourceParam)) {
SocketMsg<String> socketMsg = new SocketMsg<>();
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue());

View File

@@ -6,6 +6,7 @@ import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.pojo.vo.SocketMsg;
import com.njcn.gather.detection.pojo.vo.WebSocketVO;
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
/**
* @Author: cdf

View File

@@ -1,86 +0,0 @@
package com.njcn.gather.detection.util.socket;
import com.alibaba.fastjson.JSON;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.pojo.vo.WebSocketVO;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Description: webSocket存储的通道
* @Author: wr
* @Date: 2024/12/11 13:04
*/
@Slf4j
public class WebServiceManager {
//key:页面 value:channel
private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>();
// 检测参数。key固定为preDetectionParam, value:检测参数
private static final Map<String, PreDetectionParam> preDetectionParamMap = new ConcurrentHashMap<>();
public static void addUser(String userId, Channel channel) {
userSessions.put(userId, channel);
}
public static void removeChannel(String channelId) {
// 遍历并删除
Iterator<Map.Entry<String, Channel>> iterator = userSessions.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Channel> entry = iterator.next();
if (entry.getValue().id().toString().equals(channelId)) {
iterator.remove();
}
}
}
public static Channel getChannelByUserId(String userId) {
return userSessions.get(userId);
}
public static void sendMsg(String userId,String msg) {
Channel channel = userSessions.get(userId);
if(Objects.nonNull(channel) && channel.isActive()){
TextWebSocketFrame wd = new TextWebSocketFrame(msg);
channel.writeAndFlush(wd);
}else {
log.error("{}-websocket推送消息失败;当前用户-{}-客户端已经断开连接", LocalDateTime.now(),userId);
// PreDetectionParam param = preDetectionParamMap.get("preDetectionParam");
// CnSocketUtil.quitSend(param);
// CnSocketUtil.quitSendSource(param);
}
}
public static void sendMessage(String userId, WebSocketVO<Object> webSocketVO) {
Channel channel = userSessions.get(userId);
if(Objects.nonNull(channel) && channel.isActive()){
TextWebSocketFrame wd = new TextWebSocketFrame(JSON.toJSONString(webSocketVO));
channel.writeAndFlush(wd);
}else {
log.error("{}-websocket推送消息失败;当前用户-{}-客户端已经断开连接", LocalDateTime.now(),userId);
}
}
public static void addPreDetectionParam(PreDetectionParam preDetectionParam) {
preDetectionParamMap.put("preDetectionParam", preDetectionParam);
}
public static PreDetectionParam getPreDetectionParam() {
return preDetectionParamMap.get("preDetectionParam");
}
public static void removePreDetectionParam() {
preDetectionParamMap.clear();
}
}

View File

@@ -8,7 +8,7 @@ import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.pojo.vo.SocketDataMsg;
import com.njcn.gather.detection.util.socket.CnSocketUtil;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.WebServiceManager;
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;

View File

@@ -10,7 +10,7 @@ import com.njcn.gather.detection.pojo.vo.WebSocketVO;
import com.njcn.gather.detection.util.socket.CnSocketUtil;
import com.njcn.gather.detection.util.socket.FormalTestManager;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.WebServiceManager;
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
import com.njcn.gather.device.pojo.vo.PreDetection;
import com.njcn.gather.script.pojo.po.SourceIssue;
import com.njcn.gather.system.pojo.enums.DicDataEnum;

View File

@@ -1,169 +0,0 @@
package com.njcn.gather.detection.util.socket.web;
import cn.hutool.core.util.ObjectUtil;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.util.socket.CnSocketUtil;
import com.njcn.gather.detection.util.socket.WebServiceManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
/**
* @Description: 泛型 代表的是处理数据的单位
* TextWebSocketFrame : 文本信息帧
* @Author: wr
* @Date: 2024/12/10 13:56
*/
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private int times;
private final static String QUESTION_MARK = "?";
private final static String EQUAL_TO = "=";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("webSocket服务端通道已建立" + ctx.channel().id());
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//首次连接是FullHttpRequest把用户id和对应的channel对象存储起来
if (null != msg && msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
String userId = getUrlParams(uri);
WebServiceManager.addUser(userId, ctx.channel());
log.info("登录的用户id是{}", userId);
//如果url包含参数需要处理
if (uri.contains(QUESTION_MARK)) {
String newUri = uri.substring(0, uri.indexOf(QUESTION_MARK));
request.setUri(newUri);
}
} else if (msg instanceof TextWebSocketFrame) {
//正常的TEXT消息类型
TextWebSocketFrame frame = (TextWebSocketFrame) msg;
//log.info("webSocket服务器收到客户端心跳信息{}", frame.text());
if ("alive".equals(frame.text())) {
//System.out.println("webSocket心跳收到时间………………………………………………………………"+LocalDateTime.now());
times = 0;
TextWebSocketFrame wd = new TextWebSocketFrame("over");
ctx.channel().writeAndFlush(wd);
}
}
super.channelRead(ctx, msg);
}
/**
* 根据用户地址获取用户名 ws://127.0.0.1:7777/hello?name=aa
*
* @param url
* @return
*/
private static String getUrlParams(String url) {
if (!url.contains(EQUAL_TO)) {
return null;
}
String userId = url.substring(url.indexOf(EQUAL_TO) + 1);
return userId;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
/* System.out.println("服务端消息 == " + msg.text());
if(msg.text().equals("下发指令")){
*//**
* 处理对应消息
* 1.先下发所要操作的流程信息
* 2.组装对应的入参信息
* 3.再用socket信息返回结束
*//*
//NettyClient.socketClient(msg.text(),new NettySourceClientHandler());
}*/
//可以直接调用text 拿到文本信息帧中的信息
/* Channel channel = ctx.channel();
TextWebSocketFrame resp = new TextWebSocketFrame(msg.text());
channel.writeAndFlush(resp);*/
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
//WebServiceManager.addUser(userId, ctx.channel());
System.out.println("webSocket有新的连接接入:" + ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
// 假设用户 ID 是从某个地方获取的,这里简单示例为 "userId"
System.out.println("weoSocket客户端退出: " + ctx.channel().id());
WebServiceManager.removeChannel(ctx.channel().id().toString());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("weoSocket断线" + ctx.channel().id());
ctx.close();
PreDetectionParam preDetectionParam = WebServiceManager.getPreDetectionParam();
if (ObjectUtil.isNotNull(preDetectionParam)) {
CnSocketUtil.quitSendSource(preDetectionParam); // 能否在这里关闭源socket连接
CnSocketUtil.quitSend(preDetectionParam);
} else {
preDetectionParam = new PreDetectionParam();
preDetectionParam.setUserPageId("cdf");
CnSocketUtil.quitSend(preDetectionParam);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
IdleStateEvent event = (IdleStateEvent) evt;
String eventDesc = null;
switch (event.state()) {
case READER_IDLE:
eventDesc = "读空闲";
System.out.println("c端心跳检测发生超时事件--" + eventDesc);
times++;
if (times > 3) {
System.out.println("c端心跳检测空闲次数超过三次 关闭连接");
ctx.channel().close();
WebServiceManager.removeChannel(ctx.channel().id().toString());
}
break;
case WRITER_IDLE:
eventDesc = "写空闲";
break;
case ALL_IDLE:
eventDesc = "读写空闲";
break;
}
//super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

View File

@@ -1,63 +0,0 @@
package com.njcn.gather.detection.util.socket.web;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* @Description: webSocket服务端自定义配置
* @Author: wr
* @Date: 2024/12/10 14:20
*/
public class WebSocketInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//设置心跳机制
// ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
//增加编解码器 的另一种方式
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpResponseDecoder());
//块方式写的处理器 适合处理大数据
pipeline.addLast(new ChunkedWriteHandler());
//聚合
pipeline.addLast(new HttpObjectAggregator(512 * 1024));
/*
* 这个时候 我们需要声明我们使用的是 websocket 协议
* netty为websocket也准备了对应处理器 设置的是访问路径
* 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了
* 这个handler是将http协议升级为websocket 并且使用 101 作为响应码
* */
pipeline.addLast(new IdleStateHandler(13, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new WebSocketHandler());
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 处理异常,例如记录日志、关闭连接等
System.out.println("进入异常++++++++++++++++++");
cause.printStackTrace();
ctx.close();
}
});
}
}

View File

@@ -1,94 +0,0 @@
package com.njcn.gather.detection.util.socket.web;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* @Description: websocket服务端
* @Author: wr
* @Date: 2024/12/10 13:59
*/
@Component
@RequiredArgsConstructor
public class WebSocketService {
/**
* 端口号
*/
@Value("${webSocket.port:7777}")
int port;
EventLoopGroup bossGroup;
EventLoopGroup workerGroup;
@PostConstruct
public void start() {
new Thread(() -> {
//可以自定义线程的数量
bossGroup = new NioEventLoopGroup();
// 默认创建的线程数量 = CPU 处理器数量 *2
workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
//当前连接被阻塞的时候BACKLOG代表的事 阻塞队列的长度
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
//设置连接保持为活动状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new WebSocketInitializer());
ChannelFuture future = serverBootstrap.bind(port).sync();
future.addListener(f -> {
if (future.isSuccess()) {
System.out.println("webSocket服务启动成功");
} else {
System.out.println("webSocket服务启动失败");
}
});
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}).start();
}
/**
* 释放资源
*/
@PreDestroy
public void destroy() throws InterruptedException {
if (bossGroup != null) {
bossGroup.shutdownGracefully().sync();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully().sync();
}
System.out.println("webSocket销毁---------------");
}
}

View File

@@ -0,0 +1,261 @@
package com.njcn.gather.detection.util.socket.websocket;
import com.alibaba.fastjson.JSON;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.pojo.vo.WebSocketVO;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* WebSocket会话管理器
*
* <p>负责管理电能质量检测系统中的WebSocket连接会话和检测参数主要功能包括</p>
* <ul>
* <li>WebSocket连接会话的添加、删除和管理</li>
* <li>向指定用户推送实时消息(文本消息和结构化消息)</li>
* <li>全局检测参数的存储和管理</li>
* </ul>
*
* <p><b>线程安全性:</b></p>
* 使用ConcurrentHashMap确保在高并发环境下的线程安全。
*
* <p><b>使用场景:</b></p>
* <ul>
* <li>检测进度实时推送</li>
* <li>检测结果数据推送</li>
* <li>设备状态变更通知</li>
* <li>异常信息推送</li>
* </ul>
*
* <p><b>消息推送方式:</b></p>
* <ul>
* <li>{@link #sendMsg(String, String)} - 发送纯文本消息</li>
* <li>{@link #sendMessage(String, WebSocketVO)} - 发送结构化JSON消息</li>
* </ul>
*
* @author wr
* @date 2024/12/11 13:04
* @version 1.0
* @since 检测系统 v2.3.12
*
* @see com.njcn.gather.detection.util.socket.websocket.WebSocketHandler WebSocket处理器
* @see com.njcn.gather.detection.pojo.vo.WebSocketVO WebSocket消息对象
*/
@Slf4j
public class WebServiceManager {
/**
* WebSocket用户会话存储
* key: 用户ID, value: WebSocket连接通道
*/
private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>();
/**
* 检测参数存储
* key: 用户ID(userPageId), value: 检测参数对象
* 支持多用户并发检测,每个用户的检测参数独立存储
*/
private static final Map<String, PreDetectionParam> preDetectionParamMap = new ConcurrentHashMap<>();
/**
* 添加用户WebSocket会话
*
* @param userId 用户ID不能为null
* @param channel WebSocket连接通道不能为null
*/
public static void addUser(String userId, Channel channel) {
userSessions.put(userId, channel);
}
/**
* 根据用户ID移除会话推荐使用
* 时间复杂度O(1)
*
* @param userId 用户ID
* @return 被移除的Channel如果不存在则返回null
*/
public static Channel removeByUserId(String userId) {
return userSessions.remove(userId);
}
/**
* 根据channelId移除会话兼容老版本
* 时间复杂度O(n)建议使用removeByUserId替代
*
* @param channelId 通道ID
* @deprecated 建议使用 {@link #removeByUserId(String)} 替代
*/
@Deprecated
public static void removeChannel(String channelId) {
// 遍历并删除
Iterator<Map.Entry<String, Channel>> iterator = userSessions.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Channel> entry = iterator.next();
if (entry.getValue().id().toString().equals(channelId)) {
iterator.remove();
break; // 找到后立即退出,避免继续遍历
}
}
}
/**
* 发送纯文本消息给指定用户
*
* @param userId 目标用户ID
* @param msg 要发送的文本消息
*/
public static void sendMsg(String userId, String msg) {
Channel channel = userSessions.get(userId);
if (Objects.nonNull(channel) && channel.isActive()) {
TextWebSocketFrame frame = new TextWebSocketFrame(msg);
channel.writeAndFlush(frame);
} else {
log.error("WebSocket推送消息失败用户连接已断开时间: {}, userId: {}", LocalDateTime.now(), userId);
}
}
/**
* 发送结构化消息给指定用户
* 消息会被序列化为JSON格式后发送
*
* @param userId 目标用户ID
* @param webSocketVO 要发送的结构化消息对象
*/
public static void sendMessage(String userId, WebSocketVO<Object> webSocketVO) {
Channel channel = userSessions.get(userId);
if (Objects.nonNull(channel) && channel.isActive()) {
TextWebSocketFrame frame = new TextWebSocketFrame(JSON.toJSONString(webSocketVO));
channel.writeAndFlush(frame);
} else {
log.error("WebSocket推送结构化消息失败用户连接已断开时间: {}, userId: {}", LocalDateTime.now(), userId);
}
}
/**
* 存储检测参数基于用户ID
* 支持多用户并发检测,每个用户的检测参数独立存储
*
* @param userId 用户ID登录名
* @param preDetectionParam 检测参数对象
* @throws IllegalArgumentException 当userId或检测参数为空时抛出
*/
public static void addPreDetectionParam(String userId, PreDetectionParam preDetectionParam) {
if (userId == null || userId.trim().isEmpty()) {
throw new IllegalArgumentException("用户ID不能为空");
}
if (preDetectionParam == null) {
throw new IllegalArgumentException("检测参数不能为空");
}
preDetectionParamMap.put(userId, preDetectionParam);
}
/**
* 存储检测参数(兼容老版本)
* 从检测参数对象中获取userPageId作为key
*
* @param preDetectionParam 检测参数对象必须包含userPageId
* @throws IllegalArgumentException 当userPageId为空时抛出
* @deprecated 建议使用 {@link #addPreDetectionParam(String, PreDetectionParam)}
*/
@Deprecated
public static void addPreDetectionParam(PreDetectionParam preDetectionParam) {
if (preDetectionParam == null || preDetectionParam.getUserPageId() == null) {
throw new IllegalArgumentException("检测参数或用户ID不能为空");
}
preDetectionParamMap.put(preDetectionParam.getUserPageId(), preDetectionParam);
}
/**
* 获取指定用户的检测参数
*
* @param userId 用户ID
* @return 检测参数对象如果不存在则返回null
*/
public static PreDetectionParam getPreDetectionParam(String userId) {
return preDetectionParamMap.get(userId);
}
/**
* 获取当前检测参数(兼容老版本)
* 注意:该方法已废弃,建议使用 {@link #getPreDetectionParam(String)}
*
* @return 检测参数对象如果不存在则返回null
* @deprecated 多用户并发场景下该方法不安全,请使用 {@link #getPreDetectionParam(String)}
*/
@Deprecated
public static PreDetectionParam getPreDetectionParam() {
if (preDetectionParamMap.size() == 1) {
return preDetectionParamMap.values().iterator().next();
}
log.warn("存在多个检测参数,无法确定返回哪个,当前参数数量: {}", preDetectionParamMap.size());
return null;
}
/**
* 移除指定用户的检测参数
*
* @param userId 用户ID
* @return 被移除的检测参数如果不存在则返回null
*/
public static PreDetectionParam removePreDetectionParam(String userId) {
return preDetectionParamMap.remove(userId);
}
/**
* 清空所有检测参数
*/
public static void removeAllPreDetectionParam() {
preDetectionParamMap.clear();
}
/**
* 清空所有检测参数(兼容老版本)
*
* @deprecated 建议使用 {@link #removeAllPreDetectionParam()} 或 {@link #removePreDetectionParam(String)}
*/
@Deprecated
public static void removePreDetectionParam() {
removeAllPreDetectionParam();
}
// ================================ 实用功能方法 ================================
/**
* 获取当前在线用户数量
*
* @return 在线用户数量
*/
public static int getOnlineUserCount() {
return userSessions.size();
}
/**
* 检查指定用户是否在线
*
* @param userId 用户ID
* @return true如果用户在线且连接活跃否则返回false
*/
public static boolean isUserOnline(String userId) {
Channel channel = userSessions.get(userId);
return Objects.nonNull(channel) && channel.isActive();
}
/**
* 获取所有在线用户ID集合
*
* @return 在线用户ID集合的快照
*/
public static java.util.Set<String> getOnlineUserIds() {
return new java.util.HashSet<>(userSessions.keySet());
}
}

View File

@@ -0,0 +1,49 @@
package com.njcn.gather.detection.util.socket.websocket;
/**
* WebSocket常量管理类
*
* @author wr
* @date 2024/12/10
*/
public final class WebSocketConstants {
/**
* URL参数分隔符
*/
public static final String QUESTION_MARK = "?";
/**
* URL参数等号分隔符
*/
public static final String EQUAL_TO = "=";
/**
* 客户端心跳消息
*/
public static final String HEARTBEAT_PING = "alive";
/**
* 服务端心跳响应
*/
public static final String HEARTBEAT_PONG = "over";
/**
* 心跳超时最大次数
*/
public static final int MAX_HEARTBEAT_MISS_COUNT = 3;
/**
* WebSocket握手失败状态码
*/
public static final int HANDSHAKE_FAILED_STATUS = 4000;
/**
* WebSocket握手失败原因
*/
public static final String HANDSHAKE_FAILED_REASON = "Missing required userId parameter";
private WebSocketConstants() {
// 私有构造函数,防止实例化
}
}

View File

@@ -0,0 +1,389 @@
package com.njcn.gather.detection.util.socket.websocket;
import cn.hutool.core.util.ObjectUtil;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.util.socket.CnSocketUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import static com.njcn.gather.detection.util.socket.websocket.WebSocketConstants.*;
/**
* WebSocket消息处理器
*
* <p>负责处理电能质量检测系统中的WebSocket连接和消息通信主要功能包括</p>
* <ul>
* <li>WebSocket连接的建立、维护和断开</li>
* <li>用户身份验证和会话管理</li>
* <li>心跳检测和连接保活</li>
* <li>检测状态和结果的实时推送</li>
* <li>异常处理和资源清理</li>
* </ul>
*
* <p><b>通信协议:</b></p>
* <pre>
* 连接URL: ws://host:port/path?name=userId
* 心跳消息: "alive" -> "over"
* 业务消息: JSON格式的检测数据和状态信息
* </pre>
*
* <p><b>安全策略:</b></p>
* <ul>
* <li>连接时必须提供有效的userId参数否则拒绝连接</li>
* <li>支持心跳超时检测超时3次自动断开连接</li>
* <li>连接断开时自动清理相关Socket资源</li>
* </ul>
*
* <p><b>使用场景:</b></p>
* 主要用于前端实时接收检测进度、检测结果、设备状态等信息的推送,
* 配合detection模块的ResponseService类实现完整的实时通信链路。
*
* @author wr
* @date 2024/12/10 13:56
* @version 1.0
* @since 检测系统 v2.3.12
*
* @see WebServiceManager 会话管理器
* @see WebSocketConstants 常量定义
* @see com.njcn.gather.detection.handler.SocketDevResponseService 设备响应处理
* @see com.njcn.gather.detection.handler.SocketSourceResponseService 源响应处理
*/
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// ================================ 字段定义 ================================
/**
* 心跳超时计数器
*/
private int times;
/**
* 当前WebSocket连接对应的用户ID
* 在首次HTTP握手时从URL参数中提取并存储
* 用于后续的Socket连接管理和资源清理
*/
private String userId;
/**
* 心跳响应内容常量
* 注意不能预创建TextWebSocketFrame对象因为ByteBuf状态会改变
*/
private static final String HEARTBEAT_RESPONSE_TEXT = HEARTBEAT_PONG;
// ================================ Netty生命周期方法 ================================
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("webSocket服务端通道已建立channelId: {}", ctx.channel().id());
super.channelActive(ctx);
}
// HTTP握手处理已移至WebSocketPreprocessor这里只处理WebSocket帧
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
handleWebSocketMessage(ctx, msg);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
log.info("webSocket有新的连接接入channelId: {}", ctx.channel().id());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
log.info("webSocket客户端退出channelId: {}, userId: {}", ctx.channel().id(), this.userId);
if (this.userId != null) {
WebServiceManager.removeByUserId(this.userId);
} else {
// 备用方案如果userId为空使用传统方法
WebServiceManager.removeChannel(ctx.channel().id().toString());
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("webSocket连接断线channelId: {}, userId: {}", ctx.channel().id(), this.userId);
// 确保通道关闭
if (ctx.channel() != null && ctx.channel().isActive()) {
ctx.close();
}
// 使用Handler实例中保存的userId进行资源清理
cleanupSocketResources(this.userId);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 处理WebSocket握手完成事件
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete =
(WebSocketServerProtocolHandler.HandshakeComplete) evt;
// 从Channel属性获取userId由WebSocketPreprocessor设置
this.userId = ctx.channel().attr(AttributeKey.<String>valueOf("userId")).get();
log.info("WebSocket协议升级完成userId: {}, channelId: {}, requestUri: {}",
this.userId, ctx.channel().id(), handshakeComplete.requestUri());
// 握手完成后建立用户会话
if (this.userId != null) {
WebServiceManager.addUser(this.userId, ctx.channel());
log.info("WebSocket用户会话已建立userId: {}, channelId: {}", this.userId, ctx.channel().id());
}
// 发送连接成功消息给前端
sendConnectionSuccessMessage(ctx);
return;
}
// 处理心跳超时事件
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
String eventDesc;
switch (event.state()) {
case READER_IDLE:
eventDesc = "读空闲";
log.warn("客户端心跳检测发生超时事件: {}channelId: {}", eventDesc, ctx.channel().id());
times++;
if (times > MAX_HEARTBEAT_MISS_COUNT) {
log.error("客户端心跳检测空闲次数超过{}次关闭连接channelId: {}, userId: {}", MAX_HEARTBEAT_MISS_COUNT, ctx.channel().id(), this.userId);
ctx.channel().close();
if (this.userId != null) {
WebServiceManager.removeByUserId(this.userId);
} else {
WebServiceManager.removeChannel(ctx.channel().id().toString());
}
}
break;
case WRITER_IDLE:
log.debug("webSocket写空闲事件channelId: {}", ctx.channel().id());
break;
case ALL_IDLE:
log.debug("webSocket读写空闲事件channelId: {}", ctx.channel().id());
break;
default:
break;
}
return;
}
// 其他事件传递给父类处理
super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
String channelId = ctx.channel().id().toString();
try {
// 1. 异常分类记录
logExceptionByType(channelId, cause);
// 2. 业务清理
cleanupOnException(ctx, cause);
// 3. 连接处理决策
handleConnectionByExceptionType(ctx, cause);
} catch (Exception e) {
// 防止异常处理本身出错
log.error("异常处理过程中发生错误强制关闭连接channelId: {}", channelId, e);
ctx.close();
}
}
// ================================ HTTP握手处理已移至WebSocketPreprocessor ================================
// ================================ WebSocket消息处理 ================================
/**
* 发送连接成功消息给前端
* WebSocket握手完成后立即调用通知前端连接建立成功
*
* @param ctx Netty通道上下文
*/
private void sendConnectionSuccessMessage(ChannelHandlerContext ctx) {
if (ctx == null || ctx.channel() == null || !ctx.channel().isActive()) {
log.warn("无法发送连接成功消息:通道不可用, userId: {}", this.userId);
return;
}
try {
// 构建连接成功消息
String welcomeMessage = String.format("{\"type\":\"connection\",\"status\":\"success\",\"message\":\"WebSocket连接建立成功\",\"userId\":\"%s\",\"timestamp\":%d}",
this.userId, System.currentTimeMillis());
TextWebSocketFrame frame = new TextWebSocketFrame(welcomeMessage);
ctx.channel().writeAndFlush(frame);
log.info("已发送连接成功消息给前端, userId: {}, channelId: {}", this.userId, ctx.channel().id());
} catch (Exception e) {
log.error("发送连接成功消息失败, userId: {}, channelId: {}", this.userId, ctx.channel().id(), e);
}
}
/**
* 处理WebSocket文本消息
* 这里是所有WebSocket文本消息的统一处理入口
*
* @param ctx Netty通道上下文
* @param frame WebSocket文本帧
*/
private void handleWebSocketMessage(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
String messageText = frame.text();
// 处理心跳消息
if (HEARTBEAT_PING.equals(messageText)) {
handleHeartbeat(ctx);
} else {
// 处理业务消息
handleBusinessMessage(ctx, frame, messageText);
}
}
/**
* 处理心跳消息
* 重置超时计数器并回复心跳响应
*
* @param ctx Netty通道上下文
*/
private void handleHeartbeat(ChannelHandlerContext ctx) {
if (ctx == null || ctx.channel() == null || !ctx.channel().isActive()) {
log.warn("心跳处理失败通道不可用userId: {}", this.userId);
return;
}
log.debug("收到心跳消息userId: {}, channelId: {}", this.userId, ctx.channel().id());
// 重置心跳超时计数器
times = 0;
// 每次创建新的心跳响应帧,确保内容正确
TextWebSocketFrame heartbeatFrame = new TextWebSocketFrame(HEARTBEAT_RESPONSE_TEXT);
ctx.channel().writeAndFlush(heartbeatFrame);
log.debug("发送心跳响应userId: {}, channelId: {}", this.userId, ctx.channel().id());
}
/**
* 处理业务消息
* 可以在这里扩展具体的业务逻辑处理
*
* @param ctx Netty通道上下文
* @param frame WebSocket文本帧
* @param messageText 消息文本内容
*/
private void handleBusinessMessage(ChannelHandlerContext ctx, TextWebSocketFrame frame, String messageText) {
log.debug("收到WebSocket业务消息userId: {}, channelId: {}, message: {}",
this.userId, ctx.channel().id(), messageText);
// TODO: 根据业务需要扩展消息处理逻辑
// 例如:
// - 解析JSON消息
// - 根据消息类型分发到不同的处理器
// - 调用业务服务处理具体逻辑
}
// ================================ 异常处理 ================================
/**
* 根据异常类型记录不同级别的日志
*/
private void logExceptionByType(String channelId, Throwable cause) {
if (cause instanceof IOException) {
log.info("webSocket网络异常客户端可能异常断开channelId: {}, 异常: {}", channelId, cause.getMessage());
} else if (cause instanceof WebSocketHandshakeException) {
log.warn("webSocket握手异常channelId: {}, 异常: {}", channelId, cause.getMessage());
} else if (cause instanceof DecoderException || cause instanceof CorruptedFrameException) {
log.error("webSocket协议解码异常可能是恶意请求channelId: {}, 异常: {}", channelId, cause.getMessage(), cause);
} else if (cause instanceof IllegalArgumentException) {
log.warn("webSocket参数异常channelId: {}, 异常: {}", channelId, cause.getMessage());
} else {
log.error("webSocket未分类异常channelId: {}, 类型: {}, 异常: {}",
channelId, cause.getClass().getSimpleName(), cause.getMessage(), cause);
}
}
/**
* 异常发生时的业务清理工作
*/
private void cleanupOnException(ChannelHandlerContext ctx, Throwable cause) {
if (ctx == null || ctx.channel() == null) {
log.warn("异常处理:通道上下文为空,无法进行清理");
return;
}
String channelId = ctx.channel().id().toString();
// 清理会话
if (this.userId != null) {
WebServiceManager.removeByUserId(this.userId);
log.debug("已清理WebSocket会话userId: {}, channelId: {}", this.userId, channelId);
} else {
WebServiceManager.removeChannel(channelId);
log.debug("已清理WebSocket会话使用channelIdchannelId: {}", channelId);
}
// 清理检测相关资源
cleanupSocketResources(this.userId);
}
/**
* 根据异常类型决定连接处理策略
*/
private void handleConnectionByExceptionType(ChannelHandlerContext ctx, Throwable cause) {
String channelId = ctx.channel().id().toString();
// URL参数异常但连接本身可能正常尝试保持连接
if (cause instanceof IllegalArgumentException &&
cause.getMessage() != null && cause.getMessage().contains("URL")) {
log.info("URL参数异常尝试保持连接channelId: {}", channelId);
return;
}
// 其他情况都关闭连接
log.debug("关闭WebSocket连接channelId: {}", channelId);
ctx.close();
}
// ================================ 资源清理 ================================
/**
* 清理Socket相关资源
*
* @param userId 用户ID
*/
private void cleanupSocketResources(String userId) {
if (userId == null || userId.trim().isEmpty()) {
log.warn("userId为空无法进行Socket连接清理");
return;
}
try {
PreDetectionParam preDetectionParam = WebServiceManager.getPreDetectionParam(userId);
if (ObjectUtil.isNotNull(preDetectionParam)) {
// 使用该用户的检测参数关闭Socket连接
log.info("使用用户检测参数关闭Socket连接userId: {}", userId);
CnSocketUtil.quitSendSource(preDetectionParam);
CnSocketUtil.quitSend(preDetectionParam);
// 清理完成后移除该用户的检测参数
WebServiceManager.removePreDetectionParam(userId);
} else {
// 使用当前Handler的userId创建检测参数进行清理
log.info("未找到用户检测参数使用默认参数关闭Socket连接userId: {}", userId);
preDetectionParam = new PreDetectionParam();
preDetectionParam.setUserPageId(userId);
// 只关闭设备Socket因为没有sourceId等其他参数
CnSocketUtil.quitSend(preDetectionParam);
}
} catch (Exception e) {
log.error("清理Socket连接时发生异常userId: {}", userId, e);
}
}
// ================================ URL解析工具已移至WebSocketPreprocessor ================================
}

View File

@@ -0,0 +1,184 @@
package com.njcn.gather.detection.util.socket.websocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* WebSocket服务端管道初始化器
*
* 职责:
* 1. 为每个新的WebSocket连接配置处理器链Pipeline
* 2. 按正确顺序添加各种Handler确保数据流正确处理
* 3. 配置HTTP到WebSocket的协议升级
* 4. 设置心跳检测和异常处理机制
*
* 处理流程:
* HTTP请求 → HTTP编解码 → 分块处理 → 消息聚合 → 协议升级 → 心跳检测 → 业务处理 → 异常处理
*
* @Description: webSocket服务端自定义配置
* @Author: wr
* @Date: 2024/12/10 14:20
*/
@Slf4j
public class WebSocketInitializer extends ChannelInitializer<SocketChannel> {
/**
* WebSocket访问路径
*/
private static final String WEBSOCKET_PATH = "/hello";
/**
* HTTP消息最大聚合大小512KB
* 用于WebSocket握手和消息传输
*/
private static final int MAX_CONTENT_LENGTH = 512 * 1024;
/**
* 心跳检测间隔13秒
* 13秒内没有收到客户端消息则触发空闲事件
*/
private static final int READER_IDLE_TIME_SECONDS = 13;
/**
* 为每个新连接初始化处理器管道
* 注意Handler的添加顺序非常重要决定了数据的处理流向
*
* @param ch 新建立的Socket通道
* @throws Exception 初始化过程中的异常
*/
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 1. HTTP协议处理器
// HttpServerCodec = HttpRequestDecoder + HttpResponseEncoder
// 负责HTTP请求解码和HTTP响应编码
pipeline.addLast("http-codec", new HttpServerCodec());
// 2. 分块写入处理器
// 用于处理大文件的分块传输,防止内存溢出
// 支持ChunkedInput如ChunkedFile、ChunkedNioFile等
pipeline.addLast("chunked-write", new ChunkedWriteHandler());
// 3. HTTP消息聚合器
// 将分片的HTTP消息重新组装成完整的FullHttpRequest或FullHttpResponse
// WebSocket握手需要完整的HTTP请求所以这个Handler必须添加
pipeline.addLast("http-aggregator", new HttpObjectAggregator(MAX_CONTENT_LENGTH));
// 4. WebSocket URL预处理器
// 在WebSocket握手之前处理URL参数验证用户ID
pipeline.addLast("websocket-preprocessor", new WebSocketPreprocessor());
// 5. WebSocket协议升级处理器
// 处理WebSocket握手将HTTP协议升级为WebSocket协议
// 只有访问指定路径(WEBSOCKET_PATH)的请求才会被升级
// 升级后会移除HTTP相关的Handler添加WebSocket相关的Handler
pipeline.addLast("websocket-protocol", new WebSocketServerProtocolHandler(WEBSOCKET_PATH));
// 6. 空闲状态检测器
// 检测连接的空闲状态,用于心跳机制
// readerIdleTime: 读空闲时间writerIdleTime: 写空闲时间allIdleTime: 读写空闲时间
pipeline.addLast("idle-state", new IdleStateHandler(READER_IDLE_TIME_SECONDS, 0, 0, TimeUnit.SECONDS));
// 7. 自定义WebSocket业务处理器
// 处理WebSocket帧实现具体的业务逻辑
// 包括心跳处理、消息路由、连接管理等
pipeline.addLast("websocket-handler", new WebSocketHandler());
// 7. 全局异常处理器
// 处理整个管道中未被捕获的异常,作为最后的异常处理兜底
pipeline.addLast("exception-handler", new GlobalExceptionHandler());
}
/**
* WebSocket预处理器
* 在WebSocket握手之前验证URL参数并清理URL
*/
private static class WebSocketPreprocessor extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
log.debug("WebSocket预处理器收到HTTP请求{}", uri);
// 验证并提取userId
String userId = extractUserId(uri);
if (userId == null || userId.trim().isEmpty()) {
log.warn("WebSocket连接被拒绝缺少userId参数, uri: {}", uri);
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.BAD_REQUEST
);
ctx.writeAndFlush(response).addListener(f -> ctx.close());
return;
}
// 将userId存储到Channel属性中
ctx.channel().attr(AttributeKey.<String>valueOf("userId")).set(userId);
// 清理URL参数
if (uri.contains("?")) {
String cleanUri = uri.substring(0, uri.indexOf("?"));
request.setUri(cleanUri);
log.debug("URL已清理原始: {}, 清理后: {}, userId: {}", uri, cleanUri, userId);
}
}
// 继续传递给下一个Handler
super.channelRead(ctx, msg);
}
private String extractUserId(String uri) {
if (!uri.contains("name=")) {
return null;
}
int start = uri.indexOf("name=") + 5;
int end = uri.indexOf("&", start);
if (end == -1) {
return uri.substring(start);
} else {
return uri.substring(start, end);
}
}
}
/**
* 全局异常处理器
* 作为管道中的最后一个Handler捕获所有未处理的异常
*/
private static class GlobalExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 记录异常详情,便于问题排查
log.error("WebSocket连接发生未处理异常远程地址{},异常信息:{}",
ctx.channel().remoteAddress(), cause.getMessage(), cause);
// 优雅关闭连接
if (ctx.channel().isActive()) {
ctx.close();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("WebSocket连接断开远程地址{}", ctx.channel().remoteAddress());
super.channelInactive(ctx);
}
}
}

View File

@@ -0,0 +1,237 @@
package com.njcn.gather.detection.util.socket.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* WebSocket服务端核心类
*
* 职责:
* 1. 启动基于Netty的WebSocket服务器
* 2. 管理服务器生命周期(启动/关闭)
* 3. 提供高性能的WebSocket通信支持
*
* 特性:
* - 使用ApplicationRunner确保在Spring容器完全启动后再启动WebSocket服务
* - 使用CompletableFuture异步启动避免阻塞Spring Boot主线程
* - 支持优雅关闭,确保资源正确释放
* - 完善的异常处理和日志记录
*
* @Description: websocket服务端
* @Author: wr
* @Date: 2024/12/10 13:59
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketService implements ApplicationRunner {
/**
* WebSocket服务器监听端口
* 默认7777端口可通过配置文件webSocket.port自定义
* 客户端连接地址ws://host:port/hello?name=userId
*/
@Value("${webSocket.port:7777}")
int port;
/**
* Netty Boss线程组
* 专门负责接受新的客户端连接请求
* 通常配置1个线程即可因为接受连接的操作相对简单
*/
EventLoopGroup bossGroup;
/**
* Netty Worker线程组
* 专门负责处理已建立连接的I/O操作和业务逻辑
* 默认线程数 = CPU核心数 * 2用于并发处理多个客户端
*/
EventLoopGroup workerGroup;
/**
* 服务器通道引用
* 保存绑定端口后的Channel用于服务器关闭时释放资源
*/
private Channel serverChannel;
/**
* 异步启动任务的Future对象
* 用于管理WebSocket服务器的异步启动过程
* 可以用来取消启动任务或检查启动状态
*/
private CompletableFuture<Void> serverFuture;
/**
* Spring Boot应用启动完成后自动调用此方法
* 使用ApplicationRunner确保在所有Bean初始化完成后再启动WebSocket服务
*/
@Override
public void run(ApplicationArguments args){
// 使用CompletableFuture异步启动WebSocket服务避免阻塞Spring Boot主线程
// 这样可以让应用快速启动完成WebSocket服务在后台异步启动
serverFuture = CompletableFuture.runAsync(this::startWebSocketServer)
.exceptionally(throwable -> {
// 如果启动过程中发生异常,记录日志但不影响应用启动
log.error("WebSocket服务启动异常", throwable);
return null;
});
}
/**
* 启动WebSocket服务器的核心方法
* 此方法会一直阻塞直到服务器关闭,所以需要在异步线程中执行
*/
private void startWebSocketServer() {
try {
// 1. 创建线程组
// bossGroup: 专门负责接受新的客户端连接请求
// 可以自定义线程的数量这里使用默认值通常为1个线程
bossGroup = new NioEventLoopGroup(1);
// workerGroup: 专门负责处理已建立连接的I/O操作
// 默认创建的线程数量 = CPU 处理器数量 * 2用于处理业务逻辑
workerGroup = new NioEventLoopGroup();
// 2. 配置服务器启动参数
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
// 网络配置参数
.option(ChannelOption.SO_BACKLOG, 128)
// TCP连接建立超时时间5秒
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
// 子通道配置(针对每个客户端连接)
// 启用TCP keepalive机制检测死连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new WebSocketInitializer());
// 3. 绑定端口并启动服务器
ChannelFuture future = serverBootstrap.bind(port).sync();
// 保存服务器通道引用,用于后续关闭操作
serverChannel = future.channel();
// 4. 监听绑定结果并记录日志
future.addListener(f -> {
if (future.isSuccess()) {
log.info("webSocket服务启动成功端口{}", port);
} else {
log.error("webSocket服务启动失败端口{}", port);
}
});
// 5. 等待服务器关闭
// 这里会一直阻塞直到serverChannel被外部关闭
// 这就是为什么需要在异步线程中执行此方法的原因
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
// 如果线程被中断(比如应用关闭),记录日志并恢复中断状态
log.error("WebSocket服务启动过程中被中断", e);
Thread.currentThread().interrupt(); // 恢复中断状态
} catch (Exception e) {
// 捕获其他所有异常,记录日志并抛出运行时异常
log.error("WebSocket服务启动失败", e);
throw new RuntimeException("WebSocket服务启动失败", e);
} finally {
// 无论成功还是失败,都要清理资源
shutdownGracefully();
}
}
/**
* 优雅关闭Netty线程组资源
* 私有方法,用于在服务器启动异常时清理资源
*/
private void shutdownGracefully() {
// 优雅关闭接收连接的线程组
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
// 优雅关闭处理I/O的线程组
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
/**
* Spring容器销毁时自动调用此方法释放资源
* 使用@PreDestroy确保在应用关闭时优雅地关闭WebSocket服务
*/
@PreDestroy
public void destroy() throws InterruptedException {
log.info("正在关闭WebSocket服务...");
// 步骤1: 首先关闭服务器通道,停止接受新的连接请求
// 这样可以确保不会有新的客户端连接进来
if (serverChannel != null) {
try {
// 等待最多5秒让服务器通道关闭
serverChannel.close().awaitUninterruptibly(5, TimeUnit.SECONDS);
log.debug("服务器通道已关闭");
} catch (Exception e) {
log.warn("关闭服务器通道时发生异常", e);
}
}
// 步骤2: 关闭bossGroup线程组
// bossGroup负责接受连接现在可以安全关闭了
if (bossGroup != null) {
try {
// 优雅关闭静默期0秒超时时间5秒
// 静默期0秒意味着立即开始关闭超时5秒后强制关闭
bossGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync();
log.debug("bossGroup线程组已关闭");
} catch (InterruptedException e) {
log.warn("关闭bossGroup时被中断", e);
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
// 步骤3: 关闭workerGroup线程组
// workerGroup负责处理I/O需要等待现有连接处理完成
if (workerGroup != null) {
try {
// 等待现有任务完成但最多等待5秒
workerGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync();
log.debug("workerGroup线程组已关闭");
} catch (InterruptedException e) {
log.warn("关闭workerGroup时被中断", e);
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
// 步骤4: 取消异步启动任务(如果还在运行)
// 这可以避免在应用关闭后还有线程在后台运行
if (serverFuture != null && !serverFuture.isDone()) {
// true表示允许中断正在执行的任务
boolean cancelled = serverFuture.cancel(true);
if (cancelled) {
log.debug("异步启动任务已取消");
}
}
log.info("webSocket服务已销毁");
}
}