优化部分代码
This commit is contained in:
@@ -21,6 +21,9 @@ import org.springframework.validation.annotation.Validated;
|
|||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author chendaofei
|
||||||
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Api(tags = "预检测")
|
@Api(tags = "预检测")
|
||||||
@RestController
|
@RestController
|
||||||
@@ -30,15 +33,6 @@ public class PreDetectionController extends BaseController {
|
|||||||
|
|
||||||
private final PreDetectionService preDetectionService;
|
private final PreDetectionService preDetectionService;
|
||||||
|
|
||||||
private final DetectionServiceImpl detectionServiceImpl;
|
|
||||||
|
|
||||||
private final IPqScriptDtlsService pqScriptDtlsService;
|
|
||||||
|
|
||||||
private final IPqDevService iPqDevService;
|
|
||||||
|
|
||||||
private final SocketDevResponseService socketDevResponseService;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 开始检测
|
* 开始检测
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -0,0 +1,19 @@
|
|||||||
|
package com.njcn.gather.detection.pojo.constant;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 通信相关的常量
|
||||||
|
*
|
||||||
|
* @author hongawen
|
||||||
|
* @version 1.0
|
||||||
|
* @data 2025/4/15 14:11
|
||||||
|
*/
|
||||||
|
public interface DetectionCommunicateConstant {
|
||||||
|
|
||||||
|
String SOURCE_CHANNEL_NAME = "AUTO_DETECTION_SOURCE";
|
||||||
|
|
||||||
|
String DEVICE_CHANNEL_NAME = "AUTO_DETECTION_DEV";
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -8,12 +8,14 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|||||||
import com.njcn.common.pojo.exception.BusinessException;
|
import com.njcn.common.pojo.exception.BusinessException;
|
||||||
import com.njcn.gather.detection.handler.SocketDevResponseService;
|
import com.njcn.gather.detection.handler.SocketDevResponseService;
|
||||||
import com.njcn.gather.detection.handler.SocketSourceResponseService;
|
import com.njcn.gather.detection.handler.SocketSourceResponseService;
|
||||||
|
import com.njcn.gather.detection.pojo.constant.DetectionCommunicateConstant;
|
||||||
import com.njcn.gather.detection.pojo.enums.DetectionResponseEnum;
|
import com.njcn.gather.detection.pojo.enums.DetectionResponseEnum;
|
||||||
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
|
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
|
||||||
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||||
import com.njcn.gather.detection.pojo.param.SimulateDetectionParam;
|
import com.njcn.gather.detection.pojo.param.SimulateDetectionParam;
|
||||||
import com.njcn.gather.detection.pojo.vo.SocketMsg;
|
import com.njcn.gather.detection.pojo.vo.SocketMsg;
|
||||||
import com.njcn.gather.detection.service.PreDetectionService;
|
import com.njcn.gather.detection.service.PreDetectionService;
|
||||||
|
import com.njcn.gather.detection.util.business.DetectionCommunicateUtil;
|
||||||
import com.njcn.gather.detection.util.socket.CnSocketUtil;
|
import com.njcn.gather.detection.util.socket.CnSocketUtil;
|
||||||
import com.njcn.gather.detection.util.socket.FormalTestManager;
|
import com.njcn.gather.detection.util.socket.FormalTestManager;
|
||||||
import com.njcn.gather.detection.util.socket.SocketManager;
|
import com.njcn.gather.detection.util.socket.SocketManager;
|
||||||
@@ -41,7 +43,6 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@@ -52,8 +53,6 @@ import java.util.stream.Collectors;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class PreDetectionServiceImpl implements PreDetectionService {
|
public class PreDetectionServiceImpl implements PreDetectionService {
|
||||||
|
|
||||||
private final String source = "_Source";
|
|
||||||
private final String dev = "_Dev";
|
|
||||||
private final String stepTag = "&&";
|
private final String stepTag = "&&";
|
||||||
private final String handlerSourceStr = "_Source";
|
private final String handlerSourceStr = "_Source";
|
||||||
|
|
||||||
@@ -79,8 +78,9 @@ public class PreDetectionServiceImpl implements PreDetectionService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sourceCommunicationCheck(PreDetectionParam param) {
|
public void sourceCommunicationCheck(PreDetectionParam param) {
|
||||||
commCheck(param);
|
// 参数校验,目前仅检查IP是否重复
|
||||||
specialDeal(param);
|
checkDevIp(param);
|
||||||
|
DetectionCommunicateUtil.checkCommunicateChannel(param);
|
||||||
/*
|
/*
|
||||||
先组装源通讯协议
|
先组装源通讯协议
|
||||||
查询计划什么模式的(除了对比式,其他都是一个计划对应一个源)
|
查询计划什么模式的(除了对比式,其他都是一个计划对应一个源)
|
||||||
@@ -106,7 +106,8 @@ public class PreDetectionServiceImpl implements PreDetectionService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void coefficientCheck(PreDetectionParam param) {
|
public void coefficientCheck(PreDetectionParam param) {
|
||||||
specialDeal(param);
|
// 检测是否存在连接的通道,后期需要做成动态,如果组合中不是第一位,则不需要关闭,也不用初始化 todo....
|
||||||
|
DetectionCommunicateUtil.checkCommunicateChannel(param);
|
||||||
AdPlanSource planSource = adPlanSourceService.getOne(new LambdaQueryWrapper<AdPlanSource>()
|
AdPlanSource planSource = adPlanSourceService.getOne(new LambdaQueryWrapper<AdPlanSource>()
|
||||||
.eq(AdPlanSource::getPlanId, param.getPlanId())
|
.eq(AdPlanSource::getPlanId, param.getPlanId())
|
||||||
);
|
);
|
||||||
@@ -189,7 +190,7 @@ public class PreDetectionServiceImpl implements PreDetectionService {
|
|||||||
xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue());
|
xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue());
|
||||||
xuMsg.setData(JSON.toJSONString(sourceIssues));
|
xuMsg.setData(JSON.toJSONString(sourceIssues));
|
||||||
xuMsg.setRequestId(SourceOperateCodeEnum.FORMAL_REAL.getValue() + "&&" + sourceIssues.getType());
|
xuMsg.setRequestId(SourceOperateCodeEnum.FORMAL_REAL.getValue() + "&&" + sourceIssues.getType());
|
||||||
SocketManager.sendMsg(param.getUserPageId() + source, JSON.toJSONString(xuMsg));
|
SocketManager.sendMsg(DetectionCommunicateConstant.SOURCE_CHANNEL_NAME, JSON.toJSONString(xuMsg));
|
||||||
} else {
|
} else {
|
||||||
//TODO 是否最终检测完成需要推送给用户
|
//TODO 是否最终检测完成需要推送给用户
|
||||||
PqScriptCheckDataParam checkDataParam = new PqScriptCheckDataParam();
|
PqScriptCheckDataParam checkDataParam = new PqScriptCheckDataParam();
|
||||||
@@ -211,13 +212,13 @@ public class PreDetectionServiceImpl implements PreDetectionService {
|
|||||||
preDetectionParam.setUserPageId(param.getUserPageId());
|
preDetectionParam.setUserPageId(param.getUserPageId());
|
||||||
preDetectionParam.setSendWebMsg(true);
|
preDetectionParam.setSendWebMsg(true);
|
||||||
|
|
||||||
specialDealSimulate(preDetectionParam);
|
DetectionCommunicateUtil.checkCommunicateChannel(preDetectionParam);
|
||||||
sendYtxSocketSimulate(preDetectionParam);
|
sendYtxSocketSimulate(preDetectionParam);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendScript(SimulateDetectionParam param) {
|
public void sendScript(SimulateDetectionParam param) {
|
||||||
Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + source);
|
Channel channel = SocketManager.getChannelByUserId(DetectionCommunicateConstant.SOURCE_CHANNEL_NAME);
|
||||||
if (Objects.isNull(channel) || !channel.isActive()) {
|
if (Objects.isNull(channel) || !channel.isActive()) {
|
||||||
// 进行源通信连接
|
// 进行源通信连接
|
||||||
PreDetectionParam preDetectionParam = new PreDetectionParam();
|
PreDetectionParam preDetectionParam = new PreDetectionParam();
|
||||||
@@ -251,7 +252,7 @@ public class PreDetectionServiceImpl implements PreDetectionService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeTestSimulate(SimulateDetectionParam param) {
|
public void closeTestSimulate(SimulateDetectionParam param) {
|
||||||
Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + source);
|
Channel channel = SocketManager.getChannelByUserId(DetectionCommunicateConstant.SOURCE_CHANNEL_NAME);
|
||||||
if (Objects.isNull(channel) || !channel.isActive()) {
|
if (Objects.isNull(channel) || !channel.isActive()) {
|
||||||
throw new BusinessException(DetectionResponseEnum.SOURCE_NOT_CONNECT);
|
throw new BusinessException(DetectionResponseEnum.SOURCE_NOT_CONNECT);
|
||||||
}
|
}
|
||||||
@@ -260,72 +261,14 @@ public class PreDetectionServiceImpl implements PreDetectionService {
|
|||||||
preDetectionParam.setSourceId(sourceParam.getSourceId());
|
preDetectionParam.setSourceId(sourceParam.getSourceId());
|
||||||
preDetectionParam.setUserPageId(param.getUserPageId());
|
preDetectionParam.setUserPageId(param.getUserPageId());
|
||||||
CnSocketUtil.quitSendSource(preDetectionParam);
|
CnSocketUtil.quitSendSource(preDetectionParam);
|
||||||
|
|
||||||
WebServiceManager.removePreDetectionParam();
|
WebServiceManager.removePreDetectionParam();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 对重复发起或者异常发起的检测进行关闭源操作
|
|
||||||
*
|
|
||||||
* @param param
|
|
||||||
*/
|
|
||||||
private void specialDeal(PreDetectionParam param) {
|
|
||||||
Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + source);
|
|
||||||
Channel channelDev = SocketManager.getChannelByUserId(param.getUserPageId() + dev);
|
|
||||||
if (Objects.nonNull(channel) && channel.isActive()) {
|
|
||||||
System.out.println("发送关闭源指令。。。。。。。。");
|
|
||||||
CnSocketUtil.quitSendSource(param);
|
|
||||||
}
|
|
||||||
if (Objects.nonNull(channelDev) && channelDev.isActive()) {
|
|
||||||
System.out.println("发送关闭设备通讯指令。。。。。。。。");
|
|
||||||
CnSocketUtil.quitSend(param);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Thread.sleep(4000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error(e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
SocketManager.removeUser(param.getUserPageId() + source);
|
|
||||||
SocketManager.removeUser(param.getUserPageId() + dev);
|
|
||||||
|
|
||||||
try {
|
|
||||||
Thread.sleep(2000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error(e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 对重复发起或者异常发起的检测进行关闭源操作-模拟检测
|
* 校验被检设备中是否存在IP重复的
|
||||||
*
|
|
||||||
* @param param
|
|
||||||
*/
|
*/
|
||||||
private void specialDealSimulate(PreDetectionParam param) {
|
private void checkDevIp(PreDetectionParam param) {
|
||||||
Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + source);
|
|
||||||
if (Objects.nonNull(channel) && channel.isActive()) {
|
|
||||||
System.out.println("发送关闭源指令。。。。。。。。");
|
|
||||||
CnSocketUtil.quitSendSource(param);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(4000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error(e.getMessage());
|
|
||||||
}
|
|
||||||
SocketManager.removeUser(param.getUserPageId() + source);
|
|
||||||
try {
|
|
||||||
Thread.sleep(2000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error(e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 校验
|
|
||||||
*/
|
|
||||||
private void commCheck(PreDetectionParam param) {
|
|
||||||
List<PqDev> pqDevList = iPqDevService.listByIds(param.getDevIds());
|
List<PqDev> pqDevList = iPqDevService.listByIds(param.getDevIds());
|
||||||
List<String> ipList = pqDevList.stream().map(PqDev::getIp).distinct().collect(Collectors.toList());
|
List<String> ipList = pqDevList.stream().map(PqDev::getIp).distinct().collect(Collectors.toList());
|
||||||
if (ipList.size() != param.getDevIds().size()) {
|
if (ipList.size() != param.getDevIds().size()) {
|
||||||
|
|||||||
@@ -0,0 +1,58 @@
|
|||||||
|
package com.njcn.gather.detection.util.business;
|
||||||
|
|
||||||
|
import com.njcn.gather.detection.pojo.constant.DetectionCommunicateConstant;
|
||||||
|
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||||
|
import com.njcn.gather.detection.util.socket.CnSocketUtil;
|
||||||
|
import com.njcn.gather.detection.util.socket.SocketManager;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* 检测通讯工具类
|
||||||
|
*
|
||||||
|
* @author hongawen
|
||||||
|
* @version 1.0
|
||||||
|
* @data 2025/4/15 15:24
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class DetectionCommunicateUtil {
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 检测是否存在连接的源、设备通讯的模块通道
|
||||||
|
* 有则强行关闭
|
||||||
|
*/
|
||||||
|
public static void checkCommunicateChannel(PreDetectionParam param) {
|
||||||
|
Channel channelSource = SocketManager.getChannelByUserId(DetectionCommunicateConstant.SOURCE_CHANNEL_NAME);
|
||||||
|
Channel channelDev = SocketManager.getChannelByUserId(DetectionCommunicateConstant.DEVICE_CHANNEL_NAME);
|
||||||
|
if (Objects.nonNull(channelSource) && channelSource.isActive()) {
|
||||||
|
System.out.println("发送关闭源指令。。。。。。。。");
|
||||||
|
CnSocketUtil.quitSendSource(param);
|
||||||
|
}
|
||||||
|
if (Objects.nonNull(channelDev) && channelDev.isActive()) {
|
||||||
|
System.out.println("发送关闭设备通讯指令。。。。。。。。");
|
||||||
|
CnSocketUtil.quitSend(param);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(4000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error(e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
SocketManager.removeUser(DetectionCommunicateConstant.SOURCE_CHANNEL_NAME);
|
||||||
|
SocketManager.removeUser(DetectionCommunicateConstant.DEVICE_CHANNEL_NAME);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(2000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,7 +1,6 @@
|
|||||||
package com.njcn.gather.detection.util.socket;
|
package com.njcn.gather.detection.util.socket;
|
||||||
|
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import com.njcn.gather.device.pojo.vo.PreDetection;
|
|
||||||
import com.njcn.gather.script.pojo.po.SourceIssue;
|
import com.njcn.gather.script.pojo.po.SourceIssue;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
@@ -19,10 +18,14 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||||||
*/
|
*/
|
||||||
public class SocketManager {
|
public class SocketManager {
|
||||||
|
|
||||||
// key为userId(xxx_Source、xxx_Dev),value为channel
|
/**
|
||||||
|
* key为userId(xxx_Source、xxx_Dev),value为channel
|
||||||
|
*/
|
||||||
private static final Map<String, Channel> socketSessions = new ConcurrentHashMap<>();
|
private static final Map<String, Channel> socketSessions = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
// key为userId(xxx_Source、xxx_Dev),value为group
|
/**
|
||||||
|
* key为userId(xxx_Source、xxx_Dev),value为group
|
||||||
|
*/
|
||||||
private static final Map<String, NioEventLoopGroup> socketGroup = new ConcurrentHashMap<>();
|
private static final Map<String, NioEventLoopGroup> socketGroup = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public static void addUser(String userId, Channel channel) {
|
public static void addUser(String userId, Channel channel) {
|
||||||
|
|||||||
Reference in New Issue
Block a user