检测模块调整

This commit is contained in:
wr
2024-12-11 20:39:17 +08:00
parent 379e4a33ff
commit 769e967815
14 changed files with 197 additions and 177 deletions

View File

@@ -0,0 +1,9 @@
package com.njcn.gather.detection.pojo.param;
/**
* @author wr
* @description
* @date 2024/12/11 13:45
*/
public class PreDetectionParam {
}

View File

@@ -0,0 +1,27 @@
package com.njcn.gather.detection.pojo.vo;
import lombok.Data;
/**
* @author wr
* @description
* @date 2024/12/11 15:57
*/
@Data
public class SocketMsg {
/**
* 请求id确保接收到响应时知晓是针对的哪次请求的应答
*/
private String requestId;
/**
* 源初始化 INIT_GATHER$01 INIT_GATHER采集初始化01 统计采集、02 暂态采集、03 实时采集
*/
private String operateCode;
/**
* 数据体传输前需要将对象、Array等转为String
*/
private String data;
}

View File

@@ -0,0 +1,10 @@
package com.njcn.gather.detection.service.impl;
/**
* @author wr
* @description
* @date 2024/12/11 13:38
*/
public class PreDetectionServiceImpl {
}

View File

@@ -1,132 +0,0 @@
package com.njcn.gather.detection.util.scoket;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
/**
* @Description: 心跳检测服务端 对应的服务端在netty-server 包下的NettyClient
* @Author: wr
* @Date: 2024/12/10 14:16
*/
public class NettyClient {
private static Bootstrap bootstrap;
public static void socketClient(String msg) {
try {
NioEventLoopGroup group = new NioEventLoopGroup();
if (ObjectUtil.isNull(bootstrap)) {
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline()
//空闲状态的handler
// .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new NettyClientHandler());
}
});
}
//连接
connect(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 重连方法
*/
public static void connect(String msg) {
try {
bootstrap.connect("127.0.0.1", 8787).sync()
.addListener((ChannelFutureListener) ch -> {
if (!ch.isSuccess()) {
ch.channel().close();
final EventLoop loop = ch.channel().eventLoop();
loop.schedule(() -> {
System.err.println("服务端链接不上,开始重连操作...");
//重连
connect(msg);
}, 3L, TimeUnit.SECONDS);
} else {
if (StrUtil.isNotBlank(msg)) {
ch.channel().writeAndFlush(msg);
}
System.out.println("服务端链接成功...");
}
});
} catch (Exception e) {
System.out.println(e.getMessage());
try {
Thread.sleep(3000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//再重连
connect(msg);
}
}
/**
* 重连方法
*/
public static void connect() {
try {
bootstrap.connect("127.0.0.1", 8787).sync()
.addListener((ChannelFutureListener) ch -> {
if (!ch.isSuccess()) {
ch.channel().close();
final EventLoop loop = ch.channel().eventLoop();
loop.schedule(() -> {
System.err.println("服务端链接不上,开始重连操作...");
//重连
connect();
}, 3L, TimeUnit.SECONDS);
} else {
ch.channel().writeAndFlush("ping");
System.out.println("服务端链接成功...");
}
});
} catch (Exception e) {
System.out.println(e.getMessage());
try {
Thread.sleep(3000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//再重连
connect();
}
}
}

View File

@@ -0,0 +1,12 @@
package com.njcn.gather.detection.util.socket;
/**
* @author wr
* @description
* @date 2024/12/11 19:27
*/
public class MsgUtil {
}

View File

@@ -1,11 +1,17 @@
package com.njcn.gather.detection.util.webscoket; package com.njcn.gather.detection.util.socket;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class UserSessionManager { /**
* @Description: webSocket存储的通道
* @Author: wr
* @Date: 2024/12/11 13:04
*/
public class SocketManager {
private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>(); private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>();
public static void addUser(String userId, Channel channel) { public static void addUser(String userId, Channel channel) {
@@ -19,5 +25,11 @@ public class UserSessionManager {
public static Channel getChannelByUserId(String userId) { public static Channel getChannelByUserId(String userId) {
return userSessions.get(userId); return userSessions.get(userId);
} }
public static void sendMsg(String msg) {
Channel userId = userSessions.get("userId");
TextWebSocketFrame wd1 = new TextWebSocketFrame(msg);
userId.writeAndFlush(wd1);
}
} }

View File

@@ -0,0 +1,82 @@
package com.njcn.gather.detection.util.socket.cilent;
import cn.hutool.core.util.StrUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/**
* @Description: 心跳检测服务端 对应的服务端在netty-server 包下的NettyClient
* @Author: wr
* @Date: 2024/12/10 14:16
*/
public class NettyClient {
public static void socketClient(String msg, ChannelHandler harder) {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline()
//空闲状态的handler
// .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new NettySourceClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8787).sync();
channelFuture.channel().writeAndFlush("ping");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
/**
* 重连方法
*/
public static void connect(Bootstrap bootstrap, String msg) {
try {
bootstrap.connect("127.0.0.1", 8787).sync()
.addListener((ChannelFutureListener) ch -> {
if (!ch.isSuccess()) {
ch.channel().close();
final EventLoop loop = ch.channel().eventLoop();
loop.schedule(() -> {
System.err.println("服务端链接不上,开始重连操作...");
//重连
connect(bootstrap, msg);
}, 3L, TimeUnit.SECONDS);
} else {
if (StrUtil.isNotBlank(msg)) {
ch.channel().writeAndFlush(msg);
}
System.out.println("服务端链接成功...");
}
});
} catch (Exception e) {
System.out.println(e.getMessage());
try {
Thread.sleep(3000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//再重连
connect(bootstrap, msg);
}
}
}

View File

@@ -1,22 +1,19 @@
package com.njcn.gather.detection.util.scoket; package com.njcn.gather.detection.util.socket.cilent;
import com.njcn.gather.detection.util.webscoket.UserSessionManager; import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.WebServiceManager;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.Scanner; import java.util.Scanner;
import java.util.concurrent.TimeUnit;
/** /**
* @Description: 客户端业务处理 * @Description: 客户端业务处理示例
* @Author: wr * @Author: wr
* @Date: 2024/12/10 14:16 * @Date: 2024/12/10 14:16
*/ */
public class NettyClientHandler extends SimpleChannelInboundHandler<String> { public class NettySourceClientHandler extends SimpleChannelInboundHandler<String> {
/** /**
* 当通道进行连接时推送消息 * 当通道进行连接时推送消息
@@ -24,19 +21,19 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
*/ */
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("通道已建立" + ctx.channel().remoteAddress()); System.out.println("客户端通道已建立" + ctx.channel().remoteAddress());
} }
/** /**
* 处理服务端消息消息信息 * 处理服务端消息消息信息
*/ */
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) { protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException {
Channel userId = SocketManager.getChannelByUserId("userId");
if(msg.endsWith("结束")){ if(msg.endsWith("结束")){
Channel userId = UserSessionManager.getChannelByUserId("userId"); WebServiceManager.sendMsg("第一流程结束");
TextWebSocketFrame wd1 = new TextWebSocketFrame("流程结束"); System.out.println("第一流程结束");
userId.writeAndFlush(wd1); userId.close();
ctx.close();
}else{ }else{
System.out.println("服务端消息 == " + msg); System.out.println("服务端消息 == " + msg);
Scanner scanner = new Scanner(System.in); Scanner scanner = new Scanner(System.in);
@@ -84,6 +81,7 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
SocketManager.addUser("userId",ctx.channel());
System.out.println("有通道接入" + ctx.channel()); System.out.println("有通道接入" + ctx.channel());
} }
@Override @Override

View File

@@ -1,4 +1,4 @@
package com.njcn.gather.detection.util.scoket; package com.njcn.gather.detection.util.socket.service;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;

View File

@@ -1,16 +1,12 @@
package com.njcn.gather.detection.util.scoket; package com.njcn.gather.detection.util.socket.service;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/** /**
* @Description: 服务端初始化配置 * @Description: 服务端初始化配置
* @Author: wr * @Author: wr

View File

@@ -1,14 +1,9 @@
package com.njcn.gather.detection.util.scoket; package com.njcn.gather.detection.util.socket.service;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import java.util.Scanner;
/** /**
* @Description: 客户端业务处理 * @Description: 客户端业务处理
@@ -39,9 +34,11 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
System.out.println("socket客户端接收成功"+msg); System.out.println("socket客户端接收成功"+msg);
if(msg.endsWith("结束")) { if(msg.endsWith("结束")) {
channel.writeAndFlush("socket指令结果"+msg); channel.writeAndFlush("socket指令结果"+msg);
for (int i = 0; i < 5; i++) {
channel.writeAndFlush("socket指令结果"+msg);
}
}else{ }else{
channel.writeAndFlush("socket指令结果成功指令"); channel.writeAndFlush("socket指令结果成功指令");
} }
} }

View File

@@ -1,13 +1,13 @@
package com.njcn.gather.detection.util.webscoket; package com.njcn.gather.detection.util.socket.web;
import com.njcn.gather.detection.util.scoket.NettyClient; import com.njcn.gather.detection.util.socket.WebServiceManager;
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.Map;
/** /**
@@ -21,18 +21,21 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("成功连接"); System.out.println("服务端通道已建立" + ctx.channel().remoteAddress());
ctx.writeAndFlush("22222222222222222222");
} }
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
System.out.println("服务端消息 == " + msg.text()); System.out.println("服务端消息 == " + msg.text());
// Channel userId = UserSessionManager.getChannelByUserId("userId");
// TextWebSocketFrame wd1 = new TextWebSocketFrame("------------------cs");
// userId.writeAndFlush(wd1);
if(msg.text().equals("下发指令")){ if(msg.text().equals("下发指令")){
NettyClient.socketClient(msg.text()); /**
* 处理对应消息
* 1.先下发所要操作的流程信息
* 2.组装对应的入参信息
* 3.再用socket信息返回结束
*/
NettyClient.socketClient(msg.text(),new NettySourceClientHandler());
} }
//可以直接调用text 拿到文本信息帧中的信息 //可以直接调用text 拿到文本信息帧中的信息
@@ -44,7 +47,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
String userId = "userId"; String userId = "userId";
UserSessionManager.addUser(userId, ctx.channel()); WebServiceManager.addUser(userId, ctx.channel());
System.out.println("有新的连接接入:"+ctx); System.out.println("有新的连接接入:"+ctx);
} }
@@ -52,7 +55,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 假设用户 ID 是从某个地方获取的这里简单示例为 "userId" // 假设用户 ID 是从某个地方获取的这里简单示例为 "userId"
String userId = "userId"; String userId = "userId";
UserSessionManager.removeUser(userId); WebServiceManager.removeUser(userId);
System.out.println("有连接退出: " + ctx); System.out.println("有连接退出: " + ctx);
} }

View File

@@ -1,4 +1,4 @@
package com.njcn.gather.detection.util.webscoket; package com.njcn.gather.detection.util.socket.web;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;

View File

@@ -1,4 +1,4 @@
package com.njcn.gather.detection.util.webscoket; package com.njcn.gather.detection.util.socket.web;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
@@ -23,7 +23,7 @@ public class WebSocketService {
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup bossGroup = new NioEventLoopGroup();
// 默认创建的线程数量 = CPU 处理器数量 *2 // 默认创建的线程数量 = CPU 处理器数量 *2
EventLoopGroup workerGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
@@ -34,8 +34,14 @@ public class WebSocketService {
.childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new WebSocketInitializer()); .childHandler(new WebSocketInitializer());
try {
ChannelFuture future = serverBootstrap.bind(7777).sync(); ChannelFuture future = serverBootstrap.bind(7777).sync();
future.addListener(f -> {
if (future.isSuccess()) {
System.out.println("webSocket服务启动成功");
} else {
System.out.println("webSocket服务启动失败");
}
});
future.channel().closeFuture().sync(); future.channel().closeFuture().sync();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();