diff --git a/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java b/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java new file mode 100644 index 00000000..38327c5f --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java @@ -0,0 +1,14 @@ +package com.njcn.gather.detection.pojo.vo; + +import lombok.Data; +import org.apache.poi.ss.formula.functions.T; + +@Data +public class WebSocketVO { + + private String code; + + private String message; + + private T data; +} diff --git a/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java b/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java index 6150d5be..081eb5a2 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java @@ -37,7 +37,7 @@ public interface PreDetectionService { void phaseSequenceCheck(); - void startTest(PreDetectionParam param); + boolean startTest(PreDetectionParam param); diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index b209daa5..f9cf60de 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -8,6 +8,7 @@ import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.pojo.vo.SocketMsg; import com.njcn.gather.detection.service.PreDetectionService; +import com.njcn.gather.detection.util.socket.SocketManager; import com.njcn.gather.detection.util.socket.cilent.NettyClient; import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler; import com.njcn.gather.device.device.pojo.vo.PreDetection; @@ -23,14 +24,13 @@ import com.njcn.gather.plan.service.IAdPlanService; import com.njcn.gather.plan.service.IAdPlanSourceService; import com.njcn.gather.system.dictionary.pojo.enums.DictDataEnum; import com.njcn.gather.system.dictionary.service.IDictDataService; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; @Service @RequiredArgsConstructor @@ -138,7 +138,7 @@ public class PreDetectionServiceImpl implements PreDetectionService { } @Override - public void startTest(PreDetectionParam param) { + public boolean startTest(PreDetectionParam param) { List pqDevList = iPqDevService.getDevInfo(Arrays.asList("578c142b7e4e4978a35bd6225aa62a23", "393504f55f1f79bce255bfc195cfdb56")); System.out.println(pqDevList); //校验 @@ -153,7 +153,32 @@ public class PreDetectionServiceImpl implements PreDetectionService { socketMsg.setData(jsonString); String json = JSON.toJSONString(socketMsg); - NettyClient.socketClient(ip, port, "{\"data\":\"{\\\"deviceList\\\":[{\\\"devIP\\\":\\\"192.168.1.186\\\",\\\"port\\\":102,\\\"devType\\\":\\\"PQS882B\\\",\\\"icdType\\\":\\\"PQS882_VX_ZJ_1(V102)\\\",\\\"devCode\\\":\\\"Pqs\\u0026cn870299\\\",\\\"devKey\\\":\\\"!qaz@wsx3edc4rfv\\\",\\\"monitorList\\\":[{\\\"lineId\\\":\\\"1_192.168.1.186_102_1\\\",\\\"line\\\":1}]}]}\",\"operateCode\":\"INIT_GATHER$03\",\"requestId\":\"dansldquiwdlandalksn\"}", new NettySourceClientHandler(param.getUserPageId(), sourceResponseService)); + String tem = "{\"data\":\"{\\\"deviceList\\\":[{\\\"devIP\\\":\\\"192.168.1.186\\\",\\\"port\\\":102,\\\"devType\\\":\\\"PQS882B\\\",\\\"icdType\\\":\\\"PQS882_VX_ZJ_1(V102)\\\",\\\"devCode\\\":\\\"Pqs\\u0026cn870299\\\",\\\"devKey\\\":\\\"!qaz@wsx3edc4rfv\\\",\\\"monitorList\\\":[{\\\"lineId\\\":\\\"1_192.168.1.186_102_1\\\",\\\"line\\\":1}]}]}\",\"operateCode\":\"INIT_GATHER$03\",\"requestId\":\"dansldquiwdlandalksn\"}"; + + + Runnable runnable = new Runnable() { + + @Override + public void run() { + Channel channel = null; + if(SocketManager.getChannelByUserId(param.getUserPageId()) == null || !SocketManager.getChannelByUserId(param.getUserPageId()).isActive()){ + channel = NettyClient.socketClient(ip, port,param.getUserPageId(),new NettySourceClientHandler(param.getUserPageId(), sourceResponseService)); + } + if(Objects.nonNull(channel)){ + try { + channel.writeAndFlush(tem).sync(); + } catch (InterruptedException e) { + System.out.println("发送异常====="); + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + }; + runnable.run(); + + + return true; } } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java index 0c092cba..c23fbac9 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java @@ -26,10 +26,10 @@ public class SocketManager { return userSessions.get(userId); } - public static void sendMsg(String msg) { - Channel userId = userSessions.get("userId"); + public static void sendMsg(String userId,String msg) { + Channel channel = userSessions.get(userId); TextWebSocketFrame wd1 = new TextWebSocketFrame(msg); - userId.writeAndFlush(wd1); + channel.writeAndFlush(wd1); } } \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java index 6dab056f..a99bb897 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java @@ -3,6 +3,7 @@ package com.njcn.gather.detection.util.socket; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -16,24 +17,26 @@ public class WebServiceManager { //key:页面 value:channel private static final Map userSessions = new ConcurrentHashMap<>(); - //key:channelId value:页面 - private static final Map pageSessions = new ConcurrentHashMap<>(); + public static void addUser(String userId, Channel channel) { userSessions.put(userId, channel); - pageSessions.put(channel.id().toString(),userId); } public static void removeUser(String userId) { String id = userSessions.get(userId).id().toString(); userSessions.remove(userId); - pageSessions.remove(id); } public static void removeChannel(String channelId) { - String userId = pageSessions.get(channelId); - System.out.println("userId"+userId); - userSessions.remove(userId); + // 遍历并删除 + Iterator> iterator = userSessions.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue().id().equals(channelId)) { + iterator.remove(); + } + } } public static Channel getChannelByUserId(String userId) { @@ -42,6 +45,7 @@ public class WebServiceManager { public static void sendMsg(String userId,String msg) { Channel channel = userSessions.get(userId); + TextWebSocketFrame wd = new TextWebSocketFrame(msg); channel.writeAndFlush(wd); } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java index 10ecdbfc..62365170 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java @@ -1,14 +1,24 @@ package com.njcn.gather.detection.util.socket.cilent; import cn.hutool.core.util.StrUtil; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.gather.detection.util.socket.SocketManager; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.timeout.TimeoutException; import io.netty.util.CharsetUtil; +import java.io.IOException; +import java.net.ConnectException; +import java.net.ProtocolException; +import java.net.SocketTimeoutException; import java.util.concurrent.TimeUnit; /** @@ -19,9 +29,11 @@ import java.util.concurrent.TimeUnit; public class NettyClient { - public static void socketClient(String ip, Integer port, String msg, ChannelHandler handler) { + public static Channel socketClient(String ip, Integer port,String userId, ChannelHandler handler) { NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); + Channel channel = null; + try { bootstrap.group(group) .channel(NioSocketChannel.class) @@ -30,21 +42,56 @@ public class NettyClient { protected void initChannel(NioSocketChannel ch) { ch.pipeline() //空闲状态的handler + // 添加LineBasedFrameDecoder来按行分割数据 + .addLast(new LineBasedFrameDecoder(1024)) // .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) - .addLast(handler); + .addLast(handler) + .addLast(new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // 处理异常,例如记录日志、关闭连接等 + cause.printStackTrace(); + // 根据异常类型进行不同的处理 + if (cause instanceof ConnectException) { + // 处理连接异常,例如重试连接或记录特定的连接错误信息 + throw new BusinessException(CommonResponseEnum.FAIL,"服务端拒绝连接"); + } else if (cause instanceof IOException) { + // 处理I/O异常,例如读写错误 + System.out.println("IOException caught: There was an I/O error."); + // 例如,可以记录更详细的I/O错误信息 + } else if (cause instanceof TimeoutException) { + // 处理超时异常 + System.out.println("TimeoutException caught: Operation timed out."); + // 可以根据业务逻辑决定是否重试或记录超时信息 + } else if (cause instanceof ProtocolException) { + // 处理协议异常,例如消息格式不正确 + System.out.println("ProtocolException caught: Invalid protocol message."); + // 可以记录协议错误信息或向客户端发送错误响应 + } else { + // 处理其他类型的异常 + System.out.println("Unknown exception caught: " + cause.getMessage()); + // 可以记录未知异常信息 + } + ctx.close(); + } + }); + ; } }); ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); - channelFuture.channel().writeAndFlush(msg); - channelFuture.channel().closeFuture().sync(); + channel = channelFuture.channel(); + SocketManager.addUser(userId,channel); + return channel; } catch (Exception e) { + System.out.println("进入异常............"); e.printStackTrace(); - } finally { - group.shutdownGracefully(); - } + return null; + }finally { + System.out.println("进入clientSocket最后步骤---------------------"); + } } /** diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java new file mode 100644 index 00000000..191e66ee --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java @@ -0,0 +1,100 @@ +package com.njcn.gather.detection.util.socket.cilent; + +import com.njcn.gather.detection.handler.SocketResponseService; +import com.njcn.gather.detection.handler.SocketSourceResponseService; +import com.njcn.gather.detection.util.socket.SocketManager; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; + + +/** + * @Description: 源客户端业务处理(示例) + * @Author: wr + * @Date: 2024/12/10 14:16 + */ + +@RequiredArgsConstructor +public class NettyDevClientHandler extends SimpleChannelInboundHandler { + + private final String webUser; + + private final SocketResponseService socketResponseService; + + @Value("${socket.device.ip}") + private String devIp; + + @Value("${socket.device.port}") + private Integer devPort; + + + /** + * 当通道进行连接时推送消息 + * @param ctx + */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + System.out.println("客户端通道已建立" + ctx.channel().remoteAddress()); + } + + /** + * 处理服务端消息消息信息 + */ + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException { + System.out.println("接收server端数据>>>>>>"+msg); + socketResponseService.deal(webUser,msg); + + + } + + /** + * 当通道断线时,支持重连 + * @param ctx + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) { +// System.out.println("断线了......" + ctx.channel()); +// ctx.channel().eventLoop().schedule(() -> { +// System.out.println("断线重连......"); +// //重连 +// NettyClient.connect(); +// }, 3L, TimeUnit.SECONDS); + } + + /** + * 用户事件的回调方法(自定义事件用于心跳机制) + * + * @param ctx + * @param evt + */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + //如果是空闲状态事件 +// if (evt instanceof IdleStateEvent) { +// if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) { +// System.out.println("空闲" + ctx.channel().id()); +// //发送ping 保持心跳链接 +// TextWebSocketFrame resp = new TextWebSocketFrame(ctx.channel().id() + " ping"); +// ctx.writeAndFlush(resp); +// } +// }else { +// userEventTriggered(ctx,evt); +// } + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + SocketManager.addUser(webUser,ctx.channel()); + System.out.println("有通道接入" + ctx.channel()); + } + + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + +} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java index 33330877..2663918e 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java @@ -2,10 +2,13 @@ package com.njcn.gather.detection.util.socket.cilent; import com.njcn.gather.detection.handler.SocketSourceResponseService; import com.njcn.gather.detection.util.socket.SocketManager; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import java.util.Objects; /** @@ -21,6 +24,12 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler>>>>>"+msg); sourceResponseService.deal(webUser,msg); + + } /** diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java index a90a1280..d53473c0 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/web/WebSocketService.java @@ -57,9 +57,7 @@ public class WebSocketService { //设置连接保持为活动状态 .childOption(ChannelOption.SO_KEEPALIVE, true) - .childHandler(webSocketInitializer); - ChannelFuture future = serverBootstrap.bind(port).sync(); future.addListener(f -> { if (future.isSuccess()) { diff --git a/entrance/src/main/resources/application.yml b/entrance/src/main/resources/application.yml index bd7c7e31..29bd65c1 100644 --- a/entrance/src/main/resources/application.yml +++ b/entrance/src/main/resources/application.yml @@ -48,7 +48,7 @@ socket: ip: 192.168.1.138 port: 61000 device: - ip: 127.0.0.1 + ip: 192.168.1.127 port: 7777 webSocket: