diff --git a/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java b/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java index 7bdcc3d1..dc001dc6 100644 --- a/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java +++ b/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java @@ -27,30 +27,44 @@ public class PreDetectionController extends BaseController { /** - * 开始检测 + * 终止预检测 */ - @PostMapping("/startTest") + @PostMapping("/closePreTest") @OperateInfo - @ApiOperation("开始检测") + @ApiOperation("终止预检测") @ApiImplicitParam(name = "param", value = "查询参数", required = true) - public HttpResult startTest(@RequestBody PreDetectionParam param){ - String methodDescribe = getMethodDescribe("startTest"); - preDetectionService.startTest(param); - + public HttpResult closePreTest(@RequestBody PreDetectionParam param){ + String methodDescribe = getMethodDescribe("closePreTest"); + preDetectionService.closePreTest(param); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } /** - * 开始检测 + * 开始预检测 */ - @PostMapping("/startTest2") + @PostMapping("/startPreTest") @OperateInfo - @ApiOperation("开始检测") + @ApiOperation("开始预检测") @ApiImplicitParam(name = "param", value = "查询参数", required = true) - public HttpResult startTest2(@RequestBody PreDetectionParam param){ - String methodDescribe = getMethodDescribe("startTest"); + public HttpResult startPreTest(@RequestBody PreDetectionParam param){ + String methodDescribe = getMethodDescribe("startPreTest"); preDetectionService.sourceCommunicationCheck(param); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + + + + /** + * 测试 + */ + @PostMapping("/startTest") + @OperateInfo + @ApiOperation("测试") + @ApiImplicitParam(name = "param", value = "查询参数", required = true) + public HttpResult startTest(@RequestBody PreDetectionParam param){ + String methodDescribe = getMethodDescribe("startTest"); + preDetectionService.startTest(param); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } } diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java index 7c606026..8aa4086d 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java @@ -86,44 +86,15 @@ public class SocketDevResponseService { case YJC_SBTXJY: devComm(socketDataMsg, param, msg); break; - case QUITE: - SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); - SourceOperateCodeEnum operateCodeEnum = SourceOperateCodeEnum.getDictDataEnumByCode(socketDataMsg.getOperateCode()); - SocketMsg socketMsg = new SocketMsg(); - switch (dictDataEnumByCode) { - case SUCCESS: - //通讯校验成功 - switch (operateCodeEnum) { - case QUIT_INIT_01: - break; - case QUIT_INIT_02: - socketMsg.setRequestId("quit"); - socketMsg.setOperateCode("QUIT_FUNEND$01"); - SocketManager.sendMsg(userKey, JSON.toJSONString(socketMsg)); - break; - case QUIT_INIT_03: - socketMsg.setRequestId("quit"); - socketMsg.setOperateCode("QUIT_FUNEND$02"); - SocketManager.sendMsg(userKey, JSON.toJSONString(socketMsg)); - break; - } - break; - case UNPROCESSED_BUSINESS: - break; - case NO_INIT_DEV: - break; - - default: - WebSocketVO webSocketVO = new WebSocketVO(); - break; - } - break; case YJC_XYJY: devXieyi(socketDataMsg, param, msg); break; case YJC_XUJY: devXu(param, socketDataMsg); break; + case QUITE: + quitDeal(socketDataMsg, param, msg); + break; } @@ -185,6 +156,11 @@ public class SocketDevResponseService { break; case RE_OPERATE: + //出现已经初始化情况,发送用户用户确认是否继续检测 + webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); + quitSend(param, socketMsg); + break; + case NO_INIT_DEV: //发起关闭操作 quitSend(param, socketMsg); break; @@ -209,7 +185,23 @@ public class SocketDevResponseService { webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg,devNameMapComm,2)); if (socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.DEV_INIT_GATHER_02.getValue())) { successXieyi.add(socketDataMsg.getData()); + + + if (successXieyi.size() == monitorIdListComm.size()) { + + for(String string :successXieyi){ + + //模拟统计协议校验,通讯校验已经校验过,模拟直接推送 + + SocketDataMsg webSend = new SocketDataMsg(); + webSend.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_01.getValue()); + webSend.setCode(SourceResponseCodeEnum.SUCCESS.getCode()); + webSend.setRequestId(SourceOperateCodeEnum.YJC_XYJY.getValue()); + webSend.setData(string); + webSocketHandler.sendMsgToUser(param.getUserPageId(),MsgUtil.msgToWebData(webSend,devNameMapComm,1)); + } + //协议3校验 successXieyi = new ArrayList<>(); @@ -230,6 +222,12 @@ public class SocketDevResponseService { successXieyi3.add(socketDataMsg.getData()); System.out.println(successXieyi3.size() + "=====" + monitorIdListComm.size()); if (successXieyi3.size() == monitorIdListComm.size()) { + + + + + + SocketDataMsg temMsg = new SocketDataMsg(); temMsg.setCode(SourceResponseCodeEnum.DEV_COMM_ALL_SUCCESS.getCode()); temMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_03.getValue()); @@ -250,7 +248,7 @@ public class SocketDevResponseService { xuMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue()); xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue()); xuMsg.setData(JSON.toJSONString(sourceIssues.get(0))); - SocketManager.sendMsg(param.getUserPageId() + "_Source", MsgUtil.toJsonWithNewLinePlain(xuMsg)); + SocketManager.sendMsg(param.getUserPageId() +handlerSourceStr, MsgUtil.toJsonWithNewLinePlain(xuMsg)); } } } @@ -258,11 +256,12 @@ public class SocketDevResponseService { break; case UNPROCESSED_BUSINESS: - webSocketHandler.sendMsgToUser(param.getUserPageId(),msg); + webSocketHandler.sendMsgToUser(param.getUserPageId(),JSON.toJSONString(socketDataMsg)); break; case RE_OPERATE: - //发起关闭操作 + //出现已经初始化情况,发送用户用户确认是否继续检测 + webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); quitSend(param, socketMsg); break; default: @@ -273,15 +272,6 @@ public class SocketDevResponseService { } } - /** - * 退出检测 - */ - private void quitSend(PreDetectionParam param, SocketMsg socketMsg) { - socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue()); - socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue()); - SocketManager.sendMsg(param.getUserPageId() + handlerStr, JSON.toJSONString(socketMsg)); - - } public void devXu(PreDetectionParam param, SocketDataMsg socketDataMsg) { @@ -292,6 +282,7 @@ public class SocketDevResponseService { SocketMsg socketMsg = new SocketMsg(); switch (dictDataEnumByCode) { case SUCCESS: + //webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(MsgUtil.msgToWebData(socketDataMsg,devNameMapComm,1))); devInfo.add(devData); success.add(devData.getId()); if (success.size() == monitorIdListComm.size()) { @@ -307,6 +298,19 @@ public class SocketDevResponseService { info.addAll(devIsSource(dev, sourceIssues.get(0))); } } + if(CollUtil.isNotEmpty(info)){ + SocketDataMsg dataMsg = new SocketDataMsg(); + dataMsg.setOperateCode(SourceOperateCodeEnum.DEV_DATA_REQUEST_02.getValue()); + dataMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue()); + dataMsg.setCode(SourceResponseCodeEnum.SUCCESS.getCode()); + + + for(SourceCompareDev sourceCompareDev : info){ + dataMsg.setData(sourceCompareDev.getDevName()+"_"+sourceCompareDev.getLineNum()+sourceCompareDev.getDesc()+"校验结果:"+(sourceCompareDev.getIsQualified()?"合格":"不合格")); + webSocketHandler.sendMsgToUser(param.getUserPageId(),JSON.toJSONString(dataMsg) ); + } + } + String userSource = param.getUserPageId() + handlerSourceStr; SocketMsg msg = new SocketMsg(); msg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue()); @@ -319,10 +323,13 @@ public class SocketDevResponseService { SocketManager.removeUser(param.getUserPageId() + handlerStr); //向前端推送消息 - socketMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue()); - socketMsg.setOperateCode(socketDataMsg.getOperateCode()); - socketMsg.setData(info); - webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg)); + // webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); + + SocketDataMsg temMsg = new SocketDataMsg(); + temMsg.setCode(SourceResponseCodeEnum.DEV_COMM_ALL_SUCCESS.getCode()); + temMsg.setOperateCode(SourceOperateCodeEnum.DEV_DATA_REQUEST_02.getValue()); + temMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue()); + webSocketHandler.sendMsgToUser(param.getUserPageId(),JSON.toJSONString(temMsg) ); } break; case UNPROCESSED_BUSINESS: @@ -345,6 +352,74 @@ public class SocketDevResponseService { } + + /** + * 退出检测 + */ + private void quitDeal(SocketDataMsg socketDataMsg,PreDetectionParam param, String msg) { + SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); + SourceOperateCodeEnum operateCodeEnum = SourceOperateCodeEnum.getDictDataEnumByCode(socketDataMsg.getOperateCode()); + SocketMsg socketMsg = new SocketMsg(); + String s = param.getUserPageId() + handlerStr; + + switch (dictDataEnumByCode) { + case SUCCESS: + //通讯校验成功 + switch (operateCodeEnum) { + case QUIT_INIT_01: + //关闭所有 + SocketManager.removeUser(s); + break; + case QUIT_INIT_02: + socketMsg.setRequestId("quit"); + socketMsg.setOperateCode("QUIT_FUNEND$01"); + SocketManager.sendMsg(s, JSON.toJSONString(socketMsg)); + break; + case QUIT_INIT_03: + socketMsg.setRequestId("quit"); + socketMsg.setOperateCode("QUIT_FUNEND$02"); + SocketManager.sendMsg(s, JSON.toJSONString(socketMsg)); + break; + } + break; + case UNPROCESSED_BUSINESS: + break; + case NO_INIT_DEV: + switch (operateCodeEnum) { + case QUIT_INIT_01: + break; + case QUIT_INIT_02: + socketMsg.setRequestId("quit"); + socketMsg.setOperateCode("QUIT_FUNEND$01"); + SocketManager.sendMsg(s, JSON.toJSONString(socketMsg)); + break; + case QUIT_INIT_03: + socketMsg.setRequestId("quit"); + socketMsg.setOperateCode("QUIT_FUNEND$02"); + SocketManager.sendMsg(s, JSON.toJSONString(socketMsg)); + break; + } + break; + + default: + WebSocketVO webSocketVO = new WebSocketVO(); + break; + } + + } + + /** + * 退出检测 + */ + private void quitSend(PreDetectionParam param, SocketMsg socketMsg) { + socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue()); + socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue()); + SocketManager.sendMsg(param.getUserPageId() + handlerStr, JSON.toJSONString(socketMsg)); + + } + + + private List devIsSource(DevData dev, SourceIssue issue) { List info = new ArrayList<>(); diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java index d3b3bfec..2c041306 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketSourceResponseService.java @@ -37,6 +37,7 @@ public class SocketSourceResponseService { private final IPqDevService iPqDevService; private final String DEV = "_Dev"; + private final String source = "_Source"; @Value("${socket.device.ip}") @@ -56,6 +57,10 @@ public class SocketSourceResponseService { case YJC_XUJY: phaseSequenceDev(param, socketDataMsg); break; + case QUITE_SOURCE: + // System.out.println("关闭源回调:"+msg); + SocketManager.removeUser(param.getUserPageId()+source); + break; } } else { @@ -74,15 +79,10 @@ public class SocketSourceResponseService { private void detectionDev(PreDetectionParam param, SocketDataMsg socketDataMsg) { SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode()); if (ObjectUtil.isNotNull(dictDataEnumByCode)) { - SocketMsg socketMsg; + SocketMsg socketMsg = new SocketMsg(); + switch (dictDataEnumByCode) { case SUCCESS: - socketMsg = new SocketMsg(); - socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue()); - socketMsg.setOperateCode(socketDataMsg.getOperateCode()); - socketMsg.setData("源通讯校验成功"); - webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg)); - SocketManager.removeUser(param.getUserPageId()+"_Source"); //todo 前端推送收到的消息暂未处理好 webSocketHandler.sendMsgToUser(param.getUserPageId(),JSON.toJSONString(socketDataMsg) ); @@ -96,7 +96,6 @@ public class SocketSourceResponseService { socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_INIT_GATHER_01.getValue()); socketMsg.setData(jsonString); String json = JSON.toJSONString(socketMsg); - NettyClient.socketClient(ip, port, s,json, new NettyDevClientHandler(param, socketDevResponseService)); // SocketManager.sendMsg(s,json); NettyClient.socketClient(ip, port, param.getUserPageId(), json, new NettyDevClientHandler(param, socketDevResponseService)); break; @@ -127,10 +126,7 @@ public class SocketSourceResponseService { switch (dictDataEnumByCode) { case SUCCESS: //向前端推送信息 - socketMsg.setRequestId(socketDataMsg.getRequestId()); - socketMsg.setOperateCode(socketDataMsg.getOperateCode()); - socketMsg.setData(dictDataEnumByCode.getMessage()); - webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg)); + webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); String s = param.getUserPageId() + DEV; socketMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue()); @@ -148,6 +144,7 @@ public class SocketSourceResponseService { SocketManager.sendMsg(s, JSON.toJSONString(socketMsg)); break; case UNPROCESSED_BUSINESS: + webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg)); break; default: socketMsg.setRequestId(socketDataMsg.getRequestId()); diff --git a/detection/src/main/java/com/njcn/gather/detection/pojo/enums/SourceOperateCodeEnum.java b/detection/src/main/java/com/njcn/gather/detection/pojo/enums/SourceOperateCodeEnum.java index bdfc3715..4c0dcb9d 100644 --- a/detection/src/main/java/com/njcn/gather/detection/pojo/enums/SourceOperateCodeEnum.java +++ b/detection/src/main/java/com/njcn/gather/detection/pojo/enums/SourceOperateCodeEnum.java @@ -43,7 +43,8 @@ public enum SourceOperateCodeEnum { YJC_XUJY("YJC_xujy", "预检测_相序校验"), - QUITE("quit","预监测_关闭设备通讯初始化") + QUITE("quit","预监测_关闭设备通讯初始化"), + QUITE_SOURCE("close_source","预监测_关闭源通讯") ; diff --git a/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java b/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java index 81e011d0..1798775f 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/PreDetectionService.java @@ -23,5 +23,9 @@ public interface PreDetectionService { boolean startTest(PreDetectionParam param); + boolean closePreTest(PreDetectionParam param); + + + } diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index 842b590e..b5c1300f 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -10,6 +10,7 @@ import com.njcn.gather.detection.handler.SocketSourceResponseService; import com.njcn.gather.detection.pojo.enums.DetectionResponseEnum; import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; import com.njcn.gather.detection.pojo.param.PreDetectionParam; +import com.njcn.gather.detection.pojo.vo.SocketDataMsg; import com.njcn.gather.detection.pojo.vo.SocketMsg; import com.njcn.gather.detection.service.PreDetectionService; @@ -28,17 +29,20 @@ import com.njcn.gather.plan.service.IAdPlanSourceService; import com.njcn.gather.system.dictionary.pojo.enums.DictDataEnum; import com.njcn.gather.system.dictionary.service.IDictDataService; import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import java.util.Objects; @Service @RequiredArgsConstructor public class PreDetectionServiceImpl implements PreDetectionService { - private final String source = "source"; + private final String source = "_Source"; + private final String dev = "_Dev"; private final IPqDevService iPqDevService; private final IDictDataService dictDataService; @@ -50,10 +54,10 @@ public class PreDetectionServiceImpl implements PreDetectionService { private final SocketDevResponseService socketDevResponseService; - @Value("${socket.source.ip}") + @Value("${socket.source.ip:192.168.1.136}") private String ip; - @Value("${socket.source.port}") + @Value("${socket.source.port:10086}") private Integer port; private final SocketSourceResponseService sourceResponseService; @@ -61,6 +65,40 @@ public class PreDetectionServiceImpl implements PreDetectionService { @Override public void sourceCommunicationCheck(PreDetectionParam param) { + + + Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + source); + if (Objects.nonNull(channel) && channel.isActive()) { + System.out.println("进入关闭源。。//////"); + SocketDataMsg socketDataMsg = new SocketDataMsg(); + socketDataMsg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue()); + socketDataMsg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue()); + + try { + channel.writeAndFlush(JSON.toJSONString(socketDataMsg)).sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + SocketManager.removeUser(param.getUserPageId() + source); + SocketManager.removeUser(param.getUserPageId() + dev); + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + + + + /* 先组装源通讯协议 查询计划什么模式的(除了对比式,其他都是一个计划对应一个源) @@ -112,7 +150,23 @@ public class PreDetectionServiceImpl implements PreDetectionService { public boolean startTest(PreDetectionParam param) { socketDevResponseService.initList(param); - NettyClient.socketClient(ip, port, param.getUserPageId(),"start\n", new NettySourceClientHandler(param, sourceResponseService)); + NettyClient.socketClient(ip, port, param.getUserPageId(), "start\n", new NettySourceClientHandler(param, sourceResponseService)); + + return true; + } + + @Override + public boolean closePreTest(PreDetectionParam param) { + + SocketMsg socketMsg = new SocketMsg(); + socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue()); + socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue()); + SocketManager.sendMsg(param.getUserPageId() + dev, JSON.toJSONString(socketMsg)); + + + socketMsg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue()); + socketMsg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue()); + SocketManager.sendMsg(param.getUserPageId() + source, JSON.toJSONString(socketMsg)); return true; } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java index 1d0b024c..5efdfdaf 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/MsgUtil.java @@ -41,6 +41,13 @@ public class MsgUtil { } + /** + * + * @param socketDataMsg + * @param devMap + * @param type 0.装置 1.监测点 + * @return + */ public static String msgToWebData(SocketDataMsg socketDataMsg, Map devMap,Integer type){ String data = socketDataMsg.getData(); if (StrUtil.isNotBlank(data)) { diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java index cd015ead..918ca654 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java @@ -24,10 +24,15 @@ public class SocketManager { public static void addGroup(String userId, NioEventLoopGroup group) { socketGroup.put(userId, group); } + public static void removeUser(String userId) { Channel channel = socketSessions.get(userId); if(ObjectUtil.isNotNull(channel)){ - channel.close(); + try { + channel.close().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } NioEventLoopGroup eventExecutors = socketGroup.get(userId); if(ObjectUtil.isNotNull(channel)){ eventExecutors.shutdownGracefully(); @@ -48,7 +53,7 @@ public class SocketManager { public static void sendMsg(String userId,String msg) { Channel channel = socketSessions.get(userId); if(ObjectUtil.isNotNull(channel)){ - channel.writeAndFlush(msg); + channel.writeAndFlush(msg+'\n'); System.out.println(userId+"__"+channel.id()+"往"+channel.remoteAddress()+"发送数据:"+msg); }else{ System.out.println(userId+"__发送数据:失败通道不存在"+msg); diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java index 2c24c0fe..f2f951b1 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java @@ -4,6 +4,7 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum; +import com.njcn.gather.detection.pojo.vo.SocketDataMsg; import com.njcn.gather.detection.pojo.vo.WebSocketVO; import com.njcn.gather.detection.util.socket.SocketManager; import com.njcn.gather.detection.util.socket.WebServiceManager; @@ -34,6 +35,7 @@ public class NettyClient { Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(group) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override @@ -52,7 +54,7 @@ public class NettyClient { //空闲状态的handler // 添加LineBasedFrameDecoder来按行分割数据 .addLast(new LineBasedFrameDecoder(10240)) - // .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS)) + .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(handler); @@ -66,6 +68,7 @@ public class NettyClient { System.out.println("链接服务端失败..."); } else { System.out.println("链接服务端成功..."); + System.out.println("客户端向服务端发送消息:"+msg); channelFuture.channel().writeAndFlush(msg); } }); @@ -83,11 +86,17 @@ public class NettyClient { System.out.println("连接socket服务端发送异常............" + e.getMessage()); group.shutdownGracefully(); //TODO 通知页面 - WebSocketVO webSocketVO = new WebSocketVO(); - webSocketVO.setType("aaa"); - webSocketVO.setCode(SourceResponseCodeEnum.SOCKET_ERROR.getCode()); - webSocketVO.setMessage(SourceResponseCodeEnum.SOCKET_ERROR.getMessage()); - WebServiceManager.sendMsg(userPageId, JSON.toJSONString(webSocketVO)); + SocketDataMsg socketDataMsg = new SocketDataMsg(); + socketDataMsg.setType("aaa"); + socketDataMsg.setCode(SourceResponseCodeEnum.SOCKET_ERROR.getCode()); + socketDataMsg.setData(SourceResponseCodeEnum.SOCKET_ERROR.getMessage()); + socketDataMsg.setRequestId("connect"); + if (handler instanceof NettySourceClientHandler) { + socketDataMsg.setOperateCode("Source"); + }else{ + socketDataMsg.setOperateCode("Dev"); + } + WebServiceManager.sendMsg(userPageId, JSON.toJSONString(socketDataMsg)); } finally { // System.out.println("进入clientSocket最后步骤---------------------"); } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java index 2349cc09..e2249f13 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java @@ -38,7 +38,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { System.out.println("客户端通道已建立" + ctx.channel().id()); Channel channel = SocketManager.getChannelByUserId(param.getUserPageId()+"_Dev"); if(Objects.nonNull(channel)){ - channel.close(); + channel.close().sync(); } SocketManager.addUser(param.getUserPageId()+"_Dev",ctx.channel()); @@ -79,7 +79,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { @Override public void handlerAdded(ChannelHandlerContext ctx) { - System.out.println("有通道接入" + ctx.channel()); + System.out.println("有通道准备接入" + ctx.channel()); } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java index dd6cae8e..ac29ccdc 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettySourceClientHandler.java @@ -3,6 +3,7 @@ package com.njcn.gather.detection.util.socket.cilent; import com.njcn.gather.detection.handler.SocketSourceResponseService; import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; import com.njcn.gather.detection.pojo.param.PreDetectionParam; +import com.njcn.gather.detection.pojo.vo.SocketDataMsg; import com.njcn.gather.detection.pojo.vo.SocketMsg; import com.njcn.gather.detection.util.socket.MsgUtil; import com.njcn.gather.detection.util.socket.SocketManager; @@ -40,10 +41,7 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler { channel.writeAndFlush("socket指令结果:成功指令"); }*/ if(!msg.contains("HeartBeat")){ - JSONObject jsonObject1 = new JSONObject(); - jsonObject1.put("requestId","yjc_ytxjy"); - jsonObject1.put("operateCode","INIT_GATHER"); - jsonObject1.put("code","10201"); - channel.writeAndFlush(jsonObject1.toJSONString()+'\n'); + if(msg.contains("start")){ + JSONObject jsonObject1 = new JSONObject(); + jsonObject1.put("requestId","yjc_ytxjy"); + jsonObject1.put("operateCode","INIT_GATHER"); + jsonObject1.put("code","10201"); + channel.writeAndFlush(jsonObject1.toJSONString()+'\n'); + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("requestId","yjc_ytxjy"); + jsonObject.put("operateCode","INIT_GATHER"); + jsonObject.put("code","10200"); + channel.writeAndFlush(jsonObject.toJSONString()+'\n'); + }else if(msg.contains("YJC_xujy")){ + JSONObject jsonObject = new JSONObject(); + jsonObject.put("requestId","YJC_xujy"); + jsonObject.put("operateCode","OPER_GATHER"); + jsonObject.put("code","10201"); + channel.writeAndFlush(jsonObject.toJSONString()+'\n'); + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + JSONObject jsonObject1 = new JSONObject(); + jsonObject1.put("requestId","YJC_xujy"); + jsonObject1.put("operateCode","OPER_GATHER"); + jsonObject1.put("code","10200"); + channel.writeAndFlush(jsonObject1.toJSONString()+'\n'); + - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); } - JSONObject jsonObject = new JSONObject(); - jsonObject.put("requestId","yjc_ytxjy"); - jsonObject.put("operateCode","INIT_GATHER"); - jsonObject.put("code","10200"); - channel.writeAndFlush(jsonObject.toJSONString()+'\n'); + }