From 769e967815cd4883980f0b9b2d357cb19118398a Mon Sep 17 00:00:00 2001 From: wr <1754607820@qq.com> Date: Wed, 11 Dec 2024 20:39:17 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A3=80=E6=B5=8B=E6=A8=A1=E5=9D=97=E8=B0=83?= =?UTF-8?q?=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pojo/param/PreDetectionParam.java | 9 ++ .../gather/detection/pojo/vo/SocketMsg.java | 27 ++++ .../service/impl/PreDetectionServiceImpl.java | 10 ++ .../detection/util/scoket/NettyClient.java | 132 ------------------ .../gather/detection/util/socket/MsgUtil.java | 12 ++ .../SocketManager.java} | 16 ++- .../util/socket/cilent/NettyClient.java | 82 +++++++++++ .../cilent/NettySourceClientHandler.java} | 26 ++-- .../service}/NettyServer.java | 2 +- .../NettyServerChannelInitializer.java | 6 +- .../service}/NettyServerHandler.java | 11 +- .../web}/WebSocketHandler.java | 27 ++-- .../web}/WebSocketInitializer.java | 2 +- .../web}/WebSocketService.java | 12 +- 14 files changed, 197 insertions(+), 177 deletions(-) create mode 100644 detection/src/main/java/com/njcn/gather/detection/pojo/param/PreDetectionParam.java create mode 100644 detection/src/main/java/com/njcn/gather/detection/pojo/vo/SocketMsg.java create mode 100644 detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java delete mode 100644 detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyClient.java create mode 100644 detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java rename detection/src/main/java/com/njcn/gather/detection/util/{webscoket/UserSessionManager.java => socket/SocketManager.java} (54%) create mode 100644 detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java rename detection/src/main/java/com/njcn/gather/detection/util/{scoket/NettyClientHandler.java => socket/cilent/NettySourceClientHandler.java} (75%) rename detection/src/main/java/com/njcn/gather/detection/util/{scoket => socket/service}/NettyServer.java (97%) rename detection/src/main/java/com/njcn/gather/detection/util/{scoket => socket/service}/NettyServerChannelInitializer.java (84%) rename detection/src/main/java/com/njcn/gather/detection/util/{scoket => socket/service}/NettyServerHandler.java (91%) rename detection/src/main/java/com/njcn/gather/detection/util/{webscoket => socket/web}/WebSocketHandler.java (74%) rename detection/src/main/java/com/njcn/gather/detection/util/{webscoket => socket/web}/WebSocketInitializer.java (97%) rename detection/src/main/java/com/njcn/gather/detection/util/{webscoket => socket/web}/WebSocketService.java (82%) diff --git a/detection/src/main/java/com/njcn/gather/detection/pojo/param/PreDetectionParam.java b/detection/src/main/java/com/njcn/gather/detection/pojo/param/PreDetectionParam.java new file mode 100644 index 00000000..28109c91 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/pojo/param/PreDetectionParam.java @@ -0,0 +1,9 @@ +package com.njcn.gather.detection.pojo.param; + +/** + * @author wr + * @description + * @date 2024/12/11 13:45 + */ +public class PreDetectionParam { +} diff --git a/detection/src/main/java/com/njcn/gather/detection/pojo/vo/SocketMsg.java b/detection/src/main/java/com/njcn/gather/detection/pojo/vo/SocketMsg.java new file mode 100644 index 00000000..7cb7cae1 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/pojo/vo/SocketMsg.java @@ -0,0 +1,27 @@ +package com.njcn.gather.detection.pojo.vo; + +import lombok.Data; + +/** + * @author wr + * @description + * @date 2024/12/11 15:57 + */ +@Data +public class SocketMsg { + + /** + * 请求id,确保接收到响应时,知晓是针对的哪次请求的应答 + */ + private String requestId; + + /** + * 源初始化 INIT_GATHER$01 INIT_GATHER采集初始化,01 统计采集、02 暂态采集、03 实时采集 + */ + private String operateCode; + + /** + * 数据体,传输前需要将对象、Array等转为String + */ + private String data; +} diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java new file mode 100644 index 00000000..39c8a34f --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -0,0 +1,10 @@ +package com.njcn.gather.detection.service.impl; + +/** + * @author wr + * @description + * @date 2024/12/11 13:38 + */ +public class PreDetectionServiceImpl { + +} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyClient.java deleted file mode 100644 index 21b8b41b..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyClient.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.njcn.gather.detection.util.scoket; - -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoop; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.http.DefaultHttpHeaders; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpResponseDecoder; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; -import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; -import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; -import io.netty.handler.codec.http.websocketx.WebSocketVersion; -import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; -import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.CharsetUtil; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.TimeUnit; - -/** - * @Description: 心跳检测服务端 对应的服务端在netty-server 包下的NettyClient - * @Author: wr - * @Date: 2024/12/10 14:16 - */ -public class NettyClient { - - private static Bootstrap bootstrap; - - public static void socketClient(String msg) { - try { - NioEventLoopGroup group = new NioEventLoopGroup(); - if (ObjectUtil.isNull(bootstrap)) { - bootstrap = new Bootstrap(); - bootstrap.group(group) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(NioSocketChannel ch) { - ch.pipeline() - //空闲状态的handler -// .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)) - .addLast(new StringDecoder(CharsetUtil.UTF_8)) - .addLast(new StringEncoder(CharsetUtil.UTF_8)) - .addLast(new NettyClientHandler()); - } - }); - } - - //连接 - connect(msg); - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * 重连方法 - */ - public static void connect(String msg) { - try { - bootstrap.connect("127.0.0.1", 8787).sync() - .addListener((ChannelFutureListener) ch -> { - if (!ch.isSuccess()) { - ch.channel().close(); - final EventLoop loop = ch.channel().eventLoop(); - loop.schedule(() -> { - System.err.println("服务端链接不上,开始重连操作..."); - //重连 - connect(msg); - }, 3L, TimeUnit.SECONDS); - } else { - if (StrUtil.isNotBlank(msg)) { - ch.channel().writeAndFlush(msg); - } - System.out.println("服务端链接成功..."); - } - }); - - } catch (Exception e) { - System.out.println(e.getMessage()); - try { - Thread.sleep(3000L); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } - //再重连 - connect(msg); - } - } - - - /** - * 重连方法 - */ - public static void connect() { - try { - bootstrap.connect("127.0.0.1", 8787).sync() - .addListener((ChannelFutureListener) ch -> { - if (!ch.isSuccess()) { - ch.channel().close(); - final EventLoop loop = ch.channel().eventLoop(); - loop.schedule(() -> { - System.err.println("服务端链接不上,开始重连操作..."); - //重连 - connect(); - }, 3L, TimeUnit.SECONDS); - } else { - ch.channel().writeAndFlush("ping"); - System.out.println("服务端链接成功..."); - } - }); - } catch (Exception e) { - System.out.println(e.getMessage()); - try { - Thread.sleep(3000L); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } - //再重连 - connect(); - } - } -} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java new file mode 100644 index 00000000..1170036f --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java @@ -0,0 +1,12 @@ +package com.njcn.gather.detection.util.socket; + +/** + * @author wr + * @description + * @date 2024/12/11 19:27 + */ + +public class MsgUtil { + + +} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/webscoket/UserSessionManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java similarity index 54% rename from detection/src/main/java/com/njcn/gather/detection/util/webscoket/UserSessionManager.java rename to detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java index e9992942..0c092cba 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/webscoket/UserSessionManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java @@ -1,11 +1,17 @@ -package com.njcn.gather.detection.util.webscoket; +package com.njcn.gather.detection.util.socket; import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class UserSessionManager { +/** + * @Description: webSocket存储的通道 + * @Author: wr + * @Date: 2024/12/11 13:04 + */ +public class SocketManager { private static final Map userSessions = new ConcurrentHashMap<>(); public static void addUser(String userId, Channel channel) { @@ -19,5 +25,11 @@ public class UserSessionManager { public static Channel getChannelByUserId(String userId) { return userSessions.get(userId); } + + public static void sendMsg(String msg) { + Channel userId = userSessions.get("userId"); + TextWebSocketFrame wd1 = new TextWebSocketFrame(msg); + userId.writeAndFlush(wd1); + } } \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java new file mode 100644 index 00000000..b87a635a --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java @@ -0,0 +1,82 @@ +package com.njcn.gather.detection.util.socket.cilent; + +import cn.hutool.core.util.StrUtil; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; + +import java.util.concurrent.TimeUnit; + +/** + * @Description: 心跳检测服务端 对应的服务端在netty-server 包下的NettyClient + * @Author: wr + * @Date: 2024/12/10 14:16 + */ +public class NettyClient { + + public static void socketClient(String msg, ChannelHandler harder) { + NioEventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + try { + bootstrap.group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel ch) { + ch.pipeline() + //空闲状态的handler +// .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)) + .addLast(new StringDecoder(CharsetUtil.UTF_8)) + .addLast(new StringEncoder(CharsetUtil.UTF_8)) + .addLast(new NettySourceClientHandler()); + } + }); + ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8787).sync(); + channelFuture.channel().writeAndFlush("ping"); + channelFuture.channel().closeFuture().sync(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + group.shutdownGracefully(); + } + + } + + /** + * 重连方法 + */ + public static void connect(Bootstrap bootstrap, String msg) { + try { + bootstrap.connect("127.0.0.1", 8787).sync() + .addListener((ChannelFutureListener) ch -> { + if (!ch.isSuccess()) { + ch.channel().close(); + final EventLoop loop = ch.channel().eventLoop(); + loop.schedule(() -> { + System.err.println("服务端链接不上,开始重连操作..."); + //重连 + connect(bootstrap, msg); + }, 3L, TimeUnit.SECONDS); + } else { + if (StrUtil.isNotBlank(msg)) { + ch.channel().writeAndFlush(msg); + } + System.out.println("服务端链接成功..."); + } + }); + } catch (Exception e) { + System.out.println(e.getMessage()); + try { + Thread.sleep(3000L); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + //再重连 + connect(bootstrap, msg); + } + } +} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java similarity index 75% rename from detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyClientHandler.java rename to detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java index b185364f..8e99ed07 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java @@ -1,22 +1,19 @@ -package com.njcn.gather.detection.util.scoket; +package com.njcn.gather.detection.util.socket.cilent; -import com.njcn.gather.detection.util.webscoket.UserSessionManager; +import com.njcn.gather.detection.util.socket.SocketManager; +import com.njcn.gather.detection.util.socket.WebServiceManager; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; import java.util.Scanner; -import java.util.concurrent.TimeUnit; /** - * @Description: 客户端业务处理 + * @Description: 客户端业务处理(示例) * @Author: wr * @Date: 2024/12/10 14:16 */ -public class NettyClientHandler extends SimpleChannelInboundHandler { +public class NettySourceClientHandler extends SimpleChannelInboundHandler { /** * 当通道进行连接时推送消息 @@ -24,19 +21,19 @@ public class NettyClientHandler extends SimpleChannelInboundHandler { */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - System.out.println("通道已建立" + ctx.channel().remoteAddress()); + System.out.println("客户端通道已建立" + ctx.channel().remoteAddress()); } /** * 处理服务端消息消息信息 */ @Override - protected void channelRead0(ChannelHandlerContext ctx, String msg) { + protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException { + Channel userId = SocketManager.getChannelByUserId("userId"); if(msg.endsWith("结束")){ - Channel userId = UserSessionManager.getChannelByUserId("userId"); - TextWebSocketFrame wd1 = new TextWebSocketFrame("流程结束"); - userId.writeAndFlush(wd1); - ctx.close(); + WebServiceManager.sendMsg("第一流程结束"); + System.out.println("第一流程结束"); + userId.close(); }else{ System.out.println("服务端消息 == " + msg); Scanner scanner = new Scanner(System.in); @@ -84,6 +81,7 @@ public class NettyClientHandler extends SimpleChannelInboundHandler { @Override public void handlerAdded(ChannelHandlerContext ctx) { + SocketManager.addUser("userId",ctx.channel()); System.out.println("有通道接入" + ctx.channel()); } @Override diff --git a/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyServer.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServer.java similarity index 97% rename from detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyServer.java rename to detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServer.java index 99ed8572..84c052f1 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyServer.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServer.java @@ -1,4 +1,4 @@ -package com.njcn.gather.detection.util.scoket; +package com.njcn.gather.detection.util.socket.service; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; diff --git a/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyServerChannelInitializer.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServerChannelInitializer.java similarity index 84% rename from detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyServerChannelInitializer.java rename to detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServerChannelInitializer.java index 7ef70d17..829e5d04 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyServerChannelInitializer.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServerChannelInitializer.java @@ -1,16 +1,12 @@ -package com.njcn.gather.detection.util.scoket; +package com.njcn.gather.detection.util.socket.service; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; -import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.CharsetUtil; -import java.util.concurrent.TimeUnit; - /** * @Description: 服务端初始化配置 * @Author: wr diff --git a/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyServerHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServerHandler.java similarity index 91% rename from detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyServerHandler.java rename to detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServerHandler.java index 2a931c06..92a5aca9 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/scoket/NettyServerHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServerHandler.java @@ -1,14 +1,9 @@ -package com.njcn.gather.detection.util.scoket; +package com.njcn.gather.detection.util.socket.service; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.ReferenceCountUtil; - -import java.util.Scanner; /** * @Description: 客户端业务处理 @@ -39,9 +34,11 @@ public class NettyServerHandler extends SimpleChannelInboundHandler { System.out.println("socket客户端接收成功:"+msg); if(msg.endsWith("结束")) { channel.writeAndFlush("socket指令结果:"+msg); + for (int i = 0; i < 5; i++) { + channel.writeAndFlush("socket指令结果:"+msg); + } }else{ channel.writeAndFlush("socket指令结果:成功指令"); - } } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/webscoket/WebSocketHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketHandler.java similarity index 74% rename from detection/src/main/java/com/njcn/gather/detection/util/webscoket/WebSocketHandler.java rename to detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketHandler.java index 2c2a3dcb..ed8456ff 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/webscoket/WebSocketHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketHandler.java @@ -1,13 +1,13 @@ -package com.njcn.gather.detection.util.webscoket; +package com.njcn.gather.detection.util.socket.web; -import com.njcn.gather.detection.util.scoket.NettyClient; +import com.njcn.gather.detection.util.socket.WebServiceManager; +import com.njcn.gather.detection.util.socket.cilent.NettyClient; +import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import java.util.Map; /** @@ -21,18 +21,21 @@ public class WebSocketHandler extends SimpleChannelInboundHandler { + if (future.isSuccess()) { + System.out.println("webSocket服务启动成功"); + } else { + System.out.println("webSocket服务启动失败"); + } + }); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace();