diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketContrastResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketContrastResponseService.java index 834da0b0..4f0c960f 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketContrastResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketContrastResponseService.java @@ -15,6 +15,7 @@ import com.njcn.gather.detection.pojo.vo.SocketMsg; import com.njcn.gather.detection.pojo.vo.WebSocketVO; import com.njcn.gather.detection.util.DetectionUtil; import com.njcn.gather.detection.util.socket.*; +import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; import com.njcn.gather.device.pojo.vo.PreDetection; import com.njcn.gather.device.service.IPqDevService; import com.njcn.gather.device.service.IPqStandardDevService; diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java index d47eee53..2f05bd39 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java @@ -1,6 +1,5 @@ package com.njcn.gather.detection.handler; -import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.ObjectUtil; @@ -19,6 +18,7 @@ import com.njcn.gather.detection.pojo.vo.*; import com.njcn.gather.detection.service.impl.DetectionServiceImpl; import com.njcn.gather.detection.util.DetectionUtil; import com.njcn.gather.detection.util.socket.*; +import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; import com.njcn.gather.device.pojo.enums.CommonEnum; import com.njcn.gather.device.pojo.po.PqDevSub; import com.njcn.gather.device.pojo.vo.PreDetection; diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java index d5e6cce4..37220161 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java @@ -1,6 +1,5 @@ package com.njcn.gather.detection.handler; -import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; @@ -14,6 +13,7 @@ import com.njcn.gather.detection.pojo.vo.WebSocketVO; import com.njcn.gather.detection.util.socket.*; import com.njcn.gather.detection.util.socket.cilent.NettyClient; import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler; +import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; import com.njcn.gather.device.pojo.vo.PreDetection; import com.njcn.gather.device.service.IPqDevService; import com.njcn.gather.script.pojo.po.SourceIssue; diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index b7601ec2..d32a6178 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -22,8 +22,9 @@ import com.njcn.gather.detection.util.business.DetectionCommunicateUtil; import com.njcn.gather.detection.util.socket.CnSocketUtil; import com.njcn.gather.detection.util.socket.FormalTestManager; import com.njcn.gather.detection.util.socket.SocketManager; -import com.njcn.gather.detection.util.socket.WebServiceManager; +import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; import com.njcn.gather.detection.util.socket.cilent.NettyClient; +import com.njcn.web.utils.RequestUtil; import com.njcn.gather.detection.util.socket.cilent.NettyContrastClientHandler; import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler; import com.njcn.gather.device.pojo.po.PqDev; @@ -146,8 +147,28 @@ public class PreDetectionServiceImpl implements PreDetectionService { } + /** + * 发送源通信校验Socket连接(数字式和模拟式检测模式) + * + *

该方法用于建立与程控源设备的Socket连接,进行源通信校验。主要流程:

+ * + * + * @param param 预检测参数,包含计划ID、用户ID等信息 + * @throws BusinessException 当计划源信息不存在或源初始化参数为空时抛出 + * + * @see SourceOperateCodeEnum#YJC_YTXJY 源通信校验操作码 + * @see SourceOperateCodeEnum#INIT_GATHER 初始化采集操作码 + */ private void sendYtxSocket(PreDetectionParam param) { - WebServiceManager.addPreDetectionParam(param); + String loginName = RequestUtil.getLoginNameByToken(); + WebServiceManager.addPreDetectionParam(loginName, param); AdPlanSource planSource = adPlanSourceService.getOne(new LambdaQueryWrapper().eq(AdPlanSource::getPlanId, param.getPlanId())); param.setSourceId(planSource.getSourceId()); if (ObjectUtil.isNotNull(planSource)) { @@ -172,10 +193,28 @@ public class PreDetectionServiceImpl implements PreDetectionService { } } + /** + * 发送源通信校验Socket连接(仿真模式) + * + *

该方法专门用于仿真检测模式下的源通信校验。与普通模式的区别:

+ * + * + * @param param 预检测参数,必须包含sourceId和userPageId + * @throws BusinessException 当源初始化参数为空时抛出 + * + * @see #sendYtxSocket(PreDetectionParam) 普通检测模式的源通信校验 + * @see SourceOperateCodeEnum#YJC_YTXJY 源通信校验操作码 + * @see SourceOperateCodeEnum#INIT_GATHER 初始化采集操作码 + */ private void sendYtxSocketSimulate(PreDetectionParam param) { SourceInitialize sourceParam = pqSourceService.getSourceInitializeParam(param.getSourceId()); param.setSourceId(sourceParam.getSourceId()); - WebServiceManager.addPreDetectionParam(param); + String loginName = RequestUtil.getLoginNameByToken(); + WebServiceManager.addPreDetectionParam(loginName, param); if (ObjectUtil.isNotNull(sourceParam)) { SocketMsg socketMsg = new SocketMsg<>(); socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue()); diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/CnSocketUtil.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/CnSocketUtil.java index 76ee7074..954fd1e7 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/CnSocketUtil.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/CnSocketUtil.java @@ -6,6 +6,7 @@ import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.pojo.vo.SocketMsg; import com.njcn.gather.detection.pojo.vo.WebSocketVO; +import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; /** * @Author: cdf diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java deleted file mode 100644 index be8ebf1f..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.njcn.gather.detection.util.socket; - -import com.alibaba.fastjson.JSON; -import com.njcn.gather.detection.pojo.param.PreDetectionParam; -import com.njcn.gather.detection.pojo.vo.WebSocketVO; -import io.netty.channel.Channel; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import lombok.extern.slf4j.Slf4j; -import org.apache.poi.ss.formula.functions.T; - -import java.time.LocalDateTime; -import java.util.Iterator; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -/** - * @Description: webSocket存储的通道 - * @Author: wr - * @Date: 2024/12/11 13:04 - */ -@Slf4j -public class WebServiceManager { - - //key:页面 value:channel - private static final Map userSessions = new ConcurrentHashMap<>(); - - // 检测参数。key固定为preDetectionParam, value:检测参数 - private static final Map preDetectionParamMap = new ConcurrentHashMap<>(); - - public static void addUser(String userId, Channel channel) { - userSessions.put(userId, channel); - } - - - public static void removeChannel(String channelId) { - // 遍历并删除 - Iterator> iterator = userSessions.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().id().toString().equals(channelId)) { - iterator.remove(); - } - } - } - - public static Channel getChannelByUserId(String userId) { - return userSessions.get(userId); - } - - public static void sendMsg(String userId,String msg) { - Channel channel = userSessions.get(userId); - if(Objects.nonNull(channel) && channel.isActive()){ - TextWebSocketFrame wd = new TextWebSocketFrame(msg); - channel.writeAndFlush(wd); - }else { - log.error("{}-websocket推送消息失败;当前用户-{}-客户端已经断开连接", LocalDateTime.now(),userId); -// PreDetectionParam param = preDetectionParamMap.get("preDetectionParam"); -// CnSocketUtil.quitSend(param); -// CnSocketUtil.quitSendSource(param); - } - - } - - public static void sendMessage(String userId, WebSocketVO webSocketVO) { - Channel channel = userSessions.get(userId); - if(Objects.nonNull(channel) && channel.isActive()){ - TextWebSocketFrame wd = new TextWebSocketFrame(JSON.toJSONString(webSocketVO)); - channel.writeAndFlush(wd); - }else { - log.error("{}-websocket推送消息失败;当前用户-{}-客户端已经断开连接", LocalDateTime.now(),userId); - } - - } - - public static void addPreDetectionParam(PreDetectionParam preDetectionParam) { - preDetectionParamMap.put("preDetectionParam", preDetectionParam); - } - public static PreDetectionParam getPreDetectionParam() { - return preDetectionParamMap.get("preDetectionParam"); - } - public static void removePreDetectionParam() { - preDetectionParamMap.clear(); - } -} - \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java index cde4878a..0c691619 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java @@ -8,7 +8,7 @@ import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.pojo.vo.SocketDataMsg; import com.njcn.gather.detection.util.socket.CnSocketUtil; import com.njcn.gather.detection.util.socket.SocketManager; -import com.njcn.gather.detection.util.socket.WebServiceManager; +import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java index 32ebcd83..eea6100e 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java @@ -10,7 +10,7 @@ import com.njcn.gather.detection.pojo.vo.WebSocketVO; import com.njcn.gather.detection.util.socket.CnSocketUtil; import com.njcn.gather.detection.util.socket.FormalTestManager; import com.njcn.gather.detection.util.socket.SocketManager; -import com.njcn.gather.detection.util.socket.WebServiceManager; +import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; import com.njcn.gather.device.pojo.vo.PreDetection; import com.njcn.gather.script.pojo.po.SourceIssue; import com.njcn.gather.system.pojo.enums.DicDataEnum; diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketHandler.java deleted file mode 100644 index 4e5cabd6..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketHandler.java +++ /dev/null @@ -1,169 +0,0 @@ -package com.njcn.gather.detection.util.socket.web; - -import cn.hutool.core.util.ObjectUtil; -import com.njcn.gather.detection.pojo.param.PreDetectionParam; -import com.njcn.gather.detection.util.socket.CnSocketUtil; -import com.njcn.gather.detection.util.socket.WebServiceManager; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.timeout.IdleStateEvent; -import lombok.extern.slf4j.Slf4j; - - -/** - * @Description: 泛型 代表的是处理数据的单位 - * TextWebSocketFrame : 文本信息帧 - * @Author: wr - * @Date: 2024/12/10 13:56 - */ - - -@Slf4j -public class WebSocketHandler extends SimpleChannelInboundHandler { - - private int times; - - private final static String QUESTION_MARK = "?"; - private final static String EQUAL_TO = "="; - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - System.out.println("webSocket服务端通道已建立" + ctx.channel().id()); - super.channelActive(ctx); - } - - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - //首次连接是FullHttpRequest,把用户id和对应的channel对象存储起来 - if (null != msg && msg instanceof FullHttpRequest) { - FullHttpRequest request = (FullHttpRequest) msg; - String uri = request.uri(); - String userId = getUrlParams(uri); - WebServiceManager.addUser(userId, ctx.channel()); - log.info("登录的用户id是:{}", userId); - //如果url包含参数,需要处理 - if (uri.contains(QUESTION_MARK)) { - String newUri = uri.substring(0, uri.indexOf(QUESTION_MARK)); - request.setUri(newUri); - } - - } else if (msg instanceof TextWebSocketFrame) { - //正常的TEXT消息类型 - TextWebSocketFrame frame = (TextWebSocketFrame) msg; - //log.info("webSocket服务器收到客户端心跳信息:{}", frame.text()); - if ("alive".equals(frame.text())) { - //System.out.println("webSocket心跳收到时间………………………………………………………………"+LocalDateTime.now()); - times = 0; - TextWebSocketFrame wd = new TextWebSocketFrame("over"); - ctx.channel().writeAndFlush(wd); - } - } - super.channelRead(ctx, msg); - - } - - /** - * 根据用户地址获取用户名 ws://127.0.0.1:7777/hello?name=aa - * - * @param url - * @return - */ - private static String getUrlParams(String url) { - if (!url.contains(EQUAL_TO)) { - return null; - } - String userId = url.substring(url.indexOf(EQUAL_TO) + 1); - return userId; - } - - - @Override - protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { - /* System.out.println("服务端消息 == " + msg.text()); - if(msg.text().equals("下发指令")){ - *//** - * 处理对应消息 - * 1.先下发所要操作的流程信息 - * 2.组装对应的入参信息 - * 3.再用socket信息返回结束 - *//* - //NettyClient.socketClient(msg.text(),new NettySourceClientHandler()); - - }*/ - - //可以直接调用text 拿到文本信息帧中的信息 - /* Channel channel = ctx.channel(); - TextWebSocketFrame resp = new TextWebSocketFrame(msg.text()); - channel.writeAndFlush(resp);*/ - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - //WebServiceManager.addUser(userId, ctx.channel()); - System.out.println("webSocket有新的连接接入:" + ctx); - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) { - // 假设用户 ID 是从某个地方获取的,这里简单示例为 "userId" - System.out.println("weoSocket客户端退出: " + ctx.channel().id()); - WebServiceManager.removeChannel(ctx.channel().id().toString()); - - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) { - System.out.println("weoSocket断线" + ctx.channel().id()); - ctx.close(); - - PreDetectionParam preDetectionParam = WebServiceManager.getPreDetectionParam(); - if (ObjectUtil.isNotNull(preDetectionParam)) { - CnSocketUtil.quitSendSource(preDetectionParam); // 能否在这里关闭源socket连接? - CnSocketUtil.quitSend(preDetectionParam); - } else { - preDetectionParam = new PreDetectionParam(); - preDetectionParam.setUserPageId("cdf"); - CnSocketUtil.quitSend(preDetectionParam); - } - } - - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { - IdleStateEvent event = (IdleStateEvent) evt; - String eventDesc = null; - switch (event.state()) { - case READER_IDLE: - eventDesc = "读空闲"; - System.out.println("c端心跳检测发生超时事件--" + eventDesc); - times++; - if (times > 3) { - System.out.println("c端心跳检测空闲次数超过三次 关闭连接"); - ctx.channel().close(); - WebServiceManager.removeChannel(ctx.channel().id().toString()); - } - break; - case WRITER_IDLE: - eventDesc = "写空闲"; - break; - case ALL_IDLE: - eventDesc = "读写空闲"; - break; - } - - - //super.userEventTriggered(ctx, evt); - - } - - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); - ctx.close(); - } -} - diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketInitializer.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketInitializer.java deleted file mode 100644 index f40a86db..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketInitializer.java +++ /dev/null @@ -1,63 +0,0 @@ -package com.njcn.gather.detection.util.socket.web; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpResponseDecoder; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; -import io.netty.handler.stream.ChunkedWriteHandler; -import io.netty.handler.timeout.IdleStateHandler; - -import java.util.concurrent.TimeUnit; - -/** - * @Description: webSocket服务端自定义配置 - * @Author: wr - * @Date: 2024/12/10 14:20 - */ - -public class WebSocketInitializer extends ChannelInitializer { - - - - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - //设置心跳机制 -// ch.pipeline().addLast(new IdleStateHandler(5, 0, 0)); - //增加编解码器 的另一种方式 - pipeline.addLast(new HttpServerCodec()); - pipeline.addLast(new HttpResponseDecoder()); - //块方式写的处理器 适合处理大数据 - pipeline.addLast(new ChunkedWriteHandler()); - //聚合 - pipeline.addLast(new HttpObjectAggregator(512 * 1024)); - /* - * 这个时候 我们需要声明我们使用的是 websocket 协议 - * netty为websocket也准备了对应处理器 设置的是访问路径 - * 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了 - * 这个handler是将http协议升级为websocket 并且使用 101 作为响应码 - * */ - pipeline.addLast(new IdleStateHandler(13, 0, 0, TimeUnit.SECONDS)); - pipeline.addLast(new WebSocketHandler()); - pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); - - pipeline.addLast(new ChannelInboundHandlerAdapter() { - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - // 处理异常,例如记录日志、关闭连接等 - System.out.println("进入异常++++++++++++++++++"); - cause.printStackTrace(); - ctx.close(); - } - }); - - } - -} - diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java deleted file mode 100644 index 534f9e69..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java +++ /dev/null @@ -1,94 +0,0 @@ -package com.njcn.gather.detection.util.socket.web; - - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.logging.LoggingHandler; -import lombok.RequiredArgsConstructor; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - - -/** - * @Description: websocket服务端 - * @Author: wr - * @Date: 2024/12/10 13:59 - */ -@Component -@RequiredArgsConstructor -public class WebSocketService { - - - /** - * 端口号 - */ - @Value("${webSocket.port:7777}") - int port; - - - EventLoopGroup bossGroup; - EventLoopGroup workerGroup; - - - - @PostConstruct - public void start() { - new Thread(() -> { - //可以自定义线程的数量 - bossGroup = new NioEventLoopGroup(); - // 默认创建的线程数量 = CPU 处理器数量 *2 - workerGroup = new NioEventLoopGroup(); - try { - ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .handler(new LoggingHandler()) - //当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度 - .option(ChannelOption.SO_BACKLOG, 128) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) - - //设置连接保持为活动状态 - .childOption(ChannelOption.SO_KEEPALIVE, true) - .childHandler(new WebSocketInitializer()); - ChannelFuture future = serverBootstrap.bind(port).sync(); - future.addListener(f -> { - if (future.isSuccess()) { - System.out.println("webSocket服务启动成功"); - } else { - System.out.println("webSocket服务启动失败"); - } - }); - future.channel().closeFuture().sync(); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - } - }).start(); - } - - - /** - * 释放资源 - */ - @PreDestroy - public void destroy() throws InterruptedException { - if (bossGroup != null) { - bossGroup.shutdownGracefully().sync(); - } - if (workerGroup != null) { - workerGroup.shutdownGracefully().sync(); - } - - System.out.println("webSocket销毁---------------"); - } - -} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java new file mode 100644 index 00000000..589e82c6 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java @@ -0,0 +1,261 @@ +package com.njcn.gather.detection.util.socket.websocket; + +import com.alibaba.fastjson.JSON; +import com.njcn.gather.detection.pojo.param.PreDetectionParam; +import com.njcn.gather.detection.pojo.vo.WebSocketVO; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import lombok.extern.slf4j.Slf4j; + +import java.time.LocalDateTime; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * WebSocket会话管理器 + * + *

负责管理电能质量检测系统中的WebSocket连接会话和检测参数,主要功能包括:

+ *
    + *
  • WebSocket连接会话的添加、删除和管理
  • + *
  • 向指定用户推送实时消息(文本消息和结构化消息)
  • + *
  • 全局检测参数的存储和管理
  • + *
+ * + *

线程安全性:

+ * 使用ConcurrentHashMap确保在高并发环境下的线程安全。 + * + *

使用场景:

+ *
    + *
  • 检测进度实时推送
  • + *
  • 检测结果数据推送
  • + *
  • 设备状态变更通知
  • + *
  • 异常信息推送
  • + *
+ * + *

消息推送方式:

+ *
    + *
  • {@link #sendMsg(String, String)} - 发送纯文本消息
  • + *
  • {@link #sendMessage(String, WebSocketVO)} - 发送结构化JSON消息
  • + *
+ * + * @author wr + * @date 2024/12/11 13:04 + * @version 1.0 + * @since 检测系统 v2.3.12 + * + * @see com.njcn.gather.detection.util.socket.websocket.WebSocketHandler WebSocket处理器 + * @see com.njcn.gather.detection.pojo.vo.WebSocketVO WebSocket消息对象 + */ +@Slf4j +public class WebServiceManager { + + /** + * WebSocket用户会话存储 + * key: 用户ID, value: WebSocket连接通道 + */ + private static final Map userSessions = new ConcurrentHashMap<>(); + + /** + * 检测参数存储 + * key: 用户ID(userPageId), value: 检测参数对象 + * 支持多用户并发检测,每个用户的检测参数独立存储 + */ + private static final Map preDetectionParamMap = new ConcurrentHashMap<>(); + + /** + * 添加用户WebSocket会话 + * + * @param userId 用户ID,不能为null + * @param channel WebSocket连接通道,不能为null + */ + public static void addUser(String userId, Channel channel) { + userSessions.put(userId, channel); + } + + + /** + * 根据用户ID移除会话(推荐使用) + * 时间复杂度:O(1) + * + * @param userId 用户ID + * @return 被移除的Channel,如果不存在则返回null + */ + public static Channel removeByUserId(String userId) { + return userSessions.remove(userId); + } + + /** + * 根据channelId移除会话(兼容老版本) + * 时间复杂度:O(n),建议使用removeByUserId替代 + * + * @param channelId 通道ID + * @deprecated 建议使用 {@link #removeByUserId(String)} 替代 + */ + @Deprecated + public static void removeChannel(String channelId) { + // 遍历并删除 + Iterator> iterator = userSessions.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue().id().toString().equals(channelId)) { + iterator.remove(); + break; // 找到后立即退出,避免继续遍历 + } + } + } + + + /** + * 发送纯文本消息给指定用户 + * + * @param userId 目标用户ID + * @param msg 要发送的文本消息 + */ + public static void sendMsg(String userId, String msg) { + Channel channel = userSessions.get(userId); + if (Objects.nonNull(channel) && channel.isActive()) { + TextWebSocketFrame frame = new TextWebSocketFrame(msg); + channel.writeAndFlush(frame); + } else { + log.error("WebSocket推送消息失败,用户连接已断开,时间: {}, userId: {}", LocalDateTime.now(), userId); + } + } + + /** + * 发送结构化消息给指定用户 + * 消息会被序列化为JSON格式后发送 + * + * @param userId 目标用户ID + * @param webSocketVO 要发送的结构化消息对象 + */ + public static void sendMessage(String userId, WebSocketVO webSocketVO) { + Channel channel = userSessions.get(userId); + if (Objects.nonNull(channel) && channel.isActive()) { + TextWebSocketFrame frame = new TextWebSocketFrame(JSON.toJSONString(webSocketVO)); + channel.writeAndFlush(frame); + } else { + log.error("WebSocket推送结构化消息失败,用户连接已断开,时间: {}, userId: {}", LocalDateTime.now(), userId); + } + } + + /** + * 存储检测参数(基于用户ID) + * 支持多用户并发检测,每个用户的检测参数独立存储 + * + * @param userId 用户ID(登录名) + * @param preDetectionParam 检测参数对象 + * @throws IllegalArgumentException 当userId或检测参数为空时抛出 + */ + public static void addPreDetectionParam(String userId, PreDetectionParam preDetectionParam) { + if (userId == null || userId.trim().isEmpty()) { + throw new IllegalArgumentException("用户ID不能为空"); + } + if (preDetectionParam == null) { + throw new IllegalArgumentException("检测参数不能为空"); + } + preDetectionParamMap.put(userId, preDetectionParam); + } + + /** + * 存储检测参数(兼容老版本) + * 从检测参数对象中获取userPageId作为key + * + * @param preDetectionParam 检测参数对象,必须包含userPageId + * @throws IllegalArgumentException 当userPageId为空时抛出 + * @deprecated 建议使用 {@link #addPreDetectionParam(String, PreDetectionParam)} + */ + @Deprecated + public static void addPreDetectionParam(PreDetectionParam preDetectionParam) { + if (preDetectionParam == null || preDetectionParam.getUserPageId() == null) { + throw new IllegalArgumentException("检测参数或用户ID不能为空"); + } + preDetectionParamMap.put(preDetectionParam.getUserPageId(), preDetectionParam); + } + + /** + * 获取指定用户的检测参数 + * + * @param userId 用户ID + * @return 检测参数对象,如果不存在则返回null + */ + public static PreDetectionParam getPreDetectionParam(String userId) { + return preDetectionParamMap.get(userId); + } + + /** + * 获取当前检测参数(兼容老版本) + * 注意:该方法已废弃,建议使用 {@link #getPreDetectionParam(String)} + * + * @return 检测参数对象,如果不存在则返回null + * @deprecated 多用户并发场景下该方法不安全,请使用 {@link #getPreDetectionParam(String)} + */ + @Deprecated + public static PreDetectionParam getPreDetectionParam() { + if (preDetectionParamMap.size() == 1) { + return preDetectionParamMap.values().iterator().next(); + } + log.warn("存在多个检测参数,无法确定返回哪个,当前参数数量: {}", preDetectionParamMap.size()); + return null; + } + + /** + * 移除指定用户的检测参数 + * + * @param userId 用户ID + * @return 被移除的检测参数,如果不存在则返回null + */ + public static PreDetectionParam removePreDetectionParam(String userId) { + return preDetectionParamMap.remove(userId); + } + + /** + * 清空所有检测参数 + */ + public static void removeAllPreDetectionParam() { + preDetectionParamMap.clear(); + } + + /** + * 清空所有检测参数(兼容老版本) + * + * @deprecated 建议使用 {@link #removeAllPreDetectionParam()} 或 {@link #removePreDetectionParam(String)} + */ + @Deprecated + public static void removePreDetectionParam() { + removeAllPreDetectionParam(); + } + + // ================================ 实用功能方法 ================================ + + /** + * 获取当前在线用户数量 + * + * @return 在线用户数量 + */ + public static int getOnlineUserCount() { + return userSessions.size(); + } + + /** + * 检查指定用户是否在线 + * + * @param userId 用户ID + * @return true如果用户在线且连接活跃,否则返回false + */ + public static boolean isUserOnline(String userId) { + Channel channel = userSessions.get(userId); + return Objects.nonNull(channel) && channel.isActive(); + } + + /** + * 获取所有在线用户ID集合 + * + * @return 在线用户ID集合的快照 + */ + public static java.util.Set getOnlineUserIds() { + return new java.util.HashSet<>(userSessions.keySet()); + } +} + \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketConstants.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketConstants.java new file mode 100644 index 00000000..a5472187 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketConstants.java @@ -0,0 +1,49 @@ +package com.njcn.gather.detection.util.socket.websocket; + +/** + * WebSocket常量管理类 + * + * @author wr + * @date 2024/12/10 + */ +public final class WebSocketConstants { + + /** + * URL参数分隔符 + */ + public static final String QUESTION_MARK = "?"; + + /** + * URL参数等号分隔符 + */ + public static final String EQUAL_TO = "="; + + /** + * 客户端心跳消息 + */ + public static final String HEARTBEAT_PING = "alive"; + + /** + * 服务端心跳响应 + */ + public static final String HEARTBEAT_PONG = "over"; + + /** + * 心跳超时最大次数 + */ + public static final int MAX_HEARTBEAT_MISS_COUNT = 3; + + /** + * WebSocket握手失败状态码 + */ + public static final int HANDSHAKE_FAILED_STATUS = 4000; + + /** + * WebSocket握手失败原因 + */ + public static final String HANDSHAKE_FAILED_REASON = "Missing required userId parameter"; + + private WebSocketConstants() { + // 私有构造函数,防止实例化 + } +} \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java new file mode 100644 index 00000000..e89c31b8 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketHandler.java @@ -0,0 +1,389 @@ +package com.njcn.gather.detection.util.socket.websocket; + +import cn.hutool.core.util.ObjectUtil; +import com.njcn.gather.detection.pojo.param.PreDetectionParam; +import com.njcn.gather.detection.util.socket.CnSocketUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.AttributeKey; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; + +import static com.njcn.gather.detection.util.socket.websocket.WebSocketConstants.*; + + +/** + * WebSocket消息处理器 + * + *

负责处理电能质量检测系统中的WebSocket连接和消息通信,主要功能包括:

+ *
    + *
  • WebSocket连接的建立、维护和断开
  • + *
  • 用户身份验证和会话管理
  • + *
  • 心跳检测和连接保活
  • + *
  • 检测状态和结果的实时推送
  • + *
  • 异常处理和资源清理
  • + *
+ * + *

通信协议:

+ *
+ * 连接URL: ws://host:port/path?name=userId
+ * 心跳消息: "alive" -> "over"
+ * 业务消息: JSON格式的检测数据和状态信息
+ * 
+ * + *

安全策略:

+ *
    + *
  • 连接时必须提供有效的userId参数,否则拒绝连接
  • + *
  • 支持心跳超时检测,超时3次自动断开连接
  • + *
  • 连接断开时自动清理相关Socket资源
  • + *
+ * + *

使用场景:

+ * 主要用于前端实时接收检测进度、检测结果、设备状态等信息的推送, + * 配合detection模块的ResponseService类实现完整的实时通信链路。 + * + * @author wr + * @date 2024/12/10 13:56 + * @version 1.0 + * @since 检测系统 v2.3.12 + * + * @see WebServiceManager 会话管理器 + * @see WebSocketConstants 常量定义 + * @see com.njcn.gather.detection.handler.SocketDevResponseService 设备响应处理 + * @see com.njcn.gather.detection.handler.SocketSourceResponseService 源响应处理 + */ + + +@Slf4j +public class WebSocketHandler extends SimpleChannelInboundHandler { + + // ================================ 字段定义 ================================ + + /** + * 心跳超时计数器 + */ + private int times; + + /** + * 当前WebSocket连接对应的用户ID + * 在首次HTTP握手时从URL参数中提取并存储 + * 用于后续的Socket连接管理和资源清理 + */ + private String userId; + + /** + * 心跳响应内容常量 + * 注意:不能预创建TextWebSocketFrame对象,因为ByteBuf状态会改变 + */ + private static final String HEARTBEAT_RESPONSE_TEXT = HEARTBEAT_PONG; + + // ================================ Netty生命周期方法 ================================ + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + log.info("webSocket服务端通道已建立,channelId: {}", ctx.channel().id()); + super.channelActive(ctx); + } + + // HTTP握手处理已移至WebSocketPreprocessor,这里只处理WebSocket帧 + + @Override + protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { + handleWebSocketMessage(ctx, msg); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + log.info("webSocket有新的连接接入,channelId: {}", ctx.channel().id()); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + log.info("webSocket客户端退出,channelId: {}, userId: {}", ctx.channel().id(), this.userId); + if (this.userId != null) { + WebServiceManager.removeByUserId(this.userId); + } else { + // 备用方案:如果userId为空,使用传统方法 + WebServiceManager.removeChannel(ctx.channel().id().toString()); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + log.info("webSocket连接断线,channelId: {}, userId: {}", ctx.channel().id(), this.userId); + + // 确保通道关闭 + if (ctx.channel() != null && ctx.channel().isActive()) { + ctx.close(); + } + + // 使用Handler实例中保存的userId进行资源清理 + cleanupSocketResources(this.userId); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + // 处理WebSocket握手完成事件 + if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { + WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = + (WebSocketServerProtocolHandler.HandshakeComplete) evt; + + // 从Channel属性获取userId(由WebSocketPreprocessor设置) + this.userId = ctx.channel().attr(AttributeKey.valueOf("userId")).get(); + + log.info("WebSocket协议升级完成,userId: {}, channelId: {}, requestUri: {}", + this.userId, ctx.channel().id(), handshakeComplete.requestUri()); + + // 握手完成后建立用户会话 + if (this.userId != null) { + WebServiceManager.addUser(this.userId, ctx.channel()); + log.info("WebSocket用户会话已建立,userId: {}, channelId: {}", this.userId, ctx.channel().id()); + } + + // 发送连接成功消息给前端 + sendConnectionSuccessMessage(ctx); + return; + } + + // 处理心跳超时事件 + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + String eventDesc; + switch (event.state()) { + case READER_IDLE: + eventDesc = "读空闲"; + log.warn("客户端心跳检测发生超时事件: {},channelId: {}", eventDesc, ctx.channel().id()); + times++; + if (times > MAX_HEARTBEAT_MISS_COUNT) { + log.error("客户端心跳检测空闲次数超过{}次,关闭连接,channelId: {}, userId: {}", MAX_HEARTBEAT_MISS_COUNT, ctx.channel().id(), this.userId); + ctx.channel().close(); + if (this.userId != null) { + WebServiceManager.removeByUserId(this.userId); + } else { + WebServiceManager.removeChannel(ctx.channel().id().toString()); + } + } + break; + case WRITER_IDLE: + log.debug("webSocket写空闲事件,channelId: {}", ctx.channel().id()); + break; + case ALL_IDLE: + log.debug("webSocket读写空闲事件,channelId: {}", ctx.channel().id()); + break; + default: + break; + } + return; + } + + // 其他事件传递给父类处理 + super.userEventTriggered(ctx, evt); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + String channelId = ctx.channel().id().toString(); + try { + // 1. 异常分类记录 + logExceptionByType(channelId, cause); + // 2. 业务清理 + cleanupOnException(ctx, cause); + // 3. 连接处理决策 + handleConnectionByExceptionType(ctx, cause); + } catch (Exception e) { + // 防止异常处理本身出错 + log.error("异常处理过程中发生错误,强制关闭连接,channelId: {}", channelId, e); + ctx.close(); + } + } + + // ================================ HTTP握手处理已移至WebSocketPreprocessor ================================ + + // ================================ WebSocket消息处理 ================================ + + /** + * 发送连接成功消息给前端 + * WebSocket握手完成后立即调用,通知前端连接建立成功 + * + * @param ctx Netty通道上下文 + */ + private void sendConnectionSuccessMessage(ChannelHandlerContext ctx) { + if (ctx == null || ctx.channel() == null || !ctx.channel().isActive()) { + log.warn("无法发送连接成功消息:通道不可用, userId: {}", this.userId); + return; + } + + try { + // 构建连接成功消息 + String welcomeMessage = String.format("{\"type\":\"connection\",\"status\":\"success\",\"message\":\"WebSocket连接建立成功\",\"userId\":\"%s\",\"timestamp\":%d}", + this.userId, System.currentTimeMillis()); + + TextWebSocketFrame frame = new TextWebSocketFrame(welcomeMessage); + ctx.channel().writeAndFlush(frame); + + log.info("已发送连接成功消息给前端, userId: {}, channelId: {}", this.userId, ctx.channel().id()); + } catch (Exception e) { + log.error("发送连接成功消息失败, userId: {}, channelId: {}", this.userId, ctx.channel().id(), e); + } + } + + /** + * 处理WebSocket文本消息 + * 这里是所有WebSocket文本消息的统一处理入口 + * + * @param ctx Netty通道上下文 + * @param frame WebSocket文本帧 + */ + private void handleWebSocketMessage(ChannelHandlerContext ctx, TextWebSocketFrame frame) { + String messageText = frame.text(); + // 处理心跳消息 + if (HEARTBEAT_PING.equals(messageText)) { + handleHeartbeat(ctx); + } else { + // 处理业务消息 + handleBusinessMessage(ctx, frame, messageText); + } + } + + /** + * 处理心跳消息 + * 重置超时计数器并回复心跳响应 + * + * @param ctx Netty通道上下文 + */ + private void handleHeartbeat(ChannelHandlerContext ctx) { + if (ctx == null || ctx.channel() == null || !ctx.channel().isActive()) { + log.warn("心跳处理失败:通道不可用,userId: {}", this.userId); + return; + } + log.debug("收到心跳消息,userId: {}, channelId: {}", this.userId, ctx.channel().id()); + // 重置心跳超时计数器 + times = 0; + // 每次创建新的心跳响应帧,确保内容正确 + TextWebSocketFrame heartbeatFrame = new TextWebSocketFrame(HEARTBEAT_RESPONSE_TEXT); + ctx.channel().writeAndFlush(heartbeatFrame); + log.debug("发送心跳响应,userId: {}, channelId: {}", this.userId, ctx.channel().id()); + } + + /** + * 处理业务消息 + * 可以在这里扩展具体的业务逻辑处理 + * + * @param ctx Netty通道上下文 + * @param frame WebSocket文本帧 + * @param messageText 消息文本内容 + */ + private void handleBusinessMessage(ChannelHandlerContext ctx, TextWebSocketFrame frame, String messageText) { + log.debug("收到WebSocket业务消息,userId: {}, channelId: {}, message: {}", + this.userId, ctx.channel().id(), messageText); + // TODO: 根据业务需要扩展消息处理逻辑 + // 例如: + // - 解析JSON消息 + // - 根据消息类型分发到不同的处理器 + // - 调用业务服务处理具体逻辑 + } + + // ================================ 异常处理 ================================ + + /** + * 根据异常类型记录不同级别的日志 + */ + private void logExceptionByType(String channelId, Throwable cause) { + if (cause instanceof IOException) { + log.info("webSocket网络异常,客户端可能异常断开,channelId: {}, 异常: {}", channelId, cause.getMessage()); + } else if (cause instanceof WebSocketHandshakeException) { + log.warn("webSocket握手异常,channelId: {}, 异常: {}", channelId, cause.getMessage()); + } else if (cause instanceof DecoderException || cause instanceof CorruptedFrameException) { + log.error("webSocket协议解码异常,可能是恶意请求,channelId: {}, 异常: {}", channelId, cause.getMessage(), cause); + } else if (cause instanceof IllegalArgumentException) { + log.warn("webSocket参数异常,channelId: {}, 异常: {}", channelId, cause.getMessage()); + } else { + log.error("webSocket未分类异常,channelId: {}, 类型: {}, 异常: {}", + channelId, cause.getClass().getSimpleName(), cause.getMessage(), cause); + } + } + + /** + * 异常发生时的业务清理工作 + */ + private void cleanupOnException(ChannelHandlerContext ctx, Throwable cause) { + if (ctx == null || ctx.channel() == null) { + log.warn("异常处理:通道上下文为空,无法进行清理"); + return; + } + + String channelId = ctx.channel().id().toString(); + + // 清理会话 + if (this.userId != null) { + WebServiceManager.removeByUserId(this.userId); + log.debug("已清理WebSocket会话,userId: {}, channelId: {}", this.userId, channelId); + } else { + WebServiceManager.removeChannel(channelId); + log.debug("已清理WebSocket会话(使用channelId),channelId: {}", channelId); + } + + // 清理检测相关资源 + cleanupSocketResources(this.userId); + } + + /** + * 根据异常类型决定连接处理策略 + */ + private void handleConnectionByExceptionType(ChannelHandlerContext ctx, Throwable cause) { + String channelId = ctx.channel().id().toString(); + // URL参数异常但连接本身可能正常,尝试保持连接 + if (cause instanceof IllegalArgumentException && + cause.getMessage() != null && cause.getMessage().contains("URL")) { + log.info("URL参数异常,尝试保持连接,channelId: {}", channelId); + return; + } + // 其他情况都关闭连接 + log.debug("关闭WebSocket连接,channelId: {}", channelId); + ctx.close(); + } + + // ================================ 资源清理 ================================ + + /** + * 清理Socket相关资源 + * + * @param userId 用户ID + */ + private void cleanupSocketResources(String userId) { + if (userId == null || userId.trim().isEmpty()) { + log.warn("userId为空,无法进行Socket连接清理"); + return; + } + + try { + PreDetectionParam preDetectionParam = WebServiceManager.getPreDetectionParam(userId); + if (ObjectUtil.isNotNull(preDetectionParam)) { + // 使用该用户的检测参数关闭Socket连接 + log.info("使用用户检测参数关闭Socket连接,userId: {}", userId); + CnSocketUtil.quitSendSource(preDetectionParam); + CnSocketUtil.quitSend(preDetectionParam); + // 清理完成后移除该用户的检测参数 + WebServiceManager.removePreDetectionParam(userId); + } else { + // 使用当前Handler的userId创建检测参数进行清理 + log.info("未找到用户检测参数,使用默认参数关闭Socket连接,userId: {}", userId); + preDetectionParam = new PreDetectionParam(); + preDetectionParam.setUserPageId(userId); + // 只关闭设备Socket,因为没有sourceId等其他参数 + CnSocketUtil.quitSend(preDetectionParam); + } + } catch (Exception e) { + log.error("清理Socket连接时发生异常,userId: {}", userId, e); + } + } + + // ================================ URL解析工具已移至WebSocketPreprocessor ================================ +} \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketInitializer.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketInitializer.java new file mode 100644 index 00000000..021d1d89 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketInitializer.java @@ -0,0 +1,184 @@ +package com.njcn.gather.detection.util.socket.websocket; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.*; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.AttributeKey; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; + +/** + * WebSocket服务端管道初始化器 + * + * 职责: + * 1. 为每个新的WebSocket连接配置处理器链(Pipeline) + * 2. 按正确顺序添加各种Handler,确保数据流正确处理 + * 3. 配置HTTP到WebSocket的协议升级 + * 4. 设置心跳检测和异常处理机制 + * + * 处理流程: + * HTTP请求 → HTTP编解码 → 分块处理 → 消息聚合 → 协议升级 → 心跳检测 → 业务处理 → 异常处理 + * + * @Description: webSocket服务端自定义配置 + * @Author: wr + * @Date: 2024/12/10 14:20 + */ +@Slf4j +public class WebSocketInitializer extends ChannelInitializer { + + /** + * WebSocket访问路径 + */ + private static final String WEBSOCKET_PATH = "/hello"; + + /** + * HTTP消息最大聚合大小:512KB + * 用于WebSocket握手和消息传输 + */ + private static final int MAX_CONTENT_LENGTH = 512 * 1024; + + /** + * 心跳检测间隔:13秒 + * 13秒内没有收到客户端消息则触发空闲事件 + */ + private static final int READER_IDLE_TIME_SECONDS = 13; + + /** + * 为每个新连接初始化处理器管道 + * 注意:Handler的添加顺序非常重要,决定了数据的处理流向 + * + * @param ch 新建立的Socket通道 + * @throws Exception 初始化过程中的异常 + */ + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + + // 1. HTTP协议处理器 + // HttpServerCodec = HttpRequestDecoder + HttpResponseEncoder + // 负责HTTP请求解码和HTTP响应编码 + pipeline.addLast("http-codec", new HttpServerCodec()); + + // 2. 分块写入处理器 + // 用于处理大文件的分块传输,防止内存溢出 + // 支持ChunkedInput,如ChunkedFile、ChunkedNioFile等 + pipeline.addLast("chunked-write", new ChunkedWriteHandler()); + + // 3. HTTP消息聚合器 + // 将分片的HTTP消息重新组装成完整的FullHttpRequest或FullHttpResponse + // WebSocket握手需要完整的HTTP请求,所以这个Handler必须添加 + pipeline.addLast("http-aggregator", new HttpObjectAggregator(MAX_CONTENT_LENGTH)); + + // 4. WebSocket URL预处理器 + // 在WebSocket握手之前处理URL参数,验证用户ID + pipeline.addLast("websocket-preprocessor", new WebSocketPreprocessor()); + + // 5. WebSocket协议升级处理器 + // 处理WebSocket握手,将HTTP协议升级为WebSocket协议 + // 只有访问指定路径(WEBSOCKET_PATH)的请求才会被升级 + // 升级后会移除HTTP相关的Handler,添加WebSocket相关的Handler + pipeline.addLast("websocket-protocol", new WebSocketServerProtocolHandler(WEBSOCKET_PATH)); + + // 6. 空闲状态检测器 + // 检测连接的空闲状态,用于心跳机制 + // readerIdleTime: 读空闲时间,writerIdleTime: 写空闲时间,allIdleTime: 读写空闲时间 + pipeline.addLast("idle-state", new IdleStateHandler(READER_IDLE_TIME_SECONDS, 0, 0, TimeUnit.SECONDS)); + + // 7. 自定义WebSocket业务处理器 + // 处理WebSocket帧,实现具体的业务逻辑 + // 包括心跳处理、消息路由、连接管理等 + pipeline.addLast("websocket-handler", new WebSocketHandler()); + + // 7. 全局异常处理器 + // 处理整个管道中未被捕获的异常,作为最后的异常处理兜底 + pipeline.addLast("exception-handler", new GlobalExceptionHandler()); + } + + /** + * WebSocket预处理器 + * 在WebSocket握手之前验证URL参数并清理URL + */ + private static class WebSocketPreprocessor extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof FullHttpRequest) { + FullHttpRequest request = (FullHttpRequest) msg; + String uri = request.uri(); + + log.debug("WebSocket预处理器收到HTTP请求:{}", uri); + + // 验证并提取userId + String userId = extractUserId(uri); + if (userId == null || userId.trim().isEmpty()) { + log.warn("WebSocket连接被拒绝:缺少userId参数, uri: {}", uri); + FullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.BAD_REQUEST + ); + ctx.writeAndFlush(response).addListener(f -> ctx.close()); + return; + } + + // 将userId存储到Channel属性中 + ctx.channel().attr(AttributeKey.valueOf("userId")).set(userId); + + // 清理URL参数 + if (uri.contains("?")) { + String cleanUri = uri.substring(0, uri.indexOf("?")); + request.setUri(cleanUri); + log.debug("URL已清理,原始: {}, 清理后: {}, userId: {}", uri, cleanUri, userId); + } + } + + // 继续传递给下一个Handler + super.channelRead(ctx, msg); + } + + private String extractUserId(String uri) { + if (!uri.contains("name=")) { + return null; + } + int start = uri.indexOf("name=") + 5; + int end = uri.indexOf("&", start); + if (end == -1) { + return uri.substring(start); + } else { + return uri.substring(start, end); + } + } + } + + /** + * 全局异常处理器 + * 作为管道中的最后一个Handler,捕获所有未处理的异常 + */ + private static class GlobalExceptionHandler extends ChannelInboundHandlerAdapter { + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // 记录异常详情,便于问题排查 + log.error("WebSocket连接发生未处理异常,远程地址:{},异常信息:{}", + ctx.channel().remoteAddress(), cause.getMessage(), cause); + + // 优雅关闭连接 + if (ctx.channel().isActive()) { + ctx.close(); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + log.debug("WebSocket连接断开,远程地址:{}", ctx.channel().remoteAddress()); + super.channelInactive(ctx); + } + } +} + diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketService.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketService.java new file mode 100644 index 00000000..df44eb0e --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebSocketService.java @@ -0,0 +1,237 @@ +package com.njcn.gather.detection.util.socket.websocket; + + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LoggingHandler; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + + +/** + * WebSocket服务端核心类 + * + * 职责: + * 1. 启动基于Netty的WebSocket服务器 + * 2. 管理服务器生命周期(启动/关闭) + * 3. 提供高性能的WebSocket通信支持 + * + * 特性: + * - 使用ApplicationRunner确保在Spring容器完全启动后再启动WebSocket服务 + * - 使用CompletableFuture异步启动,避免阻塞Spring Boot主线程 + * - 支持优雅关闭,确保资源正确释放 + * - 完善的异常处理和日志记录 + * + * @Description: websocket服务端 + * @Author: wr + * @Date: 2024/12/10 13:59 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class WebSocketService implements ApplicationRunner { + + /** + * WebSocket服务器监听端口 + * 默认7777端口,可通过配置文件webSocket.port自定义 + * 客户端连接地址:ws://host:port/hello?name=userId + */ + @Value("${webSocket.port:7777}") + int port; + + /** + * Netty Boss线程组 + * 专门负责接受新的客户端连接请求 + * 通常配置1个线程即可,因为接受连接的操作相对简单 + */ + EventLoopGroup bossGroup; + + /** + * Netty Worker线程组 + * 专门负责处理已建立连接的I/O操作和业务逻辑 + * 默认线程数 = CPU核心数 * 2,用于并发处理多个客户端 + */ + EventLoopGroup workerGroup; + + /** + * 服务器通道引用 + * 保存绑定端口后的Channel,用于服务器关闭时释放资源 + */ + private Channel serverChannel; + + /** + * 异步启动任务的Future对象 + * 用于管理WebSocket服务器的异步启动过程 + * 可以用来取消启动任务或检查启动状态 + */ + private CompletableFuture serverFuture; + + + + /** + * Spring Boot应用启动完成后自动调用此方法 + * 使用ApplicationRunner确保在所有Bean初始化完成后再启动WebSocket服务 + */ + @Override + public void run(ApplicationArguments args){ + // 使用CompletableFuture异步启动WebSocket服务,避免阻塞Spring Boot主线程 + // 这样可以让应用快速启动完成,WebSocket服务在后台异步启动 + serverFuture = CompletableFuture.runAsync(this::startWebSocketServer) + .exceptionally(throwable -> { + // 如果启动过程中发生异常,记录日志但不影响应用启动 + log.error("WebSocket服务启动异常", throwable); + return null; + }); + } + + /** + * 启动WebSocket服务器的核心方法 + * 此方法会一直阻塞直到服务器关闭,所以需要在异步线程中执行 + */ + private void startWebSocketServer() { + try { + // 1. 创建线程组 + // bossGroup: 专门负责接受新的客户端连接请求 + // 可以自定义线程的数量,这里使用默认值(通常为1个线程) + bossGroup = new NioEventLoopGroup(1); + + // workerGroup: 专门负责处理已建立连接的I/O操作 + // 默认创建的线程数量 = CPU 处理器数量 * 2,用于处理业务逻辑 + workerGroup = new NioEventLoopGroup(); + + // 2. 配置服务器启动参数 + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler()) + // 网络配置参数 + .option(ChannelOption.SO_BACKLOG, 128) + // TCP连接建立超时时间5秒 + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + // 子通道配置(针对每个客户端连接) + // 启用TCP keepalive机制,检测死连接 + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(new WebSocketInitializer()); + + // 3. 绑定端口并启动服务器 + ChannelFuture future = serverBootstrap.bind(port).sync(); + // 保存服务器通道引用,用于后续关闭操作 + serverChannel = future.channel(); + // 4. 监听绑定结果并记录日志 + future.addListener(f -> { + if (future.isSuccess()) { + log.info("webSocket服务启动成功,端口:{}", port); + } else { + log.error("webSocket服务启动失败,端口:{}", port); + } + }); + + // 5. 等待服务器关闭 + // 这里会一直阻塞,直到serverChannel被外部关闭 + // 这就是为什么需要在异步线程中执行此方法的原因 + future.channel().closeFuture().sync(); + + } catch (InterruptedException e) { + // 如果线程被中断(比如应用关闭),记录日志并恢复中断状态 + log.error("WebSocket服务启动过程中被中断", e); + Thread.currentThread().interrupt(); // 恢复中断状态 + } catch (Exception e) { + // 捕获其他所有异常,记录日志并抛出运行时异常 + log.error("WebSocket服务启动失败", e); + throw new RuntimeException("WebSocket服务启动失败", e); + } finally { + // 无论成功还是失败,都要清理资源 + shutdownGracefully(); + } + } + + + /** + * 优雅关闭Netty线程组资源 + * 私有方法,用于在服务器启动异常时清理资源 + */ + private void shutdownGracefully() { + // 优雅关闭接收连接的线程组 + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + // 优雅关闭处理I/O的线程组 + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + } + + /** + * Spring容器销毁时自动调用此方法释放资源 + * 使用@PreDestroy确保在应用关闭时优雅地关闭WebSocket服务 + */ + @PreDestroy + public void destroy() throws InterruptedException { + log.info("正在关闭WebSocket服务..."); + + // 步骤1: 首先关闭服务器通道,停止接受新的连接请求 + // 这样可以确保不会有新的客户端连接进来 + if (serverChannel != null) { + try { + // 等待最多5秒让服务器通道关闭 + serverChannel.close().awaitUninterruptibly(5, TimeUnit.SECONDS); + log.debug("服务器通道已关闭"); + } catch (Exception e) { + log.warn("关闭服务器通道时发生异常", e); + } + } + + // 步骤2: 关闭bossGroup线程组 + // bossGroup负责接受连接,现在可以安全关闭了 + if (bossGroup != null) { + try { + // 优雅关闭:静默期0秒,超时时间5秒 + // 静默期0秒意味着立即开始关闭,超时5秒后强制关闭 + bossGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync(); + log.debug("bossGroup线程组已关闭"); + } catch (InterruptedException e) { + log.warn("关闭bossGroup时被中断", e); + Thread.currentThread().interrupt(); // 恢复中断状态 + } + } + + // 步骤3: 关闭workerGroup线程组 + // workerGroup负责处理I/O,需要等待现有连接处理完成 + if (workerGroup != null) { + try { + // 等待现有任务完成,但最多等待5秒 + workerGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).sync(); + log.debug("workerGroup线程组已关闭"); + } catch (InterruptedException e) { + log.warn("关闭workerGroup时被中断", e); + Thread.currentThread().interrupt(); // 恢复中断状态 + } + } + + // 步骤4: 取消异步启动任务(如果还在运行) + // 这可以避免在应用关闭后还有线程在后台运行 + if (serverFuture != null && !serverFuture.isDone()) { + // true表示允许中断正在执行的任务 + boolean cancelled = serverFuture.cancel(true); + if (cancelled) { + log.debug("异步启动任务已取消"); + } + } + + log.info("webSocket服务已销毁"); + } + +} diff --git a/entrance/src/test/java/com/njcn/BaseJunitTest.java b/entrance/src/test/java/com/njcn/BaseJunitTest.java index 5d60280a..0193eb2b 100644 --- a/entrance/src/test/java/com/njcn/BaseJunitTest.java +++ b/entrance/src/test/java/com/njcn/BaseJunitTest.java @@ -1,7 +1,6 @@ package com.njcn; import com.njcn.gather.EntranceApplication; -import com.njcn.gather.report.pojo.DevReportParam; import com.njcn.gather.report.service.IPqReportService; import com.njcn.http.util.RestTemplateUtil; import org.junit.Test; @@ -13,7 +12,6 @@ import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.web.WebAppConfiguration; import java.io.File; -import java.util.HashMap; /** diff --git a/event_smart/src/main/java/com/njcn/gather/event/transientes/security/MyUserDetailsService.java b/event_smart/src/main/java/com/njcn/gather/event/transientes/security/MyUserDetailsService.java index ff6eb0b5..fadb29ba 100644 --- a/event_smart/src/main/java/com/njcn/gather/event/transientes/security/MyUserDetailsService.java +++ b/event_smart/src/main/java/com/njcn/gather/event/transientes/security/MyUserDetailsService.java @@ -9,7 +9,6 @@ import com.njcn.gather.event.transientes.pojo.po.PqsUser; import com.njcn.gather.event.transientes.pojo.po.PqsUserSet; import com.njcn.redis.utils.RedisUtil; import lombok.RequiredArgsConstructor; -import netscape.javascript.JSObject; import org.springframework.security.core.userdetails.User; import org.springframework.security.core.userdetails.UserDetails; import org.springframework.security.core.userdetails.UserDetailsService; diff --git a/user/src/main/java/com/njcn/gather/user/user/controller/AuthController.java b/user/src/main/java/com/njcn/gather/user/user/controller/AuthController.java index ce9447cf..24554efa 100644 --- a/user/src/main/java/com/njcn/gather/user/user/controller/AuthController.java +++ b/user/src/main/java/com/njcn/gather/user/user/controller/AuthController.java @@ -70,7 +70,7 @@ public class AuthController extends BaseController { if (ObjectUtil.isNull(user)) { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.FAIL, null, UserValidMessage.LOGIN_FAILED); } else { - String accessToken = JwtUtil.getAccessToken(user.getId()); + String accessToken = JwtUtil.getAccessToken(user.getId(),username); String refreshToken = JwtUtil.getRefreshToken(accessToken); Token token = new Token(); token.setAccessToken(accessToken); @@ -117,7 +117,7 @@ public class AuthController extends BaseController { Map map = JwtUtil.parseToken(accessToken); String userId = (String) map.get(SecurityConstants.USER_ID); SysUser user = sysUserService.getById(userId); - String accessTokenNew = JwtUtil.getAccessToken(userId); + String accessTokenNew = JwtUtil.getAccessToken(userId, user.getLoginName()); request.setAttribute(SecurityConstants.AUTHENTICATE_USERNAME, user.getLoginName()); // String refreshTokenNew = JwtUtil.getRefreshToken(accessTokenNew);