This commit is contained in:
2024-12-27 15:02:49 +08:00
parent 6c9487a1e6
commit 338e426017
3 changed files with 108 additions and 56 deletions

View File

@@ -10,9 +10,11 @@ import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.pojo.po.DevData;
import com.njcn.gather.detection.pojo.po.SourceCompareDev;
import com.njcn.gather.detection.pojo.vo.DevLineTestResult;
import com.njcn.gather.detection.pojo.vo.SocketDataMsg;
import com.njcn.gather.detection.pojo.vo.SocketMsg;
import com.njcn.gather.detection.pojo.vo.WebSocketVO;
import com.njcn.gather.detection.service.impl.DetectionServiceImpl;
import com.njcn.gather.detection.util.socket.MsgUtil;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.web.WebSocketHandler;
@@ -22,6 +24,7 @@ import com.njcn.gather.device.script.pojo.param.PqScriptIssueParam;
import com.njcn.gather.device.script.pojo.po.SourceIssue;
import com.njcn.gather.device.script.service.IPqScriptDtlsService;
import com.njcn.gather.storage.pojo.po.AdNonHarmonicResult;
import com.njcn.gather.system.dictionary.pojo.enums.DictDataEnum;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@@ -43,6 +46,7 @@ public class SocketDevResponseService {
private final IPqDevService iPqDevService;
private final IPqScriptDtlsService scriptDtlsService;
private final IPqScriptDtlsService pqScriptDtlsService;
private final DetectionServiceImpl detectionServiceImpl;
/**
* 存储的装置相序数据
@@ -60,6 +64,8 @@ public class SocketDevResponseService {
*/
List<String> monitorIdListComm = new ArrayList<>();
private List<PreDetection> devList = new ArrayList<>();
/**
* 装置名称
*/
@@ -82,7 +88,9 @@ public class SocketDevResponseService {
public void deal(PreDetectionParam param, String msg) {
SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg);
SourceOperateCodeEnum sourceOperateCodeEnum = SourceOperateCodeEnum.getDictDataEnumByCode(socketDataMsg.getRequestId());
String[] tem = socketDataMsg.getRequestId().split("&&");
SourceOperateCodeEnum sourceOperateCodeEnum = SourceOperateCodeEnum.getDictDataEnumByCode(tem[0]);
switch (sourceOperateCodeEnum) {
//设备通讯校验
case YJC_SBTXJY:
@@ -116,7 +124,7 @@ public class SocketDevResponseService {
private void devComm(SocketDataMsg socketDataMsg, PreDetectionParam param, String msg) {
SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode());
String s = param.getUserPageId() + handlerStr;
SocketMsg socketMsg = new SocketMsg();
SocketMsg<String> socketMsg = new SocketMsg<>();
switch (Objects.requireNonNull(dictDataEnumByCode)) {
case SUCCESS:
//通讯校验成功
@@ -156,22 +164,22 @@ public class SocketDevResponseService {
case DEV_ERROR:
webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2));
quitSend(param, socketMsg);
quitSendSource(param, socketMsg);
quitSend(param);
quitSendSource(param);
break;
case DEV_TARGET:
webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2));
quitSend(param, socketMsg);
quitSendSource(param, socketMsg);
quitSend(param);
quitSendSource(param);
break;
case RE_OPERATE:
//出现已经初始化情况,发送用户用户确认是否继续检测
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
quitSend(param, socketMsg);
quitSend(param);
break;
case NO_INIT_DEV:
//发起关闭操作
quitSend(param, socketMsg);
quitSend(param);
break;
default:
WebSocketVO webSocketVO = new WebSocketVO();
@@ -194,12 +202,9 @@ public class SocketDevResponseService {
webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2));
if (socketDataMsg.getOperateCode().equals(SourceOperateCodeEnum.DEV_INIT_GATHER_02.getValue())) {
successXieyi.add(socketDataMsg.getData());
if (successXieyi.size() == monitorIdListComm.size()) {
for (String string : successXieyi) {
//模拟统计协议校验,通讯校验已经校验过,模拟直接推送
SocketDataMsg webSend = new SocketDataMsg();
@@ -211,7 +216,7 @@ public class SocketDevResponseService {
}
//协议3校验
successXieyi = new ArrayList<>();
//successXieyi = new ArrayList<>();
List<PreDetection> devList = iPqDevService.getDevInfo(param.getDevIds());
Map<String, List<PreDetection>> map = new HashMap(1);
@@ -244,23 +249,28 @@ public class SocketDevResponseService {
PqScriptIssueParam issueParam = new PqScriptIssueParam();
issueParam.setPlanId(param.getPlanId());
issueParam.setSourceId(param.getSourceId());
issueParam.setIsPhaseSequence(true);
issueParam.setDevIds(param.getDevIds());
List<SourceIssue> sourceIssues = pqScriptDtlsService.listSourceIssue(issueParam);
//issueParam.setScriptId("2973cb938b591b93d0df2547592b8cd8");
if (CollUtil.isNotEmpty(sourceIssues)) {
SocketMsg xuMsg = new SocketMsg();
xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue());
xuMsg.setData(JSON.toJSONString(sourceIssues.get(0)));
if (SourceOperateCodeEnum.FORMAL_TEST.getValue().equals(param.getOperateType())) {
//正式检测
xuMsg.setRequestId(SourceOperateCodeEnum.FORMAL_REAL.getValue());
issueParam.setIsPhaseSequence(true);
xuMsg.setRequestId(SourceOperateCodeEnum.Test_VOL.getValue());
} else if (SourceOperateCodeEnum.PRE_TEST.getValue().equals(param.getOperateType())) {
//预检测
issueParam.setIsPhaseSequence(false);
xuMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue());
}
List<SourceIssue> sourceIssues = pqScriptDtlsService.listSourceIssue(issueParam);
if (CollUtil.isNotEmpty(sourceIssues)) {
if(SourceOperateCodeEnum.FORMAL_TEST.getValue().equals(param.getOperateType())){
SocketManager.addSourceTarget(SourceOperateCodeEnum.Test_VOL.getValue(), sourceIssues.get(0));
}
xuMsg.setData(JSON.toJSONString(sourceIssues.get(0)));
SocketManager.sendMsg(param.getUserPageId() + handlerSourceStr, MsgUtil.toJsonWithNewLinePlain(xuMsg));
successComm.clear();
}
}
}
@@ -269,21 +279,22 @@ public class SocketDevResponseService {
break;
case UNPROCESSED_BUSINESS:
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
break;
case DEV_ERROR:
webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2));
quitSend(param, socketMsg);
quitSendSource(param, socketMsg);
quitSend(param);
quitSendSource(param);
break;
case DEV_TARGET:
webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2));
quitSend(param, socketMsg);
quitSendSource(param, socketMsg);
quitSend(param);
quitSendSource(param);
break;
case RE_OPERATE:
//出现已经初始化情况,发送用户用户确认是否继续检测
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketDataMsg));
quitSend(param, socketMsg);
quitSend(param);
break;
default:
WebSocketVO webSocketVO = new WebSocketVO();
@@ -336,6 +347,7 @@ public class SocketDevResponseService {
String userSource = param.getUserPageId() + handlerSourceStr;
SocketMsg msg = new SocketMsg();
msg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue());
msg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue());
Map<String, String> map = new HashMap<>(1);
@@ -345,7 +357,7 @@ public class SocketDevResponseService {
//同时关闭设备三个步骤
SocketMsg quitDevMsg = new SocketMsg();
quitSend(param, quitDevMsg);
quitSend(param);
//向前端推送消息
SocketDataMsg temMsg = new SocketDataMsg();
@@ -363,6 +375,7 @@ public class SocketDevResponseService {
}
break;
case UNPROCESSED_BUSINESS:
break;
case NORMAL_RESPONSE:
@@ -370,13 +383,13 @@ public class SocketDevResponseService {
break;
case DEV_ERROR:
webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2));
quitSend(param, socketMsg);
quitSendSource(param, socketMsg);
quitSend(param);
quitSendSource(param);
break;
case DEV_TARGET:
webSocketHandler.sendMsgToUser(param.getUserPageId(), MsgUtil.msgToWebData(socketDataMsg, devNameMapComm, 2));
quitSend(param, socketMsg);
quitSendSource(param, socketMsg);
quitSend(param);
quitSendSource(param);
break;
case RE_OPERATE:
break;
@@ -395,6 +408,8 @@ public class SocketDevResponseService {
/**
* 实时数据
*/
private final List<DevData> devDataList = new ArrayList();
public void realDeal(PreDetectionParam param, SocketDataMsg socketDataMsg) {
String data = socketDataMsg.getData();
DevData devData = JSON.parseObject(data, DevData.class);
@@ -403,17 +418,51 @@ public class SocketDevResponseService {
SocketMsg socketMsg = new SocketMsg();
switch (dictDataEnumByCode) {
case SUCCESS:
List<DevData.SqlDataDTO> sqlDataDTOList = devData.getSqlData();
for (DevData.SqlDataDTO item : sqlDataDTOList) {
//List<DevData.SqlDataDTO> sqlDataDTOList = devData.getSqlData();
devDataList.add(devData);
successComm.add(socketDataMsg.getRequestId());
System.out.println("devData............."+devDataList.size());
if (successComm.size() == monitorIdListComm.size()) {
SourceIssue sourceIssue = SocketManager.getSourceTarget(socketDataMsg.getRequestId());
System.out.println("获取sourceIssue"+sourceIssue);
Map<String, Integer> flag = detectionServiceImpl.text(devDataList, sourceIssue, DictDataEnum.AT_WILL_VALUE);
System.out.println(flag);
WebSocketVO<List<DevLineTestResult>> webSocketVO = new WebSocketVO<>();
webSocketVO.setRequestId(socketDataMsg.getRequestId().split("&&")[1]+"_End");
//组装实体推送给前台
List<DevLineTestResult> devListRes = new ArrayList<>();
devList.forEach(dev->{
DevLineTestResult devLineTestResult = new DevLineTestResult();
devLineTestResult.setDeviceId(dev.getDevId());
devLineTestResult.setDeviceName(dev.getDevName());
List<Integer> tt = new ArrayList<>();
List<PreDetection.MonitorListDTO> monitorListDTOList = dev.getMonitorList();
for(PreDetection.MonitorListDTO point : monitorListDTOList){
Integer resultFlag = flag.get(dev.getDevIP()+"_"+point.getLine());
tt.add(resultFlag);
}
devLineTestResult.setChnResult(tt.toArray(new Integer[monitorListDTOList.size()]));
devListRes.add(devLineTestResult);
});
webSocketVO.setData(devListRes);
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(webSocketVO));
quitSend(param);
quitSendSource(param);
}
AdNonHarmonicResult adNonHarmonicResult = new AdNonHarmonicResult();
break;
case UNPROCESSED_BUSINESS:
break;
case NORMAL_RESPONSE:
devDataList.add(devData);
break;
case DEV_ERROR:
@@ -451,6 +500,7 @@ public class SocketDevResponseService {
case QUIT_INIT_01:
//关闭所有
SocketManager.removeUser(s);
quitSendSource(param);
break;
case QUIT_INIT_02:
socketMsg.setRequestId("quit");
@@ -469,6 +519,8 @@ public class SocketDevResponseService {
case NO_INIT_DEV:
switch (operateCodeEnum) {
case QUIT_INIT_01:
SocketManager.removeUser(s);
quitSendSource(param);
break;
case QUIT_INIT_02:
socketMsg.setRequestId("quit");
@@ -493,7 +545,8 @@ public class SocketDevResponseService {
/**
* 退出检测
*/
private void quitSend(PreDetectionParam param, SocketMsg socketMsg) {
private void quitSend(PreDetectionParam param) {
SocketMsg socketMsg = new SocketMsg();
socketMsg.setRequestId(SourceOperateCodeEnum.QUITE.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.QUIT_INIT_03.getValue());
SocketManager.sendMsg(param.getUserPageId() + handlerStr, JSON.toJSONString(socketMsg));
@@ -502,7 +555,8 @@ public class SocketDevResponseService {
/**
* 关闭源连接
*/
private void quitSendSource(PreDetectionParam param, SocketMsg socketMsg) {
private void quitSendSource(PreDetectionParam param) {
SocketMsg socketMsg = new SocketMsg();
socketMsg.setRequestId(SourceOperateCodeEnum.QUITE_SOURCE.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue());
SocketManager.sendMsg(param.getUserPageId() + handlerSourceStr, JSON.toJSONString(socketMsg));
@@ -612,10 +666,12 @@ public class SocketDevResponseService {
this.success = new ArrayList<>();
this.devDataMap = new HashMap<>();
adNonHarmonicResultList.clear();
this.adNonHarmonicResultList.clear();
this.devDataList.clear();
this.devList.clear();
List<PreDetection> pqDevList = iPqDevService.getDevInfo(param.getDevIds());
this.devList = pqDevList;
this.monitorIdListComm = pqDevList.stream().flatMap(x -> x.getMonitorList().stream())
.map(PreDetection.MonitorListDTO::getLineId)
.collect(Collectors.toList());

View File

@@ -11,20 +11,14 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
public class WebSocketVO<T> {
private String type;
private String type = "aaa";
private Integer code;
private String requestId;
private String message;
private String operateCode;
private String code;
private T data;
public WebSocketVO(SourceResponseCodeEnum sourceResponseCodeEnum,String type){
this.type = type;
this.code = sourceResponseCodeEnum.getCode();
this.message= sourceResponseCodeEnum.getMessage();
}
}

View File

@@ -24,6 +24,8 @@ import java.util.Objects;
@RequiredArgsConstructor
public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
private final String dev = "_Dev";
private final PreDetectionParam param;
private final SocketDevResponseService socketResponseService;
@@ -36,12 +38,11 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端通道已建立" + ctx.channel().id());
Channel channel = SocketManager.getChannelByUserId(param.getUserPageId()+"_Dev");
Channel channel = SocketManager.getChannelByUserId(param.getUserPageId()+dev);
if(Objects.nonNull(channel)){
channel.close().sync();
}
SocketManager.addUser(param.getUserPageId()+"_Dev",ctx.channel());
SocketManager.addUser(param.getUserPageId()+dev,ctx.channel());
}
/**
@@ -61,7 +62,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
//System.out.println("客户端断线");
System.out.println("客户端断线");
//SocketManager.addUser(webUser,ctx.channel());
}
@@ -73,6 +74,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
System.out.println("进入超时。。。。。。");
//当连接超过10S和发送消息后10S无响应时候关闭channel
}