From 0232eee131beca8a4de3df8d22832fcf2b105e32 Mon Sep 17 00:00:00 2001 From: chendaofei <857448963@qq.com> Date: Wed, 18 Dec 2024 10:32:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/SocketDevResponseService.java | 34 ++++++------------- .../handler/SocketSourceResponseService.java | 5 +-- .../gather/detection/pojo/vo/WebSocketVO.java | 17 ++++++++++ .../service/impl/PreDetectionServiceImpl.java | 25 +++++++------- .../detection/util/socket/SocketManager.java | 1 + .../util/socket/WebServiceManager.java | 13 +++++++ .../socket/cilent/NettyDevClientHandler.java | 6 ++-- 7 files changed, 60 insertions(+), 41 deletions(-) diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java index a8719673..0b17c813 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java @@ -37,34 +37,20 @@ public class SocketDevResponseService { public void deal(String userId,String msg){ SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg); - if(SourceOperateCodeEnum.YJC_YTXJY.getValue().equals(socketDataMsg.getRequestId())){ - SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); - if(ObjectUtil.isNotNull(dictDataEnumByCode)){ - SocketMsg socketMsg=new SocketMsg(); + SourceOperateCodeEnum sourceOperateCodeEnum = SourceOperateCodeEnum.getDictDataEnumByCode(socketDataMsg.getRequestId()); + switch (sourceOperateCodeEnum) { + case YJC_SBTXJY: - switch (dictDataEnumByCode){ - case SUCCESS: - - break; - case UNPROCESSED_BUSINESS: - break; - case RE_OPERATE: - socketMsg.setRequestId(socketDataMsg.getRequestId()); - socketMsg.setOperateCode("QUIT_FUNEND$01"); - SocketManager.sendMsg(userId,JSON.toJSONString(socketMsg)); - - break; - default: - socketMsg.setRequestId(socketDataMsg.getRequestId()); - socketMsg.setOperateCode(socketDataMsg.getOperateCode()); - socketMsg.setData(dictDataEnumByCode.getMessage()); - webSocketHandler.sendMsgToUser(userId, JSON.toJSONString(socketMsg)); - break; - } - } + break; + case YJC_XYJY: + break; + case YJC_XUJY: + break; } + + } diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java index e24b7870..1a90d6c5 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java @@ -15,6 +15,7 @@ import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler; import com.njcn.gather.detection.util.socket.web.WebSocketHandler; import com.njcn.gather.device.device.pojo.vo.PreDetection; import com.njcn.gather.device.device.service.IPqDevService; +import io.netty.channel.Channel; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -78,9 +79,9 @@ public class SocketSourceResponseService { switch (dictDataEnumByCode){ case SUCCESS: webSocketHandler.sendMsgToUser(param.getUserPageId(), msg); - String s = param.getUserPageId() + "_Dev"; + String s = param.getUserPageId() + "_dev"; //开始设备通讯检测 - NettyClient.socketClient(ip, port,param.getPlanId(), new NettyDevClientHandler(param.getPlanId(), socketDevResponseService)); + NettyClient.socketClient(ip, port, param.getUserPageId(), new NettyDevClientHandler(s, socketDevResponseService)); List devList = iPqDevService.getDevInfo(param.getDevIds()); Map> map = new HashMap(1); map.put("deviceList", devList); diff --git a/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java b/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java index 3cdf1206..0eded4df 100644 --- a/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java +++ b/detection/src/main/java/com/njcn/gather/detection/pojo/vo/WebSocketVO.java @@ -1,5 +1,6 @@ package com.njcn.gather.detection.pojo.vo; +import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum; import lombok.Data; import org.apache.poi.ss.formula.functions.T; @@ -11,4 +12,20 @@ public class WebSocketVO { private String message; private T data; + + + public WebSocketVO(SourceResponseCodeEnum sourceResponseCodeEnum){ + this.code = sourceResponseCodeEnum.getCode(); + this.message= sourceResponseCodeEnum.getMessage(); + } + + public WebSocketVO(Integer code, String message, T data) { + this.code = code; + this.message = message; + this.data = data; + } + + public WebSocketVO() { + + } } diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index 28fddf1d..e5baf4ae 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -25,6 +25,7 @@ import com.njcn.gather.plan.service.IAdPlanService; import com.njcn.gather.plan.service.IAdPlanSourceService; import com.njcn.gather.system.dictionary.pojo.enums.DictDataEnum; import com.njcn.gather.system.dictionary.service.IDictDataService; +import io.netty.channel.Channel; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -123,18 +124,18 @@ public class PreDetectionServiceImpl implements PreDetectionService { @Override public boolean startTest(PreDetectionParam param) { -// Runnable runnable = new Runnable() { -// @Override -// public void run() { -// String ddId = param.getUserPageId(); -// Channel channel = SocketManager.getChannelByUserId(ddId); -// if( channel== null || !channel.isActive()){ -// NettyClient.socketClient(ip, port,param.getUserPageId(),new NettySourceClientHandler(ddId, sourceResponseService)); -// } -// SocketManager.sendMsg(ddId,"start\n"); -// } -// }; -// runnable.run(); + Runnable runnable = new Runnable() { + @Override + public void run() { + String ddId = param.getUserPageId()+"_Source"; + Channel channel = SocketManager.getChannelByUserId(ddId); + if( channel== null || !channel.isActive()){ + NettyClient.socketClient(ip, port,param.getUserPageId(),new NettySourceClientHandler(param, sourceResponseService)); + } + SocketManager.sendMsg(ddId,"start\n"); + } + }; + runnable.run(); return true; } } \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java index 1818942a..afaa9706 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java @@ -30,6 +30,7 @@ public class SocketManager { public static void sendMsg(String userId,String msg) { Channel channel = userSessions.get(userId); channel.writeAndFlush(msg); + System.out.println(userId+"__"+channel.id()+"往"+channel.remoteAddress()+"发送数据:"+msg); } } \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java index 535e0b52..f8098ec9 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/WebServiceManager.java @@ -1,5 +1,7 @@ package com.njcn.gather.detection.util.socket; +import com.alibaba.fastjson.JSON; +import com.njcn.gather.detection.pojo.vo.WebSocketVO; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; @@ -57,5 +59,16 @@ public class WebServiceManager { } } + + public static void sendMessage(String userId, WebSocketVO webSocketVO) { + Channel channel = userSessions.get(userId); + if(Objects.nonNull(channel) && channel.isActive()){ + TextWebSocketFrame wd = new TextWebSocketFrame(JSON.toJSONString(webSocketVO)); + channel.writeAndFlush(wd); + }else { + log.error("{}-websocket推送消息失败;当前用户-{}-客户端已经断开连接", LocalDateTime.now(),userId); + } + + } } \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java index ffa4e5ad..a10861c3 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java @@ -62,8 +62,8 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { */ @Override public void channelInactive(ChannelHandlerContext ctx) { - System.out.println("客户端连接成功"); - SocketManager.addUser(webUser,ctx.channel()); + System.out.println("客户端重新连接成功"); + //SocketManager.addUser(webUser,ctx.channel()); } /** @@ -97,7 +97,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { @Override public void handlerAdded(ChannelHandlerContext ctx) { - SocketManager.addUser(webUser,ctx.channel()); + SocketManager.addUser(webUser+"_dev",ctx.channel()); System.out.println("有通道接入" + ctx.channel()); }