测试提交

This commit is contained in:
2024-12-12 18:06:27 +08:00
parent e48efef8d6
commit e432501e99
11 changed files with 350 additions and 35 deletions

View File

@@ -0,0 +1,35 @@
package com.njcn.gather.detection.controller;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.gather.detection.service.PreDetectionService;
import com.njcn.web.controller.BaseController;
import com.njcn.web.utils.HttpResultUtil;
import io.swagger.annotations.Api;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.Get;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@Api(tags = "预检测")
@RestController
@RequestMapping("/prepare")
@RequiredArgsConstructor
public class PreDetectionController extends BaseController {
private final PreDetectionService preDetectionService;
/**
*
*/
@GetMapping("/test")
public HttpResult<String> test(){
String methodDescribe = getMethodDescribe("test");
preDetectionService.test();
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -0,0 +1,29 @@
package com.njcn.gather.detection.handler;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.web.WebSocketHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
@RequiredArgsConstructor
public class SocketResponseService {
private final WebSocketHandler webSocketHandler;
public void deal(String userId,String msg){
System.out.println("进入deal+++++++++++++++++++");
webSocketHandler.senMsgToUser(userId,msg);
SocketManager.getChannelByUserId(userId).close();
}
}

View File

@@ -28,5 +28,8 @@ public interface PreDetectionService {
void phaseSequenceCheck(); void phaseSequenceCheck();
void test();
} }

View File

@@ -1,10 +1,54 @@
package com.njcn.gather.detection.service.impl; package com.njcn.gather.detection.service.impl;
/**
* @author wr
* @description
* @date 2024/12/11 13:38
*/
public class PreDetectionServiceImpl {
import com.njcn.gather.detection.handler.SocketResponseService;
import com.njcn.gather.detection.service.PreDetectionService;
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class PreDetectionServiceImpl implements PreDetectionService {
@Value("${socket.source.ip}")
private static String ip;
@Value("${socket.source.port}")
private static Integer port;
private final SocketResponseService socketResponseService;
@Override
public void sourceCommunicationCheck() {
}
@Override
public void deviceCommunicationCheck() {
}
@Override
public void agreementCheck() {
}
@Override
public void phaseSequenceCheck() {
}
@Override
public void test() {
NettyClient.socketClient(ip,port,"test------------>",new NettySourceClientHandler("aa",socketResponseService));
}
} }

View File

@@ -12,13 +12,27 @@ import java.util.concurrent.ConcurrentHashMap;
* @Date: 2024/12/11 13:04 * @Date: 2024/12/11 13:04
*/ */
public class WebServiceManager { public class WebServiceManager {
//key:页面 value:channel
private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>(); private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>();
//key:channelId value:页面
private static final Map<String, String> pageSessions = new ConcurrentHashMap<>();
public static void addUser(String userId, Channel channel) { public static void addUser(String userId, Channel channel) {
userSessions.put(userId, channel); userSessions.put(userId, channel);
pageSessions.put(channel.id().toString(),userId);
} }
public static void removeUser(String userId) { public static void removeUser(String userId) {
String id = userSessions.get(userId).id().toString();
userSessions.remove(userId);
pageSessions.remove(id);
}
public static void removeChannel(String channelId) {
String userId = pageSessions.get(channelId);
System.out.println("userId"+userId);
userSessions.remove(userId); userSessions.remove(userId);
} }
@@ -26,10 +40,10 @@ public class WebServiceManager {
return userSessions.get(userId); return userSessions.get(userId);
} }
public static void sendMsg(String msg) { public static void sendMsg(String userId,String msg) {
Channel userId = userSessions.get("userId"); Channel channel = userSessions.get(userId);
TextWebSocketFrame wd1 = new TextWebSocketFrame(msg); TextWebSocketFrame wd1 = new TextWebSocketFrame(msg);
userId.writeAndFlush(wd1); channel.writeAndFlush(wd1);
} }
} }

View File

@@ -8,6 +8,8 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.swagger.models.auth.In;
import org.springframework.beans.factory.annotation.Value;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -18,7 +20,8 @@ import java.util.concurrent.TimeUnit;
*/ */
public class NettyClient { public class NettyClient {
public static void socketClient(String msg, ChannelHandler harder) {
public static void socketClient(String ip, Integer port, String msg, ChannelHandler handler) {
NioEventLoopGroup group = new NioEventLoopGroup(); NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
try { try {
@@ -32,16 +35,17 @@ public class NettyClient {
// .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)) // .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
.addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new NettySourceClientHandler()); .addLast(handler);
} }
}); });
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8787).sync(); ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
channelFuture.channel().writeAndFlush("ping"); channelFuture.channel().writeAndFlush(msg);
channelFuture.channel().closeFuture().sync(); channelFuture.channel().closeFuture().sync();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
group.shutdownGracefully(); group.shutdownGracefully();
System.out.println("程序结束");
} }
} }

View File

@@ -1,10 +1,13 @@
package com.njcn.gather.detection.util.socket.cilent; package com.njcn.gather.detection.util.socket.cilent;
import com.njcn.gather.detection.handler.SocketResponseService;
import com.njcn.gather.detection.util.socket.SocketManager; import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.WebServiceManager; import com.njcn.gather.detection.util.socket.WebServiceManager;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Scanner; import java.util.Scanner;
@@ -13,8 +16,22 @@ import java.util.Scanner;
* @Author: wr * @Author: wr
* @Date: 2024/12/10 14:16 * @Date: 2024/12/10 14:16
*/ */
@RequiredArgsConstructor
public class NettySourceClientHandler extends SimpleChannelInboundHandler<String> { public class NettySourceClientHandler extends SimpleChannelInboundHandler<String> {
private final String webUser;
private final SocketResponseService socketResponseService;
/** /**
* 当通道进行连接时推送消息 * 当通道进行连接时推送消息
* @param ctx * @param ctx
@@ -31,15 +48,13 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<Strin
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException { protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException {
Channel userId = SocketManager.getChannelByUserId("userId"); Channel userId = SocketManager.getChannelByUserId("userId");
if(msg.endsWith("结束")){ if(msg.endsWith("结束")){
WebServiceManager.sendMsg("第一流程结束"); WebServiceManager.sendMsg("","第一流程结束");
System.out.println("第一流程结束"); System.out.println("第一流程结束");
userId.close(); userId.close();
}else{ }else{
System.out.println("服务端消息 == " + msg); System.out.println("服务端消息 == " + msg);
Scanner scanner = new Scanner(System.in);
System.out.print("一个信息返回客户端:"); socketResponseService.deal(webUser,msg);
String text = scanner.nextLine();
ctx.writeAndFlush(text);
} }
} }
@@ -81,9 +96,11 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<Strin
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
SocketManager.addUser("userId",ctx.channel()); SocketManager.addUser(webUser,ctx.channel());
System.out.println("有通道接入" + ctx.channel()); System.out.println("有通道接入" + ctx.channel());
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace(); cause.printStackTrace();

View File

@@ -4,10 +4,13 @@ import com.njcn.gather.detection.util.socket.WebServiceManager;
import com.njcn.gather.detection.util.socket.cilent.NettyClient; import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler; import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/** /**
@@ -16,6 +19,10 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
* @Author: wr * @Author: wr
* @Date: 2024/12/10 13:56 * @Date: 2024/12/10 13:56
*/ */
@Component
@ChannelHandler.Sharable
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@@ -24,41 +31,82 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
System.out.println("服务端通道已建立" + ctx.channel().remoteAddress()); System.out.println("服务端通道已建立" + ctx.channel().remoteAddress());
} }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//首次连接是FullHttpRequest把用户id和对应的channel对象存储起来
if (null != msg && msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
String userId = getUrlParams(uri);
WebServiceManager.addUser(userId, ctx.channel());
log.info("登录的用户id是{}", userId);
//如果url包含参数需要处理
if (uri.contains("?")) {
String newUri = uri.substring(0, uri.indexOf("?"));
request.setUri(newUri);
}
} else if (msg instanceof TextWebSocketFrame) {
//正常的TEXT消息类型
TextWebSocketFrame frame = (TextWebSocketFrame) msg;
log.info("客户端收到服务器数据:{}", frame.text());
}
super.channelRead(ctx, msg);
}
private static String getUrlParams(String url) {
if (!url.contains("=")) {
return null;
}
String userId = url.substring(url.indexOf("=") + 1);
return userId;
}
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
System.out.println("服务端消息 == " + msg.text()); /* System.out.println("服务端消息 == " + msg.text());
if(msg.text().equals("下发指令")){ if(msg.text().equals("下发指令")){
/** *//**
* 处理对应消息 * 处理对应消息
* 1.先下发所要操作的流程信息 * 1.先下发所要操作的流程信息
* 2.组装对应的入参信息 * 2.组装对应的入参信息
* 3.再用socket信息返回结束 * 3.再用socket信息返回结束
*/ *//*
NettyClient.socketClient(msg.text(),new NettySourceClientHandler()); //NettyClient.socketClient(msg.text(),new NettySourceClientHandler());
} }*/
//可以直接调用text 拿到文本信息帧中的信息 //可以直接调用text 拿到文本信息帧中的信息
Channel channel = ctx.channel(); /* Channel channel = ctx.channel();
TextWebSocketFrame resp = new TextWebSocketFrame(msg.text()); TextWebSocketFrame resp = new TextWebSocketFrame(msg.text());
channel.writeAndFlush(resp); channel.writeAndFlush(resp);*/
} }
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
String userId = "userId"; //WebServiceManager.addUser(userId, ctx.channel());
WebServiceManager.addUser(userId, ctx.channel());
System.out.println("有新的连接接入:"+ctx); System.out.println("有新的连接接入:"+ctx);
} }
@Override @Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 假设用户 ID 是从某个地方获取的,这里简单示例为 "userId" // 假设用户 ID 是从某个地方获取的,这里简单示例为 "userId"
String userId = "userId"; System.out.println("weoSocket退出: " + ctx);
WebServiceManager.removeUser(userId); WebServiceManager.removeChannel(ctx.channel().id().toString());
System.out.println("有连接退出: " + ctx);
} }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
//空闲状态的事件 //空闲状态的事件
@@ -73,5 +121,12 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
// } // }
// } // }
} }
/**
* 推送数据至前台
*/
public void senMsgToUser(String userId,String msg){
WebServiceManager.sendMsg(userId,msg);
}
} }

View File

@@ -1,5 +1,7 @@
package com.njcn.gather.detection.util.socket.web; package com.njcn.gather.detection.util.socket.web;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
@@ -8,13 +10,22 @@ import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/** /**
* @Description: webSocket服务端自定义配置 * @Description: webSocket服务端自定义配置
* @Author: wr * @Author: wr
* @Date: 2024/12/10 14:20 * @Date: 2024/12/10 14:20
*/ */
@Component
@RequiredArgsConstructor
public class WebSocketInitializer extends ChannelInitializer<SocketChannel> { public class WebSocketInitializer extends ChannelInitializer<SocketChannel> {
private final WebSocketHandler webSocketHandler;
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
@@ -33,8 +44,18 @@ public class WebSocketInitializer extends ChannelInitializer<SocketChannel> {
* 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了 * 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了
* 这个handler是将http协议升级为websocket 并且使用 101 作为响应码 * 这个handler是将http协议升级为websocket 并且使用 101 作为响应码
* */ * */
pipeline.addLast(webSocketHandler);
pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
pipeline.addLast(new WebSocketHandler());
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 处理异常,例如记录日志、关闭连接等
System.out.println("进入异常++++++++++++++++++");
cause.printStackTrace();
ctx.close();
}
});
} }

View File

@@ -8,6 +8,12 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler; import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/** /**
@@ -15,10 +21,83 @@ import io.netty.handler.logging.LoggingHandler;
* @Author: wr * @Author: wr
* @Date: 2024/12/10 13:59 * @Date: 2024/12/10 13:59
*/ */
@Component
@RequiredArgsConstructor
public class WebSocketService { public class WebSocketService {
private final WebSocketInitializer webSocketInitializer;
public static void main(String[] args) { /**
* 端口号
*/
@Value("${webSocket.netty.port:7777}")
int port;
EventLoopGroup bossGroup;
EventLoopGroup workerGroup;
@PostConstruct
public void start() throws InterruptedException {
new Thread(() -> {
//可以自定义线程的数量
bossGroup = new NioEventLoopGroup();
// 默认创建的线程数量 = CPU 处理器数量 *2
workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
//当前连接被阻塞的时候BACKLOG代表的事 阻塞队列的长度
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
//设置连接保持为活动状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(webSocketInitializer);
ChannelFuture future = serverBootstrap.bind(port).sync();
future.addListener(f -> {
if (future.isSuccess()) {
System.out.println("webSocket服务启动成功");
} else {
System.out.println("webSocket服务启动失败");
}
});
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}).start();
}
/**
* 释放资源
*/
@PreDestroy
public void destroy() throws InterruptedException {
if (bossGroup != null) {
bossGroup.shutdownGracefully().sync();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully().sync();
}
System.out.println("webSocket销毁---------------");
}
/* public static void main(String[] args) {
//可以自定义线程的数量 //可以自定义线程的数量
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup bossGroup = new NioEventLoopGroup();
// 默认创建的线程数量 = CPU 处理器数量 *2 // 默认创建的线程数量 = CPU 处理器数量 *2
@@ -49,7 +128,7 @@ public class WebSocketService {
bossGroup.shutdownGracefully(); bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
} }
} }*/
} }

View File

@@ -41,6 +41,20 @@ mybatis-plus:
#指定主键生成策略 #指定主键生成策略
id-type: assign_uuid id-type: assign_uuid
socket:
source:
ip: 127.0.0.1
port: 7777
device:
ip: 127.0.0.1
port: 7777
webSocket:
port: 7777
log: log:
homeDir: D:\logs homeDir: D:\logs
commonLevel: info commonLevel: info