From a81439b1d224a5f83bb7bdf168b02d5d7abb0f08 Mon Sep 17 00:00:00 2001 From: wr <1754607820@qq.com> Date: Fri, 20 Dec 2024 13:28:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BE=AE=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/SocketDevResponseService.java | 25 +++++++--- .../handler/SocketSourceResponseService.java | 50 +++++++++++-------- .../service/impl/PreDetectionServiceImpl.java | 8 +-- .../gather/detection/util/socket/MsgUtil.java | 21 ++++++++ .../detection/util/socket/SocketManager.java | 29 ++++++++--- .../util/socket/cilent/NettyClient.java | 22 ++++---- .../socket/cilent/NettyDevClientHandler.java | 7 --- .../cilent/NettySourceClientHandler.java | 25 +++++----- 8 files changed, 117 insertions(+), 70 deletions(-) diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java index e843a8d4..51270a97 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java @@ -21,7 +21,6 @@ import com.njcn.gather.device.device.service.IPqDevService; import com.njcn.gather.device.script.pojo.param.PqScriptIssueParam; import com.njcn.gather.device.script.pojo.po.SourceIssue; import com.njcn.gather.device.script.service.IPqScriptDtlsService; -import io.netty.channel.Channel; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; @@ -36,6 +35,8 @@ public class SocketDevResponseService { private final String handlerStr = "_Dev"; + private final String handlerSourceStr = "_Source"; + private final WebSocketHandler webSocketHandler; private final IPqDevService iPqDevService; private final IPqScriptDtlsService scriptDtlsService; @@ -221,7 +222,7 @@ public class SocketDevResponseService { successXieyi3.add(mId); System.out.println(successXieyi3.size() + "=====" + monitorIdListComm.size()); if (successXieyi3.size() == monitorIdListComm.size()) { - System.out.println("开始相序校验++++++++++"); + System.out.println("开始相序校验,向源下参数++++++++++"); PqScriptIssueParam issueParam = new PqScriptIssueParam(); issueParam.setPlanId(param.getPlanId()); issueParam.setSourceId(param.getSourceId()); @@ -233,7 +234,7 @@ public class SocketDevResponseService { xuMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue()); xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue()); xuMsg.setData(JSON.toJSONString(sourceIssues.get(0))); - SocketManager.sendMsg(param.getUserPageId() + "_Source", JSON.toJSONString(xuMsg)); + SocketManager.sendMsg(param.getUserPageId() + "_Source", MsgUtil.toJsonWithNewLinePlain(xuMsg)); } } } @@ -288,18 +289,21 @@ public class SocketDevResponseService { info.addAll(devIsSource(dev, sourceIssues.get(0))); } } - String s = param.getUserPageId() + "_Source"; + String userSource = param.getUserPageId() + handlerSourceStr; SocketMsg msg = new SocketMsg(); msg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue()); msg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue()); Map map = new HashMap<>(1); map.put("sourceId", sourceIssues.get(0).getSourceId()); msg.setData(JSON.toJSONString(map)); - SocketManager.sendMsg(s, JSON.toJSONString(msg)); + SocketManager.sendMsg(userSource, MsgUtil.toJsonWithNewLinePlain(msg)); + SocketManager.removeUser(userSource); + SocketManager.removeUser(param.getUserPageId() + handlerStr); + //向前端推送消息 socketMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue()); socketMsg.setOperateCode(socketDataMsg.getOperateCode()); - socketMsg.setData(JSON.toJSONString(info)); + socketMsg.setData(info); webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg)); } break; @@ -324,8 +328,13 @@ public class SocketDevResponseService { private List devIsSource(DevData dev, SourceIssue issue) { + List info = new ArrayList<>(); String[] split = dev.getId().split("_"); + String devName = null; + if (devNameMapComm.containsKey(split[0])) { + devName = devNameMapComm.get(split[0]); + } List channelList = issue.getChannelList(); List sqlData = dev.getSqlData(); @@ -333,10 +342,12 @@ public class SocketDevResponseService { List dataI = sqlData.stream().filter(x -> "电流有效值".equals(x.getDesc())).collect(Collectors.toList()); if (CollUtil.isNotEmpty(dataV)) { SourceCompareDev compareDev = getSourceCompareDev(split, dataV, "电压有效值", "U", channelList); + compareDev.setDevName(devName); info.add(compareDev); } if (CollUtil.isNotEmpty(dataI)) { SourceCompareDev compareDev = getSourceCompareDev(split, dataI, "电流有效值", "I", channelList); + compareDev.setDevName(devName); info.add(compareDev); } @@ -360,6 +371,7 @@ public class SocketDevResponseService { List channelList ) { SourceCompareDev compareDev = new SourceCompareDev(); + compareDev.setIp(split[0]); compareDev.setLineNum(split[1]); compareDev.setDesc(name); @@ -413,6 +425,7 @@ public class SocketDevResponseService { this.successComm = new ArrayList<>(); this.successXieyi = new ArrayList<>(); this.successXieyi3 = new ArrayList<>(); + //初始化相序集合 this.devInfo = new ArrayList<>(); this.success = new ArrayList<>(); diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java index fd56def4..11750267 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java @@ -15,7 +15,6 @@ import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler; import com.njcn.gather.detection.util.socket.web.WebSocketHandler; import com.njcn.gather.device.device.pojo.vo.PreDetection; import com.njcn.gather.device.device.service.IPqDevService; -import io.netty.channel.Channel; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -37,17 +36,20 @@ public class SocketSourceResponseService { private final SocketDevResponseService socketDevResponseService; private final IPqDevService iPqDevService; + private final String DEV = "_Dev"; + @Value("${socket.device.ip}") private String ip; @Value("${socket.device.port}") private Integer port; - public void deal(PreDetectionParam param, String msg){ + + public void deal(PreDetectionParam param, String msg) { SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg); SourceOperateCodeEnum enumByCode = SourceOperateCodeEnum.getDictDataEnumByCode(socketDataMsg.getRequestId()); - if(ObjectUtil.isNotNull(enumByCode)){ - switch (enumByCode){ + if (ObjectUtil.isNotNull(enumByCode)) { + switch (enumByCode) { case YJC_YTXJY: detectionDev(param, socketDataMsg); break; @@ -56,7 +58,7 @@ public class SocketSourceResponseService { break; } - }else{ + } else { System.out.println("1"); } @@ -65,20 +67,24 @@ public class SocketSourceResponseService { /** * 装置检测(当源初始化成功后,直接向装置通道向装置服务器发送,装置检测) + * * @param param * @param socketDataMsg */ private void detectionDev(PreDetectionParam param, SocketDataMsg socketDataMsg) { SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); - if(ObjectUtil.isNotNull(dictDataEnumByCode)){ - SocketMsg socketMsg=new SocketMsg(); - switch (dictDataEnumByCode){ + if (ObjectUtil.isNotNull(dictDataEnumByCode)) { + SocketMsg socketMsg; + switch (dictDataEnumByCode) { case SUCCESS: - //todo 前端推送收到的消息暂未处理好 - webSocketHandler.sendMsgToUser(param.getUserPageId(), "msg"); - String s = param.getUserPageId() + "_Dev"; - //开始设备通讯检测(发送设备初始化) + socketMsg = new SocketMsg(); + socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue()); + socketMsg.setOperateCode(socketDataMsg.getOperateCode()); + socketMsg.setData("源通讯校验成功"); + webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg)); + SocketManager.removeUser(param.getUserPageId()+"_Source"); + //开始设备通讯检测(发送设备初始化) List devList = iPqDevService.getDevInfo(param.getDevIds()); Map> map = new HashMap(1); map.put("deviceList", devList); @@ -87,12 +93,12 @@ public class SocketSourceResponseService { socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_01.getValue()); socketMsg.setData(jsonString); String json = JSON.toJSONString(socketMsg); - NettyClient.socketClient(ip, port, param.getUserPageId(),json, new NettyDevClientHandler(param, socketDevResponseService)); - // SocketManager.sendMsg(s,json); + NettyClient.socketClient(ip, port, param.getUserPageId(), json, new NettyDevClientHandler(param, socketDevResponseService)); break; case UNPROCESSED_BUSINESS: break; default: + socketMsg = new SocketMsg(); socketMsg.setRequestId(socketDataMsg.getRequestId()); socketMsg.setOperateCode(socketDataMsg.getOperateCode()); socketMsg.setData(dictDataEnumByCode.getMessage()); @@ -104,14 +110,15 @@ public class SocketSourceResponseService { /** * 相序检测向装置发送(当装置初始成功后,会向源发送加量请求。收到加量请求成功后会向装置发送参数下发。) + * * @param param * @param socketDataMsg */ private void phaseSequenceDev(PreDetectionParam param, SocketDataMsg socketDataMsg) { SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); - if(ObjectUtil.isNotNull(dictDataEnumByCode)){ - SocketMsg socketMsg=new SocketMsg(); - switch (dictDataEnumByCode){ + if (ObjectUtil.isNotNull(dictDataEnumByCode)) { + SocketMsg socketMsg = new SocketMsg(); + switch (dictDataEnumByCode) { case SUCCESS: //向前端推送信息 socketMsg.setRequestId(socketDataMsg.getRequestId()); @@ -119,21 +126,20 @@ public class SocketSourceResponseService { socketMsg.setData(dictDataEnumByCode.getMessage()); webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg)); - - String s = param.getUserPageId() + "_Dev"; + String s = param.getUserPageId() + DEV; socketMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_DATA_REQUEST_02.getValue()); List pqDevList = iPqDevService.getDevInfo(param.getDevIds()); List moniterIdList = pqDevList.stream().flatMap(x -> x.getMonitorList().stream()) .map(PreDetection.MonitorListDTO::getLineId) .collect(Collectors.toList()); - DevPhaseSequenceParam phaseSequenceParam=new DevPhaseSequenceParam(); + DevPhaseSequenceParam phaseSequenceParam = new DevPhaseSequenceParam(); phaseSequenceParam.setMoniterIdList(moniterIdList); - phaseSequenceParam.setDataType(Arrays.asList("实时数据/电压有效值","实时数据/电流有效值")); + phaseSequenceParam.setDataType(Arrays.asList("实时数据/电压有效值", "实时数据/电流有效值")); phaseSequenceParam.setReadCount(1); phaseSequenceParam.setIgnoreCount(10); socketMsg.setData(JSON.toJSONString(phaseSequenceParam)); - SocketManager.sendMsg(s,JSON.toJSONString(socketMsg)); + SocketManager.sendMsg(s, JSON.toJSONString(socketMsg)); break; case UNPROCESSED_BUSINESS: break; diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index a33f17d4..06c68fc8 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -1,11 +1,9 @@ package com.njcn.gather.detection.service.impl; -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.gather.detection.handler.SocketDevResponseService; import com.njcn.gather.detection.handler.SocketSourceResponseService; @@ -15,9 +13,9 @@ import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.pojo.vo.SocketMsg; import com.njcn.gather.detection.service.PreDetectionService; +import com.njcn.gather.detection.util.socket.MsgUtil; import com.njcn.gather.detection.util.socket.SocketManager; import com.njcn.gather.detection.util.socket.cilent.NettyClient; -import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler; import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler; import com.njcn.gather.device.device.service.IPqDevService; import com.njcn.gather.device.script.service.IPqScriptDtlsService; @@ -34,8 +32,6 @@ import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import java.util.HashMap; -import java.util.Map; @Service @@ -102,7 +98,7 @@ public class PreDetectionServiceImpl implements PreDetectionService { msg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue()); msg.setData(JSON.toJSONString(sourceParam)); param.setSourceId(sourceParam.getSourceId()); - NettyClient.socketClient(ip, port, param.getUserPageId(), JSON.toJSONString(msg), new NettySourceClientHandler(param, sourceResponseService)); + NettyClient.socketClient(ip, port, param.getUserPageId(), MsgUtil.toJsonWithNewLinePlain(msg), new NettySourceClientHandler(param, sourceResponseService)); } else { throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT); } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java index d013a75e..8f6c15ff 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java @@ -1,6 +1,7 @@ package com.njcn.gather.detection.util.socket; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; import com.njcn.gather.detection.pojo.vo.SocketDataMsg; /** @@ -15,4 +16,24 @@ public class MsgUtil { public static SocketDataMsg socketDataMsg(String textMsg){ return JSON.parseObject(textMsg,SocketDataMsg.class); } + + /** + * 将对象转换为 JSON 字符串,并在末尾添加换行符 + * + * @param obj 需要转换的对象 + * @return 包含换行符的 JSON 字符串 + */ + public static String toJsonWithNewLine(Object obj) { + return JSON.toJSONString(obj, SerializerFeature.PrettyFormat) + "\n"; + } + + /** + * 将对象转换为 JSON 字符串,并在末尾添加换行符(不带格式化) + * + * @param obj 需要转换的对象 + * @return 包含换行符的 JSON 字符串 + */ + public static String toJsonWithNewLinePlain(Object obj) { + return JSON.toJSONString(obj) + "\n"; + } } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java index 0c63a968..cd015ead 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java @@ -2,7 +2,7 @@ package com.njcn.gather.detection.util.socket; import cn.hutool.core.util.ObjectUtil; import io.netty.channel.Channel; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.channel.nio.NioEventLoopGroup; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -14,22 +14,39 @@ import java.util.concurrent.ConcurrentHashMap; */ public class SocketManager { - private static final Map userSessions = new ConcurrentHashMap<>(); + private static final Map socketSessions = new ConcurrentHashMap<>(); + private static final Map socketGroup = new ConcurrentHashMap<>(); public static void addUser(String userId, Channel channel) { - userSessions.put(userId, channel); + socketSessions.put(userId, channel); } + public static void addGroup(String userId, NioEventLoopGroup group) { + socketGroup.put(userId, group); + } public static void removeUser(String userId) { - userSessions.remove(userId); + Channel channel = socketSessions.get(userId); + if(ObjectUtil.isNotNull(channel)){ + channel.close(); + NioEventLoopGroup eventExecutors = socketGroup.get(userId); + if(ObjectUtil.isNotNull(channel)){ + eventExecutors.shutdownGracefully(); + System.out.println(userId+"__"+channel.id()+"关闭了客户端"); + } + } + socketSessions.remove(userId); } public static Channel getChannelByUserId(String userId) { - return userSessions.get(userId); + return socketSessions.get(userId); + } + + public static NioEventLoopGroup getGroupByUserId(String userId) { + return socketGroup.get(userId); } public static void sendMsg(String userId,String msg) { - Channel channel = userSessions.get(userId); + Channel channel = socketSessions.get(userId); if(ObjectUtil.isNotNull(channel)){ channel.writeAndFlush(msg); System.out.println(userId+"__"+channel.id()+"往"+channel.remoteAddress()+"发送数据:"+msg); diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java index af204067..576c0b03 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java @@ -1,9 +1,8 @@ package com.njcn.gather.detection.util.socket.cilent; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; -import com.njcn.common.pojo.enums.response.CommonResponseEnum; -import com.njcn.common.pojo.exception.BusinessException; import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum; import com.njcn.gather.detection.pojo.vo.WebSocketVO; import com.njcn.gather.detection.util.socket.SocketManager; @@ -13,21 +12,12 @@ import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; -import io.netty.handler.timeout.TimeoutException; import io.netty.util.CharsetUtil; -import lombok.Data; import lombok.Getter; -import java.io.IOException; -import java.net.ConnectException; -import java.net.ProtocolException; -import java.net.SocketTimeoutException; import java.util.concurrent.TimeUnit; /** @@ -79,6 +69,16 @@ public class NettyClient { channelFuture.channel().writeAndFlush(msg); } }); + NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(userPageId + "_Dev"); + if(ObjectUtil.isNotNull(groupByUserId)){ + groupByUserId.shutdownGracefully(); + }else{ + if (handler instanceof NettySourceClientHandler) { + SocketManager.addGroup(userPageId+"_Source",group); + }else{ + SocketManager.addGroup(userPageId+"_Dev",group); + } + } } catch (Exception e) { System.out.println("连接socker服务端发送异常............" + e.getMessage()); group.shutdownGracefully(); diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java index 8d6c996c..2349cc09 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java @@ -8,7 +8,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.TimeoutException; import lombok.RequiredArgsConstructor; -import org.springframework.beans.factory.annotation.Value; import java.io.IOException; import java.net.ConnectException; @@ -29,12 +28,6 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { private final SocketDevResponseService socketResponseService; - @Value("${socket.device.ip}") - private String devIp; - - @Value("${socket.device.port}") - private Integer devPort; - /** * 当通道进行连接时推送消息 diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java index 8821d2ff..dd6cae8e 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java @@ -1,10 +1,10 @@ package com.njcn.gather.detection.util.socket.cilent; -import com.alibaba.fastjson.JSON; import com.njcn.gather.detection.handler.SocketSourceResponseService; import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.pojo.vo.SocketMsg; +import com.njcn.gather.detection.util.socket.MsgUtil; import com.njcn.gather.detection.util.socket.SocketManager; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -27,24 +27,24 @@ import java.util.Objects; */ @RequiredArgsConstructor -public class NettySourceClientHandler extends SimpleChannelInboundHandler { +public class NettySourceClientHandler extends SimpleChannelInboundHandler { private final PreDetectionParam webUser; private final SocketSourceResponseService sourceResponseService; /** * 当通道进行连接时推送消息 + * * @param ctx */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端通道已建立" + ctx.channel().id()); - Channel channel = SocketManager.getChannelByUserId(webUser.getUserPageId()+"_Source"); - if(Objects.nonNull(channel)){ + Channel channel = SocketManager.getChannelByUserId(webUser.getUserPageId() + "_Source"); + if (Objects.nonNull(channel)) { channel.close(); } - SocketManager.addUser(webUser.getUserPageId()+"_Source",ctx.channel()); - System.out.println("一存储"); + SocketManager.addUser(webUser.getUserPageId() + "_Source", ctx.channel()); } /** @@ -52,14 +52,15 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler>>>>>"+msg); - sourceResponseService.deal(webUser,msg); + System.out.println("接收server端数据>>>>>>" + msg); + sourceResponseService.deal(webUser, msg); } /** * 当通道断线时,支持重连 + * * @param ctx */ @Override @@ -84,14 +85,14 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler