1.源装置,入参和发送指令业务编写

This commit is contained in:
wr
2024-12-17 19:22:54 +08:00
parent 861f947499
commit dbc07b2cf3
18 changed files with 315 additions and 169 deletions

View File

@@ -12,10 +12,8 @@ import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.Get;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@Slf4j
@Api(tags = "预检测")
@@ -41,4 +39,18 @@ public class PreDetectionController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
/**
* 开始检测
*/
@PostMapping("/startTest2")
@OperateInfo
@ApiOperation("开始检测")
@ApiImplicitParam(name = "param", value = "查询参数", required = true)
public HttpResult<String> startTest2(@RequestBody PreDetectionParam param){
String methodDescribe = getMethodDescribe("startTest");
preDetectionService.sourceCommunicationCheck(param);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -1,8 +1,18 @@
package com.njcn.gather.detection.handler;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum;
import com.njcn.gather.detection.pojo.vo.SocketDataMsg;
import com.njcn.gather.detection.pojo.vo.SocketMsg;
import com.njcn.gather.detection.util.socket.MsgUtil;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler;
import com.njcn.gather.detection.util.socket.web.WebSocketHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
@@ -14,12 +24,39 @@ public class SocketSourceResponseService {
*/
private final WebSocketHandler webSocketHandler;
private final SocketResponseService sourceResponseService;
@Value("${socket.device.ip}")
private String ip;
@Value("${socket.device.port}")
private Integer port;
public void deal(String userId,String msg){
SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg);
if(SourceOperateCodeEnum.YJC_YTXJY.getValue().equals(socketDataMsg.getRequestId())){
SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode());
if(ObjectUtil.isNotNull(dictDataEnumByCode)){
switch (dictDataEnumByCode){
case SUCCESS:
webSocketHandler.sendMsgToUser(userId,msg);
String s = userId + "_Dev";
//todo 创建终端socket连接
NettyClient.socketClient(ip, port, s, new NettyDevClientHandler(s, sourceResponseService));
SocketManager.sendMsg(s,"向127服务器发送");
break;
case UNPROCESSED_BUSINESS:
break;
default:
SocketMsg socketMsg=new SocketMsg();
socketMsg.setRequestId(socketDataMsg.getRequestId());
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
socketMsg.setData(dictDataEnumByCode.getMessage());
webSocketHandler.sendMsgToUser(userId, JSON.toJSONString(socketMsg));
break;
}
}
System.out.println("进入deal+++++++++++++++++++");
webSocketHandler.sendMsgToUser(userId,msg);
SocketManager.getChannelByUserId(userId).close();
}
}

View File

@@ -0,0 +1,32 @@
package com.njcn.gather.detection.pojo.enums;
import lombok.Getter;
/**
* @Description:
* @Author: wr
* @Date: 2024/12/17 15:37
*/
@Getter
public enum SourceOperateCodeEnum {
/**
* 源状态
*/
INIT_GATHER("INIT_GATHER", "源初始化"),
OPER_GATHER("OPER_GATHER", "源输出"),
CLOSE_GATHER("CLOSE_GATHER", "源停止"),
YJC_YTXJY("yjc_ytxjy", "预检测_源通讯检测");
private String value;
private String msg;
SourceOperateCodeEnum(String value, String msg) {
this.value = value;
this.msg = msg;
}
}

View File

@@ -0,0 +1,53 @@
package com.njcn.gather.detection.pojo.enums;
import cn.hutool.core.util.ObjectUtil;
import lombok.Getter;
/**
* @Author: wr
* @Date: 2024/12/17 15:37
*/
@Getter
public enum SourceResponseCodeEnum {
SUCCESS(10200, "请求成功"),
UNPROCESSED_BUSINESS(10201, "立即响应,业务还未处理,类似肯定应答"),
NORMAL_RESPONSE(10202, "正常响应中间状态码"),
MESSAGE_PARSING_ERROR(10520, "报文解析有误"),
CONTROLLED_SOURCE_ERROR(10521, "程控源参数有误"),
TEST_ITEM_PARSING_ERROR(10522, "测试项解析有误"),
SOURCE_CONNECTION_ERROR(10523, "源连接失败"),
SOURCE_CONTROL_ERROR(10524, "获取源控制权失败"),
RESET_ERROR(10525, "重置源失败"),
STOP_ERROR(10526, "停止源失败"),
NOT_INITIALIZED(10527, "源未进行初始化"),
TARGET_SOURCE_ERROR(10528, "目标源有误(该用户已控制其他源,在关闭前无法操作新的源)"),
UNABLE_TO_RESPOND(10529, "源状态有误,无法响应报文(例如源处于输出状态,无法响应初始化报文)");
private Integer code;
private String message;
SourceResponseCodeEnum(Integer code, String message) {
this.code = code;
this.message = message;
}
public static String getMsgByValue(Integer code) {
for (SourceResponseCodeEnum state : SourceResponseCodeEnum.values()) {
if (state.getCode().equals(code)) {
return state.getMessage();
}
}
return null;
}
public static SourceResponseCodeEnum getDictDataEnumByCode(Integer code) {
for (SourceResponseCodeEnum sourceResponseCodeEnum : SourceResponseCodeEnum.values()) {
if (ObjectUtil.equals(code, sourceResponseCodeEnum.getCode())) {
return sourceResponseCodeEnum;
}
}
return null;
}
}

View File

@@ -15,7 +15,7 @@ public class PreDetectionParam {
/**
* 检测计划id
*/
private String plan;
private String planId;
/**
* 用户功能组成唯一标识 zhangsan_test

View File

@@ -1,5 +1,6 @@
package com.njcn.gather.detection.pojo.vo;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
/**
@@ -13,21 +14,25 @@ public class SocketDataMsg {
/**
* 请求id确保接收到响应时知晓是针对的哪次请求的应答
*/
@JSONField(ordinal = 1)
private String requestId;
/**
* 源初始化 INIT_GATHER$01 INIT_GATHER采集初始化01 统计采集、02 暂态采集、03 实时采集
*/
@JSONField(ordinal = 2)
private String operateCode;
/**
* 数据体传输前需要将对象、Array等转为String
*/
@JSONField(ordinal = 4)
private String data;
/**
* code码
*/
private String code;
@JSONField(ordinal = 3)
private Integer code;
}

View File

@@ -2,9 +2,7 @@ package com.njcn.gather.detection.service;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import java.util.List;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
/**
* @author wr
@@ -21,21 +19,6 @@ public interface PreDetectionService {
*/
void sourceCommunicationCheck(PreDetectionParam param);
/**
* 装置通讯校验
*/
void deviceCommunicationCheck();
/**
* 协议校验
*/
void agreementCheck();
/**
* 相序校验
*/
void phaseSequenceCheck();
boolean startTest(PreDetectionParam param);

View File

@@ -3,7 +3,9 @@ package com.njcn.gather.detection.service.impl;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.gather.detection.handler.SocketSourceResponseService;
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.pojo.vo.SocketMsg;
import com.njcn.gather.detection.service.PreDetectionService;
@@ -24,8 +26,6 @@ import com.njcn.gather.plan.service.IAdPlanService;
import com.njcn.gather.plan.service.IAdPlanSourceService;
import com.njcn.gather.system.dictionary.pojo.enums.DictDataEnum;
import com.njcn.gather.system.dictionary.service.IDictDataService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@@ -53,25 +53,24 @@ public class PreDetectionServiceImpl implements PreDetectionService {
private final String userId = "aaa";
@Override
public void sourceCommunicationCheck(PreDetectionParam param) {
/*
先组装源通讯协议
查询计划什么模式的(除了对比式,其他都是一个计划对应一个源)
*/
AdPlan plan = iAdPlanService.getById(param.getPlan());
AdPlan plan = iAdPlanService.getById(param.getPlanId());
if (ObjectUtil.isNotNull(plan)) {
String code = dictDataService.getDictDataById(plan.getPattern()).getCode();
DictDataEnum dictDataEnumByCode = DictDataEnum.getDictDataEnumByCode(code);
switch (dictDataEnumByCode) {
case DIGITAL:
case SIMULATE:
sendYtxSocket(plan.getId());
sendYtxSocket(plan.getId(),param.getUserPageId());
break;
case CONTRAST:
//todo 对比式可以是多个源
sendYtxSocket(plan.getId(),param.getUserPageId());
break;
default:
//todo 没有找到对应的模式
@@ -84,27 +83,23 @@ public class PreDetectionServiceImpl implements PreDetectionService {
}
/**
* 源参数下发
* @param scriptId
*/
private void sendSourceIssue(String scriptId){
}
private void sendYtxSocket(String planId){
AdPlanSource planSource = adPlanSourceService.getById(planId);
private void sendYtxSocket(String planId,String userPageId){
AdPlanSource planSource = adPlanSourceService.getOne(new LambdaQueryWrapper<AdPlanSource>()
.eq(AdPlanSource::getPlanId,planId)
);
if(ObjectUtil.isNotNull(planSource)){
SourceInitialize sourceParam = pqSourceService.getSourceInitializeParam(planSource.getSourceId());
if(ObjectUtil.isNotNull(sourceParam)){
//开始组装socket报文请求头
SocketMsg msg=new SocketMsg();
msg.setRequestId("yjc_ytxjy");
msg.setOperateCode("");
msg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue());
msg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue());
msg.setData(JSON.toJSONString(sourceParam));
NettyClient.socketClient(ip, port, JSON.toJSONString(msg), new NettySourceClientHandler(ip, sourceResponseService));
String s = userPageId + "_Source";
NettyClient.socketClient(ip, port, s, new NettySourceClientHandler(s, sourceResponseService));
SocketManager.sendMsg(s,JSON.toJSONString(msg));
PqScriptIssueParam param=new PqScriptIssueParam();
param.setScriptId(planSource.getSourceId());
param.setPlanId(planId);
@@ -122,63 +117,51 @@ public class PreDetectionServiceImpl implements PreDetectionService {
}
}
@Override
public void deviceCommunicationCheck() {
}
@Override
public void agreementCheck() {
}
@Override
public void phaseSequenceCheck() {
}
@Override
public boolean startTest(PreDetectionParam param) {
List<PreDetection> pqDevList = iPqDevService.getDevInfo(Arrays.asList("578c142b7e4e4978a35bd6225aa62a23", "393504f55f1f79bce255bfc195cfdb56"));
System.out.println(pqDevList);
//校验
//组装请求数据
SocketMsg socketMsg = new SocketMsg();
Map<String, List<PreDetection>> map = new HashMap();
map.put("deviceList", pqDevList);
String jsonString = JSON.toJSONString(map);
socketMsg.setRequestId("adawdawd");
socketMsg.setOperateCode("INIT_GATHER$03");
socketMsg.setData(jsonString);
String json = JSON.toJSONString(socketMsg);
String tem = "{\"data\":\"{\\\"deviceList\\\":[{\\\"devIP\\\":\\\"192.168.1.186\\\",\\\"port\\\":102,\\\"devType\\\":\\\"PQS882B\\\",\\\"icdType\\\":\\\"PQS882_VX_ZJ_1(V102)\\\",\\\"devCode\\\":\\\"Pqs\\u0026cn870299\\\",\\\"devKey\\\":\\\"!qaz@wsx3edc4rfv\\\",\\\"monitorList\\\":[{\\\"lineId\\\":\\\"1_192.168.1.186_102_1\\\",\\\"line\\\":1}]}]}\",\"operateCode\":\"INIT_GATHER$03\",\"requestId\":\"dansldquiwdlandalksn\"}";
Runnable runnable = new Runnable() {
@Override
public void run() {
Channel channel = null;
if(SocketManager.getChannelByUserId(param.getUserPageId()) == null || !SocketManager.getChannelByUserId(param.getUserPageId()).isActive()){
channel = NettyClient.socketClient(ip, port,param.getUserPageId(),new NettySourceClientHandler(param.getUserPageId(), sourceResponseService));
// List<PreDetection> pqDevList = iPqDevService.getDevInfo(Arrays.asList("578c142b7e4e4978a35bd6225aa62a23", "393504f55f1f79bce255bfc195cfdb56"));
// System.out.println(pqDevList);
// //校验
//
// //组装请求数据
// SocketMsg socketMsg = new SocketMsg();
// Map<String, List<PreDetection>> map = new HashMap();
// map.put("deviceList", pqDevList);
// String jsonString = JSON.toJSONString(map);
// socketMsg.setRequestId("adawdawd");
// socketMsg.setOperateCode("INIT_GATHER$03");
// socketMsg.setData(jsonString);
// String json = JSON.toJSONString(socketMsg);
//
// NettyClient.socketClient(ip, port, "{\"data\":\"{\\\"deviceList\\\":[{\\\"devIP\\\":\\\"192.168.1.186\\\",\\\"port\\\":102,\\\"devType\\\":\\\"PQS882B\\\",\\\"icdType\\\":\\\"PQS882_VX_ZJ_1(V102)\\\",\\\"devCode\\\":\\\"Pqs\\u0026cn870299\\\",\\\"devKey\\\":\\\"!qaz@wsx3edc4rfv\\\",\\\"monitorList\\\":[{\\\"lineId\\\":\\\"1_192.168.1.186_102_1\\\",\\\"line\\\":1}]}]}\",\"operateCode\":\"INIT_GATHER$03\",\"requestId\":\"dansldquiwdlandalksn\"}", new NettySourceClientHandler(param.getUserPageId(), sourceResponseService));
Runnable runnable = new Runnable() {
@Override
public void run() {
// NettyClient.socketClient(ip, port, "源客户端初始化发送", new NettySourceClientHandler(ip + "_" + port, sourceResponseService));
}
if(Objects.nonNull(channel)){
try {
channel.writeAndFlush(tem).sync();
} catch (InterruptedException e) {
System.out.println("发送异常=====");
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
};
runnable.run();
};
runnable.run();
System.out.println("111111111111111111111+++++++++++++++");
// Runnable runnable2 = new Runnable() {
// @Override
// public void run() {
// NettyClient.socketClient(ip, 61001, "装置客户端初始化发送", new NettySourceClientHandler(ip + "_" + 61001, sourceResponseService2));
// }
// };
// runnable2.run();
return true;
String tem = "{\"data\":\"{\\\"deviceList\\\":[{\\\"devIP\\\":\\\"192.168.1.186\\\",\\\"port\\\":102,\\\"devType\\\":\\\"PQS882B\\\",\\\"icdType\\\":\\\"PQS882_VX_ZJ_1(V102)\\\",\\\"devCode\\\":\\\"Pqs\\u0026cn870299\\\",\\\"devKey\\\":\\\"!qaz@wsx3edc4rfv\\\",\\\"monitorList\\\":[{\\\"lineId\\\":\\\"1_192.168.1.186_102_1\\\",\\\"line\\\":1}]}]}\",\"operateCode\":\"INIT_GATHER$03\",\"requestId\":\"dansldquiwdlandalksn\"}";
return false;
}
// public static void main(String[] args) {
// NettyClient.socketClient("192.168.1.121", 61000, "源客户端初始化发送", new NettySourceClientHandler( "192.168.1.121_61000"));
// }
}

View File

@@ -12,7 +12,7 @@ import com.njcn.gather.detection.pojo.vo.SocketDataMsg;
public class MsgUtil {
public SocketDataMsg socketDataMsg(String textMsg){
public static SocketDataMsg socketDataMsg(String textMsg){
return JSON.parseObject(textMsg,SocketDataMsg.class);
}
}

View File

@@ -12,6 +12,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @Date: 2024/12/11 13:04
*/
public class SocketManager {
private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>();
public static void addUser(String userId, Channel channel) {
@@ -28,8 +29,7 @@ public class SocketManager {
public static void sendMsg(String userId,String msg) {
Channel channel = userSessions.get(userId);
TextWebSocketFrame wd1 = new TextWebSocketFrame(msg);
channel.writeAndFlush(wd1);
channel.writeAndFlush(msg);
}
}

View File

@@ -9,7 +9,6 @@ import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.TimeoutException;
@@ -18,7 +17,6 @@ import io.netty.util.CharsetUtil;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;
/**
@@ -29,11 +27,9 @@ import java.util.concurrent.TimeUnit;
public class NettyClient {
public static Channel socketClient(String ip, Integer port,String userId, ChannelHandler handler) {
public static void socketClient(String ip, Integer port,String userId, ChannelHandler handler) {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
Channel channel = null;
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
@@ -81,14 +77,10 @@ public class NettyClient {
}
});
ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
channel = channelFuture.channel();
SocketManager.addUser(userId,channel);
return channel;
SocketManager.addUser(userId,channelFuture.channel());
} catch (Exception e) {
System.out.println("进入异常............");
e.printStackTrace();
return null;
}finally {
System.out.println("进入clientSocket最后步骤---------------------");
}

View File

@@ -2,13 +2,10 @@ package com.njcn.gather.detection.util.socket.cilent;
import com.njcn.gather.detection.handler.SocketSourceResponseService;
import com.njcn.gather.detection.util.socket.SocketManager;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import java.util.Objects;
/**
@@ -24,13 +21,6 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<Strin
private final SocketSourceResponseService sourceResponseService;
@Value("${socket.device.ip}")
private String devIp;
@Value("${socket.device.port}")
private Integer devPort;
/**
* 当通道进行连接时推送消息
* @param ctx
@@ -41,7 +31,7 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<Strin
}
/**
* 处理服务端消息消息信息
* 处理服务端消息信息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException {

View File

@@ -1,11 +1,6 @@
package com.njcn.gather.detection.util.socket.web;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.njcn.gather.detection.pojo.vo.SocketMsg;
import com.njcn.gather.detection.util.socket.WebServiceManager;
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

View File

@@ -1,8 +1,10 @@
package com.njcn.gather.plan.pojo.po;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@@ -13,16 +15,19 @@ import java.io.Serializable;
@Data
@TableName("ad_plan_source")
@AllArgsConstructor
@NoArgsConstructor
public class AdPlanSource implements Serializable {
private static final long serialVersionUID = -76292730578149530L;
/**
* 检测计划表Id
*/
@TableField("Plan_Id")
private String planId;
/**
* 检测源表Id
*/
@TableField("Source_Id")
private String sourceId;
}