diff --git a/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java b/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java index 49f6001f..f6794595 100644 --- a/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java +++ b/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java @@ -21,6 +21,9 @@ import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; +/** + * @author chendaofei + */ @Slf4j @Api(tags = "预检测") @RestController @@ -30,15 +33,6 @@ public class PreDetectionController extends BaseController { private final PreDetectionService preDetectionService; - private final DetectionServiceImpl detectionServiceImpl; - - private final IPqScriptDtlsService pqScriptDtlsService; - - private final IPqDevService iPqDevService; - - private final SocketDevResponseService socketDevResponseService; - - /** * 开始检测 */ diff --git a/detection/src/main/java/com/njcn/gather/detection/pojo/constant/DetectionCommunicateConstant.java b/detection/src/main/java/com/njcn/gather/detection/pojo/constant/DetectionCommunicateConstant.java new file mode 100644 index 00000000..0420724d --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/pojo/constant/DetectionCommunicateConstant.java @@ -0,0 +1,19 @@ +package com.njcn.gather.detection.pojo.constant; + +/** + * 通信相关的常量 + * + * @author hongawen + * @version 1.0 + * @data 2025/4/15 14:11 + */ +public interface DetectionCommunicateConstant { + + String SOURCE_CHANNEL_NAME = "AUTO_DETECTION_SOURCE"; + + String DEVICE_CHANNEL_NAME = "AUTO_DETECTION_DEV"; + + + + +} diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index 22449c12..f77f2f76 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -8,12 +8,14 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.gather.detection.handler.SocketDevResponseService; import com.njcn.gather.detection.handler.SocketSourceResponseService; +import com.njcn.gather.detection.pojo.constant.DetectionCommunicateConstant; import com.njcn.gather.detection.pojo.enums.DetectionResponseEnum; import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum; import com.njcn.gather.detection.pojo.param.PreDetectionParam; import com.njcn.gather.detection.pojo.param.SimulateDetectionParam; import com.njcn.gather.detection.pojo.vo.SocketMsg; import com.njcn.gather.detection.service.PreDetectionService; +import com.njcn.gather.detection.util.business.DetectionCommunicateUtil; import com.njcn.gather.detection.util.socket.CnSocketUtil; import com.njcn.gather.detection.util.socket.FormalTestManager; import com.njcn.gather.detection.util.socket.SocketManager; @@ -41,7 +43,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -52,8 +53,6 @@ import java.util.stream.Collectors; @Slf4j public class PreDetectionServiceImpl implements PreDetectionService { - private final String source = "_Source"; - private final String dev = "_Dev"; private final String stepTag = "&&"; private final String handlerSourceStr = "_Source"; @@ -79,8 +78,9 @@ public class PreDetectionServiceImpl implements PreDetectionService { @Override public void sourceCommunicationCheck(PreDetectionParam param) { - commCheck(param); - specialDeal(param); + // 参数校验,目前仅检查IP是否重复 + checkDevIp(param); + DetectionCommunicateUtil.checkCommunicateChannel(param); /* 先组装源通讯协议 查询计划什么模式的(除了对比式,其他都是一个计划对应一个源) @@ -106,7 +106,8 @@ public class PreDetectionServiceImpl implements PreDetectionService { @Override public void coefficientCheck(PreDetectionParam param) { - specialDeal(param); + // 检测是否存在连接的通道,后期需要做成动态,如果组合中不是第一位,则不需要关闭,也不用初始化 todo.... + DetectionCommunicateUtil.checkCommunicateChannel(param); AdPlanSource planSource = adPlanSourceService.getOne(new LambdaQueryWrapper() .eq(AdPlanSource::getPlanId, param.getPlanId()) ); @@ -189,7 +190,7 @@ public class PreDetectionServiceImpl implements PreDetectionService { xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue()); xuMsg.setData(JSON.toJSONString(sourceIssues)); xuMsg.setRequestId(SourceOperateCodeEnum.FORMAL_REAL.getValue() + "&&" + sourceIssues.getType()); - SocketManager.sendMsg(param.getUserPageId() + source, JSON.toJSONString(xuMsg)); + SocketManager.sendMsg(DetectionCommunicateConstant.SOURCE_CHANNEL_NAME, JSON.toJSONString(xuMsg)); } else { //TODO 是否最终检测完成需要推送给用户 PqScriptCheckDataParam checkDataParam = new PqScriptCheckDataParam(); @@ -211,13 +212,13 @@ public class PreDetectionServiceImpl implements PreDetectionService { preDetectionParam.setUserPageId(param.getUserPageId()); preDetectionParam.setSendWebMsg(true); - specialDealSimulate(preDetectionParam); + DetectionCommunicateUtil.checkCommunicateChannel(preDetectionParam); sendYtxSocketSimulate(preDetectionParam); } @Override public void sendScript(SimulateDetectionParam param) { - Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + source); + Channel channel = SocketManager.getChannelByUserId(DetectionCommunicateConstant.SOURCE_CHANNEL_NAME); if (Objects.isNull(channel) || !channel.isActive()) { // 进行源通信连接 PreDetectionParam preDetectionParam = new PreDetectionParam(); @@ -251,7 +252,7 @@ public class PreDetectionServiceImpl implements PreDetectionService { @Override public void closeTestSimulate(SimulateDetectionParam param) { - Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + source); + Channel channel = SocketManager.getChannelByUserId(DetectionCommunicateConstant.SOURCE_CHANNEL_NAME); if (Objects.isNull(channel) || !channel.isActive()) { throw new BusinessException(DetectionResponseEnum.SOURCE_NOT_CONNECT); } @@ -260,72 +261,14 @@ public class PreDetectionServiceImpl implements PreDetectionService { preDetectionParam.setSourceId(sourceParam.getSourceId()); preDetectionParam.setUserPageId(param.getUserPageId()); CnSocketUtil.quitSendSource(preDetectionParam); - WebServiceManager.removePreDetectionParam(); } - /** - * 对重复发起或者异常发起的检测进行关闭源操作 - * - * @param param - */ - private void specialDeal(PreDetectionParam param) { - Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + source); - Channel channelDev = SocketManager.getChannelByUserId(param.getUserPageId() + dev); - if (Objects.nonNull(channel) && channel.isActive()) { - System.out.println("发送关闭源指令。。。。。。。。"); - CnSocketUtil.quitSendSource(param); - } - if (Objects.nonNull(channelDev) && channelDev.isActive()) { - System.out.println("发送关闭设备通讯指令。。。。。。。。"); - CnSocketUtil.quitSend(param); - } - - try { - Thread.sleep(4000); - } catch (InterruptedException e) { - log.error(e.getMessage()); - } - - SocketManager.removeUser(param.getUserPageId() + source); - SocketManager.removeUser(param.getUserPageId() + dev); - - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - log.error(e.getMessage()); - } - } /** - * 对重复发起或者异常发起的检测进行关闭源操作-模拟检测 - * - * @param param + * 校验被检设备中是否存在IP重复的 */ - private void specialDealSimulate(PreDetectionParam param) { - Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + source); - if (Objects.nonNull(channel) && channel.isActive()) { - System.out.println("发送关闭源指令。。。。。。。。"); - CnSocketUtil.quitSendSource(param); - } - try { - Thread.sleep(4000); - } catch (InterruptedException e) { - log.error(e.getMessage()); - } - SocketManager.removeUser(param.getUserPageId() + source); - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - log.error(e.getMessage()); - } - } - - - /** - * 校验 - */ - private void commCheck(PreDetectionParam param) { + private void checkDevIp(PreDetectionParam param) { List pqDevList = iPqDevService.listByIds(param.getDevIds()); List ipList = pqDevList.stream().map(PqDev::getIp).distinct().collect(Collectors.toList()); if (ipList.size() != param.getDevIds().size()) { diff --git a/detection/src/main/java/com/njcn/gather/detection/util/business/DetectionCommunicateUtil.java b/detection/src/main/java/com/njcn/gather/detection/util/business/DetectionCommunicateUtil.java new file mode 100644 index 00000000..82057c7b --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/util/business/DetectionCommunicateUtil.java @@ -0,0 +1,58 @@ +package com.njcn.gather.detection.util.business; + +import com.njcn.gather.detection.pojo.constant.DetectionCommunicateConstant; +import com.njcn.gather.detection.pojo.param.PreDetectionParam; +import com.njcn.gather.detection.util.socket.CnSocketUtil; +import com.njcn.gather.detection.util.socket.SocketManager; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; + +import java.util.Objects; + +/** + * + * 检测通讯工具类 + * + * @author hongawen + * @version 1.0 + * @data 2025/4/15 15:24 + */ +@Slf4j +public class DetectionCommunicateUtil { + + + /** + * 检测是否存在连接的源、设备通讯的模块通道 + * 有则强行关闭 + */ + public static void checkCommunicateChannel(PreDetectionParam param) { + Channel channelSource = SocketManager.getChannelByUserId(DetectionCommunicateConstant.SOURCE_CHANNEL_NAME); + Channel channelDev = SocketManager.getChannelByUserId(DetectionCommunicateConstant.DEVICE_CHANNEL_NAME); + if (Objects.nonNull(channelSource) && channelSource.isActive()) { + System.out.println("发送关闭源指令。。。。。。。。"); + CnSocketUtil.quitSendSource(param); + } + if (Objects.nonNull(channelDev) && channelDev.isActive()) { + System.out.println("发送关闭设备通讯指令。。。。。。。。"); + CnSocketUtil.quitSend(param); + } + + try { + Thread.sleep(4000); + } catch (InterruptedException e) { + log.error(e.getMessage()); + } + + SocketManager.removeUser(DetectionCommunicateConstant.SOURCE_CHANNEL_NAME); + SocketManager.removeUser(DetectionCommunicateConstant.DEVICE_CHANNEL_NAME); + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + log.error(e.getMessage()); + } + } + + + +} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java index 2b76998f..d3421a28 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/SocketManager.java @@ -1,7 +1,6 @@ package com.njcn.gather.detection.util.socket; import cn.hutool.core.util.ObjectUtil; -import com.njcn.gather.device.pojo.vo.PreDetection; import com.njcn.gather.script.pojo.po.SourceIssue; import io.netty.channel.Channel; import io.netty.channel.nio.NioEventLoopGroup; @@ -19,10 +18,14 @@ import java.util.concurrent.CopyOnWriteArrayList; */ public class SocketManager { - // key为userId(xxx_Source、xxx_Dev),value为channel + /** + * key为userId(xxx_Source、xxx_Dev),value为channel + */ private static final Map socketSessions = new ConcurrentHashMap<>(); - // key为userId(xxx_Source、xxx_Dev),value为group + /** + * key为userId(xxx_Source、xxx_Dev),value为group + */ private static final Map socketGroup = new ConcurrentHashMap<>(); public static void addUser(String userId, Channel channel) {