diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketResponseService.java index 5d25e757..7eb51699 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketResponseService.java @@ -1,29 +1,79 @@ package com.njcn.gather.detection.handler; +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; +import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum; +import com.njcn.gather.detection.pojo.param.PreDetectionParam; +import com.njcn.gather.detection.pojo.vo.SocketDataMsg; +import com.njcn.gather.detection.pojo.vo.SocketMsg; +import com.njcn.gather.detection.util.socket.MsgUtil; import com.njcn.gather.detection.util.socket.SocketManager; +import com.njcn.gather.detection.util.socket.cilent.NettyClient; +import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler; +import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler; import com.njcn.gather.detection.util.socket.web.WebSocketHandler; +import com.njcn.gather.device.device.pojo.vo.PreDetection; +import com.njcn.gather.device.device.service.IPqDevService; +import io.netty.channel.Channel; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.*; + + @Service @RequiredArgsConstructor public class SocketResponseService { + private final WebSocketHandler webSocketHandler; + + public void deal(String userId,String msg){ + SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg); + if(SourceOperateCodeEnum.YJC_YTXJY.getValue().equals(socketDataMsg.getRequestId())){ + SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); + if(ObjectUtil.isNotNull(dictDataEnumByCode)){ + SocketMsg socketMsg=new SocketMsg(); + switch (dictDataEnumByCode){ + case SUCCESS: - System.out.println("进入deal+++++++++++++++++++"); - webSocketHandler.sendMsgToUser(userId,msg); + break; + case UNPROCESSED_BUSINESS: + break; + case RE_OPERATE: + socketMsg.setRequestId(socketDataMsg.getRequestId()); + socketMsg.setOperateCode("QUIT_FUNEND$01"); + SocketManager.sendMsg(userId,JSON.toJSONString(socketMsg)); - SocketManager.getChannelByUserId(userId).close(); + break; + default: + socketMsg.setRequestId(socketDataMsg.getRequestId()); + socketMsg.setOperateCode(socketDataMsg.getOperateCode()); + socketMsg.setData(dictDataEnumByCode.getMessage()); + webSocketHandler.sendMsgToUser(userId, JSON.toJSONString(socketMsg)); + break; + } + } + + } } + + + + + + } diff --git a/detection/src/main/java/com/njcn/gather/detection/pojo/enums/SourceResponseCodeEnum.java b/detection/src/main/java/com/njcn/gather/detection/pojo/enums/SourceResponseCodeEnum.java index 6728a453..9855431b 100644 --- a/detection/src/main/java/com/njcn/gather/detection/pojo/enums/SourceResponseCodeEnum.java +++ b/detection/src/main/java/com/njcn/gather/detection/pojo/enums/SourceResponseCodeEnum.java @@ -22,9 +22,18 @@ public enum SourceResponseCodeEnum { STOP_ERROR(10526, "停止源失败"), NOT_INITIALIZED(10527, "源未进行初始化"), TARGET_SOURCE_ERROR(10528, "目标源有误(该用户已控制其他源,在关闭前无法操作新的源)"), - UNABLE_TO_RESPOND(10529, "源状态有误,无法响应报文(例如源处于输出状态,无法响应初始化报文)"); + UNABLE_TO_RESPOND(10529, "源状态有误,无法响应报文(例如源处于输出状态,无法响应初始化报文)"), + RE_OPERATE(10552,"重复的初始化操作") + + + + + + + ; + private Integer code; private String message; diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index 6dd62b10..d4322eb0 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -12,6 +12,7 @@ import com.njcn.gather.detection.service.PreDetectionService; import com.njcn.gather.detection.util.socket.SocketManager; import com.njcn.gather.detection.util.socket.cilent.NettyClient; +import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler; import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler; import com.njcn.gather.device.device.pojo.vo.PreDetection; import com.njcn.gather.device.device.service.IPqDevService; @@ -26,6 +27,11 @@ import com.njcn.gather.plan.service.IAdPlanService; import com.njcn.gather.plan.service.IAdPlanSourceService; import com.njcn.gather.system.dictionary.pojo.enums.DictDataEnum; import com.njcn.gather.system.dictionary.service.IDictDataService; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.channel.Channel; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -36,6 +42,8 @@ import java.util.*; @RequiredArgsConstructor public class PreDetectionServiceImpl implements PreDetectionService { + private final String source= "source"; + private final IPqDevService iPqDevService; private final IDictDataService dictDataService; private final IAdPlanService iAdPlanService; @@ -43,6 +51,7 @@ public class PreDetectionServiceImpl implements PreDetectionService { private final IPqSourceService pqSourceService; private final IPqScriptDtlsService pqScriptDtlsService; + @Value("${socket.source.ip}") private String ip; @@ -96,7 +105,7 @@ public class PreDetectionServiceImpl implements PreDetectionService { msg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue()); msg.setData(JSON.toJSONString(sourceParam)); String s = userPageId + "_Source"; - NettyClient.socketClient(ip, port, s, new NettySourceClientHandler(s, sourceResponseService)); + NettyClient.socketClient(ip, port, new NettySourceClientHandler(s, sourceResponseService)); SocketManager.sendMsg(s,JSON.toJSONString(msg)); @@ -120,48 +129,19 @@ public class PreDetectionServiceImpl implements PreDetectionService { @Override public boolean startTest(PreDetectionParam param) { - List pqDevList = iPqDevService.getDevInfo(Arrays.asList("578c142b7e4e4978a35bd6225aa62a23", "393504f55f1f79bce255bfc195cfdb56")); - System.out.println(pqDevList); - //校验 -// List pqDevList = iPqDevService.getDevInfo(Arrays.asList("578c142b7e4e4978a35bd6225aa62a23", "393504f55f1f79bce255bfc195cfdb56")); -// System.out.println(pqDevList); -// //校验 -// -// //组装请求数据 -// SocketMsg socketMsg = new SocketMsg(); -// Map> map = new HashMap(); -// map.put("deviceList", pqDevList); -// String jsonString = JSON.toJSONString(map); -// socketMsg.setRequestId("adawdawd"); -// socketMsg.setOperateCode("INIT_GATHER$03"); -// socketMsg.setData(jsonString); -// String json = JSON.toJSONString(socketMsg); -// -// NettyClient.socketClient(ip, port, "{\"data\":\"{\\\"deviceList\\\":[{\\\"devIP\\\":\\\"192.168.1.186\\\",\\\"port\\\":102,\\\"devType\\\":\\\"PQS882B\\\",\\\"icdType\\\":\\\"PQS882_VX_ZJ_1(V102)\\\",\\\"devCode\\\":\\\"Pqs\\u0026cn870299\\\",\\\"devKey\\\":\\\"!qaz@wsx3edc4rfv\\\",\\\"monitorList\\\":[{\\\"lineId\\\":\\\"1_192.168.1.186_102_1\\\",\\\"line\\\":1}]}]}\",\"operateCode\":\"INIT_GATHER$03\",\"requestId\":\"dansldquiwdlandalksn\"}", new NettySourceClientHandler(param.getUserPageId(), sourceResponseService)); - Runnable runnable = new Runnable() { - @Override - public void run() { -// NettyClient.socketClient(ip, port, "源客户端初始化发送", new NettySourceClientHandler(ip + "_" + port, sourceResponseService)); + + Runnable runnable = new Runnable() { + @Override + public void run() { + String ddId = param.getUserPageId(); + Channel channel = SocketManager.getChannelByUserId(ddId); + if( channel== null || !channel.isActive()){ + NettyClient.socketClient(ip, port,new NettySourceClientHandler(ddId, sourceResponseService)); } - }; - runnable.run(); - System.out.println("111111111111111111111+++++++++++++++"); -// Runnable runnable2 = new Runnable() { -// @Override -// public void run() { -// NettyClient.socketClient(ip, 61001, "装置客户端初始化发送", new NettySourceClientHandler(ip + "_" + 61001, sourceResponseService2)); -// } -// }; -// runnable2.run(); - - - String tem = "{\"data\":\"{\\\"deviceList\\\":[{\\\"devIP\\\":\\\"192.168.1.186\\\",\\\"port\\\":102,\\\"devType\\\":\\\"PQS882B\\\",\\\"icdType\\\":\\\"PQS882_VX_ZJ_1(V102)\\\",\\\"devCode\\\":\\\"Pqs\\u0026cn870299\\\",\\\"devKey\\\":\\\"!qaz@wsx3edc4rfv\\\",\\\"monitorList\\\":[{\\\"lineId\\\":\\\"1_192.168.1.186_102_1\\\",\\\"line\\\":1}]}]}\",\"operateCode\":\"INIT_GATHER$03\",\"requestId\":\"dansldquiwdlandalksn\"}"; - - - return false; + SocketManager.sendMsg(ddId,"start\n"); + } + }; + runnable.run(); + return true; } - -// public static void main(String[] args) { -// NettyClient.socketClient("192.168.1.121", 61000, "源客户端初始化发送", new NettySourceClientHandler( "192.168.1.121_61000")); -// } -} +} \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java index a99bb897..535e0b52 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java @@ -2,9 +2,12 @@ package com.njcn.gather.detection.util.socket; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import lombok.extern.slf4j.Slf4j; +import java.time.LocalDateTime; import java.util.Iterator; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** @@ -12,6 +15,7 @@ import java.util.concurrent.ConcurrentHashMap; * @Author: wr * @Date: 2024/12/11 13:04 */ +@Slf4j public class WebServiceManager { //key:页面 value:channel @@ -45,9 +49,13 @@ public class WebServiceManager { public static void sendMsg(String userId,String msg) { Channel channel = userSessions.get(userId); + if(Objects.nonNull(channel) && channel.isActive()){ + TextWebSocketFrame wd = new TextWebSocketFrame(msg); + channel.writeAndFlush(wd); + }else { + log.error("{}-websocket推送消息失败;当前用户-{}-客户端已经断开连接", LocalDateTime.now(),userId); + } - TextWebSocketFrame wd = new TextWebSocketFrame(msg); - channel.writeAndFlush(wd); } } \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java index f4957930..8bce53d8 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java @@ -9,14 +9,21 @@ import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.TimeoutException; import io.netty.util.CharsetUtil; +import lombok.Data; +import lombok.Getter; import java.io.IOException; import java.net.ConnectException; import java.net.ProtocolException; +import java.net.SocketTimeoutException; import java.util.concurrent.TimeUnit; /** @@ -24,10 +31,12 @@ import java.util.concurrent.TimeUnit; * @Author: wr * @Date: 2024/12/10 14:16 */ + +@Getter public class NettyClient { - public static void socketClient(String ip, Integer port,String userId, ChannelHandler handler) { + public static void socketClient(String ip, Integer port,SimpleChannelInboundHandler handler) { NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { @@ -40,47 +49,26 @@ public class NettyClient { //空闲状态的handler // 添加LineBasedFrameDecoder来按行分割数据 .addLast(new LineBasedFrameDecoder(1024)) -// .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)) + // .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(handler) - .addLast(new ChannelInboundHandlerAdapter() { - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - // 处理异常,例如记录日志、关闭连接等 - cause.printStackTrace(); - // 根据异常类型进行不同的处理 - if (cause instanceof ConnectException) { - // 处理连接异常,例如重试连接或记录特定的连接错误信息 - throw new BusinessException(CommonResponseEnum.FAIL,"服务端拒绝连接"); - } else if (cause instanceof IOException) { - // 处理I/O异常,例如读写错误 - System.out.println("IOException caught: There was an I/O error."); - // 例如,可以记录更详细的I/O错误信息 - } else if (cause instanceof TimeoutException) { - // 处理超时异常 - System.out.println("TimeoutException caught: Operation timed out."); - // 可以根据业务逻辑决定是否重试或记录超时信息 - } else if (cause instanceof ProtocolException) { - // 处理协议异常,例如消息格式不正确 - System.out.println("ProtocolException caught: Invalid protocol message."); - // 可以记录协议错误信息或向客户端发送错误响应 - } else { - // 处理其他类型的异常 - System.out.println("Unknown exception caught: " + cause.getMessage()); - // 可以记录未知异常信息 - } - ctx.close(); - } - }); ; } }); - ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); - SocketManager.addUser(userId,channelFuture.channel()); + bootstrap.connect(ip, port).sync().addListener((ChannelFutureListener) ch -> { + if (!ch.isSuccess()) { + System.out.println("链接服务端失败..."); + } else { + System.out.println("链接服务端成功..."); + } + }); } catch (Exception e) { System.out.println("进入异常............"); e.printStackTrace(); + group.shutdownGracefully(); + //TODO 通知页面 + }finally { System.out.println("进入clientSocket最后步骤---------------------"); } @@ -119,4 +107,6 @@ public class NettyClient { connect(bootstrap, msg); } } + + } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java index 191e66ee..3a961a97 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java @@ -3,11 +3,19 @@ package com.njcn.gather.detection.util.socket.cilent; import com.njcn.gather.detection.handler.SocketResponseService; import com.njcn.gather.detection.handler.SocketSourceResponseService; import com.njcn.gather.detection.util.socket.SocketManager; +import com.njcn.gather.detection.util.socket.WebServiceManager; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.TimeoutException; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; +import java.io.IOException; +import java.net.ConnectException; +import java.net.ProtocolException; + /** * @Description: 源客户端业务处理(示例) @@ -55,12 +63,8 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { */ @Override public void channelInactive(ChannelHandlerContext ctx) { -// System.out.println("断线了......" + ctx.channel()); -// ctx.channel().eventLoop().schedule(() -> { -// System.out.println("断线重连......"); -// //重连 -// NettyClient.connect(); -// }, 3L, TimeUnit.SECONDS); + System.out.println("客户端连接成功"); + SocketManager.addUser(webUser,ctx.channel()); } /** @@ -71,17 +75,25 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { - //如果是空闲状态事件 -// if (evt instanceof IdleStateEvent) { -// if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) { -// System.out.println("空闲" + ctx.channel().id()); -// //发送ping 保持心跳链接 -// TextWebSocketFrame resp = new TextWebSocketFrame(ctx.channel().id() + " ping"); -// ctx.writeAndFlush(resp); -// } -// }else { -// userEventTriggered(ctx,evt); -// } + //当连接超过10S和发送消息后10S无响应时候,关闭channel + if (evt instanceof IdleStateEvent) { + IdleStateEvent idleEvent = (IdleStateEvent) evt; + if (idleEvent.state() == IdleState.READER_IDLE) { + System.out.println("读空闲超时!"); + // 处理读空闲超时逻辑,例如关闭连接或发送心跳包 + ctx.close(); + + WebServiceManager.sendMsg(webUser,"读取socket服务端等待时长超过10s,自动中断连接"); + SocketManager.removeUser(webUser); + } + // 可以添加对WRITER_IDLE和ALL_IDLE的处理逻辑 + } else { + try { + super.userEventTriggered(ctx, evt); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } @Override @@ -93,7 +105,30 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // 处理异常,例如记录日志、关闭连接等 cause.printStackTrace(); + // 根据异常类型进行不同的处理 + if (cause instanceof ConnectException) { + // 处理连接异常,例如重试连接或记录特定的连接错误信息 + System.out.println("连接socket服务端异常"); + + } else if (cause instanceof IOException) { + // 处理I/O异常,例如读写错误 + System.out.println("IOException caught: There was an I/O error."); + // 例如,可以记录更详细的I/O错误信息 + } else if (cause instanceof TimeoutException) { + // 处理超时异常 + System.out.println("TimeoutException caught: Operation timed out."); + // 可以根据业务逻辑决定是否重试或记录超时信息 + } else if (cause instanceof ProtocolException) { + // 处理协议异常,例如消息格式不正确 + System.out.println("ProtocolException caught: Invalid protocol message."); + // 可以记录协议错误信息或向客户端发送错误响应 + } else { + // 处理其他类型的异常 + System.out.println("Unknown exception caught: " + cause.getMessage()); + // 可以记录未知异常信息 + } ctx.close(); } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServer.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServer.java index b89622df..824025a7 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServer.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServer.java @@ -21,7 +21,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; */ public class NettyServer { - public static final int port = 61000; + public static final int port = 8574; public static void main(String[] args) { NettyServer nettyServer = new NettyServer(); diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServerChannelInitializer.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServerChannelInitializer.java index 829e5d04..8a07eece 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServerChannelInitializer.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/service/NettyServerChannelInitializer.java @@ -12,13 +12,13 @@ import io.netty.util.CharsetUtil; * @Author: wr * @Date: 2024/12/10 14:18 */ -@ChannelHandler.Sharable public class NettyServerChannelInitializer extends ChannelInitializer { public static final NettyServerChannelInitializer INSTANCE = new NettyServerChannelInitializer(); @Override protected void initChannel(NioSocketChannel ch) { + System.out.println("初始化一次888888888888888888888888"); ch.pipeline() //空闲状态的处理器 // .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS)) diff --git a/entrance/src/main/resources/application.yml b/entrance/src/main/resources/application.yml index 16d15e73..1869212d 100644 --- a/entrance/src/main/resources/application.yml +++ b/entrance/src/main/resources/application.yml @@ -45,11 +45,11 @@ mybatis-plus: socket: source: - ip: 192.168.1.121 - port: 61000 - device: ip: 192.168.1.127 port: 8574 + device: + ip: 192.168.1.138 + port: 61000 webSocket: port: 7777