1.微调相序校验
This commit is contained in:
@@ -235,6 +235,7 @@ public class SocketDevResponseService {
|
||||
System.out.println("开始相序校验++++++++++");
|
||||
PqScriptIssueParam issueParam=new PqScriptIssueParam();
|
||||
issueParam.setPlanId(param.getPlanId());
|
||||
issueParam.setSourceId(param.getScriptId());
|
||||
issueParam.setIsPhaseSequence(true);
|
||||
issueParam.setDevIds(param.getDevIds());
|
||||
List<SourceIssue> sourceIssues = pqScriptDtlsService.listSourceIssue(issueParam);
|
||||
|
||||
@@ -46,6 +46,7 @@ public class SocketSourceResponseService {
|
||||
public void deal(PreDetectionParam param, String msg){
|
||||
SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg);
|
||||
SourceOperateCodeEnum enumByCode = SourceOperateCodeEnum.getDictDataEnumByCode(socketDataMsg.getRequestId());
|
||||
if(ObjectUtil.isNotNull(enumByCode)){
|
||||
switch (enumByCode){
|
||||
case YJC_YTXJY:
|
||||
detectionDev(param, socketDataMsg);
|
||||
@@ -55,6 +56,10 @@ public class SocketSourceResponseService {
|
||||
break;
|
||||
|
||||
}
|
||||
}else{
|
||||
System.out.println("1");
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -70,9 +75,9 @@ public class SocketSourceResponseService {
|
||||
switch (dictDataEnumByCode){
|
||||
case SUCCESS:
|
||||
//todo 前端推送收到的消息暂未处理好
|
||||
// webSocketHandler.sendMsgToUser(param.getUserPageId(), msg);
|
||||
webSocketHandler.sendMsgToUser(param.getUserPageId(), "msg");
|
||||
String s = param.getUserPageId() + "_Dev";
|
||||
//开始设备通讯检测
|
||||
//开始设备通讯检测(发送设备初始化)
|
||||
Channel channel = SocketManager.getChannelByUserId(s);
|
||||
if(channel==null || !channel.isActive()){
|
||||
NettyClient.socketClient(ip, port, param.getUserPageId(), new NettyDevClientHandler(param, socketDevResponseService));
|
||||
@@ -93,7 +98,7 @@ public class SocketSourceResponseService {
|
||||
socketMsg.setRequestId(socketDataMsg.getRequestId());
|
||||
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
|
||||
socketMsg.setData(dictDataEnumByCode.getMessage());
|
||||
// webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
|
||||
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -114,7 +119,7 @@ public class SocketSourceResponseService {
|
||||
socketMsg.setRequestId(socketDataMsg.getRequestId());
|
||||
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
|
||||
socketMsg.setData(dictDataEnumByCode.getMessage());
|
||||
// webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
|
||||
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
|
||||
|
||||
|
||||
String s = param.getUserPageId() + "_Dev";
|
||||
@@ -138,7 +143,7 @@ public class SocketSourceResponseService {
|
||||
socketMsg.setRequestId(socketDataMsg.getRequestId());
|
||||
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
|
||||
socketMsg.setData(dictDataEnumByCode.getMessage());
|
||||
// webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
|
||||
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.njcn.gather.detection.pojo.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* @author caozehui
|
||||
* @data 2024/11/9
|
||||
*/
|
||||
@Getter
|
||||
public enum DetectionResponseEnum {
|
||||
PLAN_PATTERN_NOT("A020001", "计划模式查询为空"),
|
||||
SOURCE_INFO_NOT("A020002", "源表信息不存在"),
|
||||
PLAN_AND_SOURCE_NOT("A020003", "计划和源关系不存在")
|
||||
|
||||
;
|
||||
|
||||
private String code;
|
||||
private String message;
|
||||
|
||||
DetectionResponseEnum(String code, String message) {
|
||||
this.code = code;
|
||||
this.message = message;
|
||||
}
|
||||
}
|
||||
@@ -27,4 +27,9 @@ public class PreDetectionParam {
|
||||
*/
|
||||
private List<String> devIds;
|
||||
|
||||
/**
|
||||
* 检测脚本Id
|
||||
*/
|
||||
private String scriptId;
|
||||
|
||||
}
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
package com.njcn.gather.detection.service.impl;
|
||||
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.gather.detection.handler.SocketDevResponseService;
|
||||
import com.njcn.gather.detection.handler.SocketSourceResponseService;
|
||||
import com.njcn.gather.detection.pojo.enums.DetectionResponseEnum;
|
||||
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
|
||||
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||
import com.njcn.gather.detection.pojo.vo.SocketMsg;
|
||||
@@ -13,10 +17,9 @@ import com.njcn.gather.detection.service.PreDetectionService;
|
||||
|
||||
import com.njcn.gather.detection.util.socket.SocketManager;
|
||||
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
|
||||
import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler;
|
||||
import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler;
|
||||
import com.njcn.gather.device.device.service.IPqDevService;
|
||||
import com.njcn.gather.device.script.pojo.param.PqScriptIssueParam;
|
||||
import com.njcn.gather.device.script.pojo.po.SourceIssue;
|
||||
import com.njcn.gather.device.script.service.IPqScriptDtlsService;
|
||||
import com.njcn.gather.device.source.pojo.po.SourceInitialize;
|
||||
import com.njcn.gather.device.source.service.IPqSourceService;
|
||||
@@ -31,13 +34,15 @@ import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class PreDetectionServiceImpl implements PreDetectionService {
|
||||
|
||||
private final String source= "source";
|
||||
private final String source = "source";
|
||||
|
||||
private final IPqDevService iPqDevService;
|
||||
private final IDictDataService dictDataService;
|
||||
@@ -58,7 +63,6 @@ public class PreDetectionServiceImpl implements PreDetectionService {
|
||||
private final SocketSourceResponseService sourceResponseService;
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void sourceCommunicationCheck(PreDetectionParam param) {
|
||||
/*
|
||||
@@ -75,51 +79,55 @@ public class PreDetectionServiceImpl implements PreDetectionService {
|
||||
sendYtxSocket(param);
|
||||
break;
|
||||
case CONTRAST:
|
||||
//todo 对比式可以是多个源
|
||||
sendYtxSocket(param);
|
||||
break;
|
||||
default:
|
||||
//todo 没有找到对应的模式
|
||||
break;
|
||||
throw new BusinessException(DetectionResponseEnum.PLAN_PATTERN_NOT);
|
||||
}
|
||||
|
||||
} else {
|
||||
//todo 需要向前端推送消息查不到检测计划
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
private void sendYtxSocket(PreDetectionParam param){
|
||||
private void sendYtxSocket(PreDetectionParam param) {
|
||||
AdPlanSource planSource = adPlanSourceService.getOne(new LambdaQueryWrapper<AdPlanSource>()
|
||||
.eq(AdPlanSource::getPlanId,param.getPlanId())
|
||||
.eq(AdPlanSource::getPlanId, param.getPlanId())
|
||||
);
|
||||
if(ObjectUtil.isNotNull(planSource)){
|
||||
if (ObjectUtil.isNotNull(planSource)) {
|
||||
SourceInitialize sourceParam = pqSourceService.getSourceInitializeParam(planSource.getSourceId());
|
||||
if(ObjectUtil.isNotNull(sourceParam)){
|
||||
if (ObjectUtil.isNotNull(sourceParam)) {
|
||||
//开始组装socket报文请求头
|
||||
SocketMsg msg=new SocketMsg();
|
||||
SocketMsg msg ;
|
||||
String s = param.getUserPageId() + "_Source";
|
||||
//PQC600A_192.168.1.133_6806
|
||||
Channel channel = SocketManager.getChannelByUserId(s);
|
||||
socketDevResponseService.xuClear();
|
||||
socketDevResponseService.initList(param);
|
||||
if(channel==null || !channel.isActive()){
|
||||
NettyClient.socketClient(ip, port, param.getUserPageId(), new NettySourceClientHandler(param, sourceResponseService));
|
||||
}else{
|
||||
//关闭源
|
||||
msg= new SocketMsg();
|
||||
msg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue());
|
||||
msg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue());
|
||||
Map<String,String> map=new HashMap<>(1);
|
||||
map.put("sourceId",sourceParam.getSourceId());
|
||||
msg.setData(JSON.toJSONString(map));
|
||||
SocketManager.sendMsg(s, JSON.toJSONString(msg));
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
msg= new SocketMsg();
|
||||
msg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue());
|
||||
msg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue());
|
||||
msg.setData(JSON.toJSONString(sourceParam));
|
||||
String s = param.getUserPageId() + "_Source";
|
||||
NettyClient.socketClient(ip, port,param.getUserPageId(), new NettySourceClientHandler(param, sourceResponseService));
|
||||
SocketManager.sendMsg(s,JSON.toJSONString(msg));
|
||||
|
||||
|
||||
PqScriptIssueParam issueParam=new PqScriptIssueParam();
|
||||
issueParam.setScriptId(planSource.getSourceId());
|
||||
issueParam.setPlanId(param.getPlanId());
|
||||
issueParam.setSourceId(planSource.getSourceId());
|
||||
List<SourceIssue> sourceIssues = pqScriptDtlsService.listSourceIssue(issueParam);
|
||||
for (SourceIssue sourceIssue : sourceIssues) {
|
||||
String jsonString = JSON.toJSONString(sourceIssue);
|
||||
SocketManager.sendMsg(s, JSON.toJSONString(msg));
|
||||
} else {
|
||||
throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT);
|
||||
}
|
||||
|
||||
}else{
|
||||
//todo 提示处理源表信息不存在
|
||||
}
|
||||
}else{
|
||||
//todo 提示处理计划和源关系不存在
|
||||
} else {
|
||||
throw new BusinessException(DetectionResponseEnum.PLAN_AND_SOURCE_NOT);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,17 +137,15 @@ public class PreDetectionServiceImpl implements PreDetectionService {
|
||||
|
||||
socketDevResponseService.initList(param);
|
||||
|
||||
|
||||
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
String ddId = param.getUserPageId()+"_Source";
|
||||
String ddId = param.getUserPageId() + "_Source";
|
||||
Channel channel = SocketManager.getChannelByUserId(ddId);
|
||||
if( channel== null || !channel.isActive()){
|
||||
NettyClient.socketClient(ip, port,param.getUserPageId(),new NettySourceClientHandler(param, sourceResponseService));
|
||||
if (channel == null || !channel.isActive()) {
|
||||
NettyClient.socketClient(ip, port, param.getUserPageId(), new NettySourceClientHandler(param, sourceResponseService));
|
||||
}
|
||||
SocketManager.sendMsg(ddId,"start\n");
|
||||
SocketManager.sendMsg(ddId, "start\n");
|
||||
}
|
||||
};
|
||||
runnable.run();
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.njcn.gather.detection.util.socket;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||
|
||||
@@ -29,8 +30,13 @@ public class SocketManager {
|
||||
|
||||
public static void sendMsg(String userId,String msg) {
|
||||
Channel channel = userSessions.get(userId);
|
||||
if(ObjectUtil.isNotNull(channel)){
|
||||
channel.writeAndFlush(msg);
|
||||
System.out.println(userId+"__"+channel.id()+"往"+channel.remoteAddress()+"发送数据:"+msg);
|
||||
}else{
|
||||
System.out.println(userId+"__发送数据:失败通道不存在"+msg);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ public class NettyClient {
|
||||
//空闲状态的handler
|
||||
// 添加LineBasedFrameDecoder来按行分割数据
|
||||
// .addLast(new LineBasedFrameDecoder(10240))
|
||||
// .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS))
|
||||
.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
|
||||
.addLast(new StringDecoder(CharsetUtil.UTF_8))
|
||||
.addLast(new StringEncoder(CharsetUtil.UTF_8))
|
||||
.addLast(handler);
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
package com.njcn.gather.detection.util.socket.cilent;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.njcn.gather.detection.handler.SocketSourceResponseService;
|
||||
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
|
||||
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||
import com.njcn.gather.detection.pojo.vo.SocketMsg;
|
||||
import com.njcn.gather.detection.util.socket.SocketManager;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.timeout.IdleState;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.handler.timeout.TimeoutException;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@@ -70,16 +75,18 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<Strin
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||
//如果是空闲状态事件
|
||||
// if (evt instanceof IdleStateEvent) {
|
||||
// if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
|
||||
// System.out.println("空闲" + ctx.channel().id());
|
||||
// //发送ping 保持心跳链接
|
||||
// TextWebSocketFrame resp = new TextWebSocketFrame(ctx.channel().id() + " ping");
|
||||
// ctx.writeAndFlush(resp);
|
||||
// }
|
||||
// }else {
|
||||
// userEventTriggered(ctx,evt);
|
||||
// }
|
||||
if (evt instanceof IdleStateEvent) {
|
||||
if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
|
||||
//发送ping 保持心跳链接
|
||||
SocketMsg msg=new SocketMsg();
|
||||
msg.setRequestId("yxt");
|
||||
msg.setOperateCode(SourceOperateCodeEnum.HEARTBEAT.getValue());
|
||||
msg.setData("");
|
||||
ctx.writeAndFlush(JSON.toJSONString(msg));
|
||||
}
|
||||
}else {
|
||||
userEventTriggered(ctx,evt);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -123,7 +123,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
|
||||
* 推送数据至前台
|
||||
*/
|
||||
public void sendMsgToUser(String userId, String msg) {
|
||||
WebServiceManager.sendMsg(userId, msg);
|
||||
// WebServiceManager.sendMsg(userId, msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user