diff --git a/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java b/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java new file mode 100644 index 00000000..30bc6d1b --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java @@ -0,0 +1,35 @@ +package com.njcn.gather.detection.controller; + +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.gather.detection.service.PreDetectionService; +import com.njcn.web.controller.BaseController; +import com.njcn.web.utils.HttpResultUtil; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.Get; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@Slf4j +@Api(tags = "预检测") +@RestController +@RequestMapping("/prepare") +@RequiredArgsConstructor +public class PreDetectionController extends BaseController { + + private final PreDetectionService preDetectionService; + + /** + * + */ + @GetMapping("/test") + public HttpResult test(){ + String methodDescribe = getMethodDescribe("test"); + preDetectionService.test(); + + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } +} diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketResponseService.java new file mode 100644 index 00000000..daf42554 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketResponseService.java @@ -0,0 +1,29 @@ +package com.njcn.gather.detection.handler; + +import com.njcn.gather.detection.util.socket.SocketManager; +import com.njcn.gather.detection.util.socket.web.WebSocketHandler; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service +@RequiredArgsConstructor +public class SocketResponseService { + + private final WebSocketHandler webSocketHandler; + + public void deal(String userId,String msg){ + + + System.out.println("进入deal+++++++++++++++++++"); + webSocketHandler.senMsgToUser(userId,msg); + + SocketManager.getChannelByUserId(userId).close(); + + } + + + + +} diff --git a/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java b/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java index d7c6dadc..cf84ab1d 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java @@ -28,5 +28,8 @@ public interface PreDetectionService { void phaseSequenceCheck(); + void test(); + + } diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index 39c8a34f..9c55222f 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -1,10 +1,54 @@ package com.njcn.gather.detection.service.impl; -/** - * @author wr - * @description - * @date 2024/12/11 13:38 - */ -public class PreDetectionServiceImpl { +import com.njcn.gather.detection.handler.SocketResponseService; +import com.njcn.gather.detection.service.PreDetectionService; + +import com.njcn.gather.detection.util.socket.cilent.NettyClient; +import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class PreDetectionServiceImpl implements PreDetectionService { + + + @Value("${socket.source.ip}") + private static String ip; + + @Value("${socket.source.port}") + private static Integer port; + + private final SocketResponseService socketResponseService; + + + @Override + public void sourceCommunicationCheck() { + + } + + @Override + public void deviceCommunicationCheck() { + + } + + @Override + public void agreementCheck() { + + } + + @Override + public void phaseSequenceCheck() { + + } + + @Override + public void test() { + + + NettyClient.socketClient(ip,port,"test------------>",new NettySourceClientHandler("aa",socketResponseService)); + + } } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java index 100cd49f..3c39026a 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java @@ -12,13 +12,27 @@ import java.util.concurrent.ConcurrentHashMap; * @Date: 2024/12/11 13:04 */ public class WebServiceManager { + + //key:页面 value:channel private static final Map userSessions = new ConcurrentHashMap<>(); + //key:channelId value:页面 + private static final Map pageSessions = new ConcurrentHashMap<>(); + public static void addUser(String userId, Channel channel) { userSessions.put(userId, channel); + pageSessions.put(channel.id().toString(),userId); } public static void removeUser(String userId) { + String id = userSessions.get(userId).id().toString(); + userSessions.remove(userId); + pageSessions.remove(id); + } + + public static void removeChannel(String channelId) { + String userId = pageSessions.get(channelId); + System.out.println("userId"+userId); userSessions.remove(userId); } @@ -26,10 +40,10 @@ public class WebServiceManager { return userSessions.get(userId); } - public static void sendMsg(String msg) { - Channel userId = userSessions.get("userId"); + public static void sendMsg(String userId,String msg) { + Channel channel = userSessions.get(userId); TextWebSocketFrame wd1 = new TextWebSocketFrame(msg); - userId.writeAndFlush(wd1); + channel.writeAndFlush(wd1); } } \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java index b87a635a..02637eb8 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java @@ -8,6 +8,8 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; +import io.swagger.models.auth.In; +import org.springframework.beans.factory.annotation.Value; import java.util.concurrent.TimeUnit; @@ -18,7 +20,8 @@ import java.util.concurrent.TimeUnit; */ public class NettyClient { - public static void socketClient(String msg, ChannelHandler harder) { + + public static void socketClient(String ip, Integer port, String msg, ChannelHandler handler) { NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { @@ -32,16 +35,17 @@ public class NettyClient { // .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) - .addLast(new NettySourceClientHandler()); + .addLast(handler); } }); - ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8787).sync(); - channelFuture.channel().writeAndFlush("ping"); + ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); + channelFuture.channel().writeAndFlush(msg); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); + System.out.println("程序结束"); } } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java index 8e99ed07..92ec85df 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java @@ -1,10 +1,13 @@ package com.njcn.gather.detection.util.socket.cilent; +import com.njcn.gather.detection.handler.SocketResponseService; import com.njcn.gather.detection.util.socket.SocketManager; import com.njcn.gather.detection.util.socket.WebServiceManager; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; import java.util.Scanner; @@ -13,8 +16,22 @@ import java.util.Scanner; * @Author: wr * @Date: 2024/12/10 14:16 */ + +@RequiredArgsConstructor public class NettySourceClientHandler extends SimpleChannelInboundHandler { + private final String webUser; + + private final SocketResponseService socketResponseService; + + + + + + + + + /** * 当通道进行连接时推送消息 * @param ctx @@ -31,15 +48,13 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler { @@ -24,41 +31,82 @@ public class WebSocketHandler extends SimpleChannelInboundHandler { + + private final WebSocketHandler webSocketHandler; + + + @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); @@ -33,8 +44,18 @@ public class WebSocketInitializer extends ChannelInitializer { * 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了 * 这个handler是将http协议升级为websocket 并且使用 101 作为响应码 * */ + pipeline.addLast(webSocketHandler); pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); - pipeline.addLast(new WebSocketHandler()); + + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // 处理异常,例如记录日志、关闭连接等 + System.out.println("进入异常++++++++++++++++++"); + cause.printStackTrace(); + ctx.close(); + } + }); } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java index d1e621ae..c16b2226 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java @@ -8,6 +8,12 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; /** @@ -15,10 +21,83 @@ import io.netty.handler.logging.LoggingHandler; * @Author: wr * @Date: 2024/12/10 13:59 */ +@Component +@RequiredArgsConstructor public class WebSocketService { + private final WebSocketInitializer webSocketInitializer; - public static void main(String[] args) { + /** + * 端口号 + */ + @Value("${webSocket.netty.port:7777}") + int port; + + + EventLoopGroup bossGroup; + EventLoopGroup workerGroup; + + + + @PostConstruct + public void start() throws InterruptedException { + new Thread(() -> { + //可以自定义线程的数量 + bossGroup = new NioEventLoopGroup(); + // 默认创建的线程数量 = CPU 处理器数量 *2 + workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler()) + //当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度 + .option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + + //设置连接保持为活动状态 + .childOption(ChannelOption.SO_KEEPALIVE, true) + + .childHandler(webSocketInitializer); + + ChannelFuture future = serverBootstrap.bind(port).sync(); + future.addListener(f -> { + if (future.isSuccess()) { + System.out.println("webSocket服务启动成功"); + } else { + System.out.println("webSocket服务启动失败"); + } + }); + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + }).start(); + } + + + /** + * 释放资源 + */ + @PreDestroy + public void destroy() throws InterruptedException { + if (bossGroup != null) { + bossGroup.shutdownGracefully().sync(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully().sync(); + } + + System.out.println("webSocket销毁---------------"); + } + + + + + /* public static void main(String[] args) { //可以自定义线程的数量 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 默认创建的线程数量 = CPU 处理器数量 *2 @@ -49,7 +128,7 @@ public class WebSocketService { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } - } + }*/ } diff --git a/entrance/src/main/resources/application.yml b/entrance/src/main/resources/application.yml index e62fef4a..5131da0f 100644 --- a/entrance/src/main/resources/application.yml +++ b/entrance/src/main/resources/application.yml @@ -41,6 +41,20 @@ mybatis-plus: #指定主键生成策略 id-type: assign_uuid + + +socket: + source: + ip: 127.0.0.1 + port: 7777 + device: + ip: 127.0.0.1 + port: 7777 + +webSocket: + port: 7777 + + log: homeDir: D:\logs commonLevel: info