This commit is contained in:
wr
2024-12-18 20:46:41 +08:00
parent b8a7f98cec
commit 1e7647dadc
7 changed files with 60 additions and 239 deletions

View File

@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
public class SocketDevResponseService {
private final String handlerStr = "_dev";
private final String handlerStr = "_Dev";
private final WebSocketHandler webSocketHandler;
private final IPqDevService iPqDevService;
private final IPqScriptDtlsService scriptDtlsService;
@@ -115,6 +115,7 @@ public class SocketDevResponseService {
devXieyi(socketDataMsg, param, msg);
break;
case YJC_XUJY:
devXu(param, socketDataMsg);
break;
}
@@ -224,8 +225,6 @@ public class SocketDevResponseService {
}
break;
case UNPROCESSED_BUSINESS:
break;
@@ -263,23 +262,27 @@ public class SocketDevResponseService {
DevData devData = JSON.parseObject(data, DevData.class);
SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode());
if (ObjectUtil.isNotNull(dictDataEnumByCode)) {
devInfo.add(devData);
SocketMsg socketMsg = new SocketMsg();
switch (dictDataEnumByCode) {
case SUCCESS:
devInfo.add(devData);
success.add(devData.getId());
if (success.size() == moniterIdList.size()) {
PqScriptIssueParam sourceParam = new PqScriptIssueParam();
sourceParam.setPlanId(param.getPlanId());
sourceParam.setDevIds(param.getDevIds());
sourceParam.setIsPhaseSequence(true);
List<SourceIssue> sourceIssues = scriptDtlsService.listSourceIssue(sourceParam);
if (CollUtil.isNotEmpty(sourceIssues)) {
List<SourceCompareDev> info = new ArrayList<>();
if (CollUtil.isNotEmpty(sourceIssues)) {
for (DevData dev : devInfo) {
info.addAll(devIsSource(dev, sourceIssues.get(0)));
}
}
xuClear();
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue());
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
socketMsg.setData(JSON.toJSONString(sourceIssues));
socketMsg.setData(JSON.toJSONString(info));
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
}
break;
@@ -287,6 +290,7 @@ public class SocketDevResponseService {
break;
case NORMAL_RESPONSE:
devInfo.add(devData);
break;
case RE_OPERATE:
break;
@@ -301,213 +305,6 @@ public class SocketDevResponseService {
}
public static void main(String[] args) {
// String a = "{\"requestId\":\"dansldquiwdlandalksn\",\"operateCode\":\"DATA_REQUEST$01\",\"data\":\"{\\\"Time\\\":\\\"2024-12-18T10:26:00\\\",\\\"ID\\\":\\\"192.168.1.186_1\\\",\\\"result\\\":false,\\\"SqlData\\\":[{\\\"type\\\":\\\"平均值\\\",\\\"desc\\\":\\\"电压有效值\\\",\\\"list\\\":{\\\"A\\\":\\\"5.863635\\\",\\\"B\\\":\\\"5.865018\\\",\\\"C\\\":\\\"5.867418\\\",\\\"T\\\":null}},{\\\"type\\\":\\\"平均值\\\",\\\"desc\\\":\\\"电流有效值\\\",\\\"list\\\":{\\\"A\\\":\\\"0.000795\\\",\\\"B\\\":\\\"0.002215\\\",\\\"C\\\":\\\"0.003610\\\",\\\"T\\\":null}}],\\\"SqlDataHarm\\\":[]}\",\"code\":10202}";
// SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(a);
// String data = socketDataMsg.getData();
// DevData devData = JSON.parseObject(data, DevData.class);
String b = "{\"requestId\":\"dansldquiwdlandalksn\",\"operateCode\":\"DATA_REQUEST$01\",\"data\":\"{\\\"Time\\\":\\\"2024-12-18T10:27:00\\\",\\\"ID\\\":\\\"192.168.1.186_1\\\",\\\"result\\\":false,\\\"SqlData\\\":[{\\\"type\\\":\\\"平均值\\\",\\\"desc\\\":\\\"电压有效值\\\",\\\"list\\\":{\\\"A\\\":\\\"70.0\\\",\\\"B\\\":\\\"50.0\\\",\\\"C\\\":\\\"40.0\\\",\\\"T\\\":null}},{\\\"type\\\":\\\"平均值\\\",\\\"desc\\\":\\\"电流有效值\\\",\\\"list\\\":{\\\"A\\\":\\\"0.000782\\\",\\\"B\\\":\\\"0.002222\\\",\\\"C\\\":\\\"0.003602\\\",\\\"T\\\":null}}],\\\"SqlDataHarm\\\":[]}\",\"code\":10200}";
SocketDataMsg socketDataMsgb = MsgUtil.socketDataMsg(b);
String datab = socketDataMsgb.getData();
DevData devDatab = JSON.parseObject(datab, DevData.class);
List<SocketDataMsg> list = new LinkedList<>();
// list.add(socketDataMsg);
list.add(socketDataMsgb);
String is = "\n" +
" {\n" +
" \"sourceId\": \"1111\",\n" +
" \"type\": \"Freq\",\n" +
" \"subType\": \"NULL\",\n" +
" \"fUn\": 57.74,\n" +
" \"fIn\": 5.0,\n" +
" \"fFreq\": 50.0,\n" +
" \"channelList\": [\n" +
" {\n" +
" \"channelFlag\": true,\n" +
" \"harmFlag\": false,\n" +
" \"inHarmFlag\": false,\n" +
" \"dipFlag\": false,\n" +
" \"flickerFlag\": false,\n" +
" \"channelType\": \"Ua\",\n" +
" \"fAmp\": 60.0,\n" +
" \"fPhase\": 0.0,\n" +
" \"harmList\": [],\n" +
" \"inharmList\": [],\n" +
" \"dipData\": {\n" +
" \"fTransValue\": 0.0,\n" +
" \"fPreTime\": 2.0,\n" +
" \"fRampIn\": 0.001,\n" +
" \"fRetainTime\": 0.0,\n" +
" \"fRampOut\": 0.001,\n" +
" \"fAfterTime\": 3.0\n" +
" },\n" +
" \"flickerData\": {\n" +
" \"waveFluType\": \"SQU\",\n" +
" \"waveType\": \"CPM\",\n" +
" \"fDutyCycle\": 50.0,\n" +
" \"fChagFre\": 0.0,\n" +
" \"fChagValue\": 0.0\n" +
" }\n" +
" },\n" +
" {\n" +
" \"channelFlag\": true,\n" +
" \"harmFlag\": false,\n" +
" \"inHarmFlag\": false,\n" +
" \"dipFlag\": false,\n" +
" \"flickerFlag\": false,\n" +
" \"channelType\": \"Ub\",\n" +
" \"fAmp\": 50.0,\n" +
" \"fPhase\": -120.0,\n" +
" \"harmList\": [],\n" +
" \"inharmList\": [],\n" +
" \"dipData\": {\n" +
" \"fTransValue\": 0.0,\n" +
" \"fPreTime\": 2.0,\n" +
" \"fRampIn\": 0.001,\n" +
" \"fRetainTime\": 0.0,\n" +
" \"fRampOut\": 0.001,\n" +
" \"fAfterTime\": 3.0\n" +
" },\n" +
" \"flickerData\": {\n" +
" \"waveFluType\": \"SQU\",\n" +
" \"waveType\": \"CPM\",\n" +
" \"fDutyCycle\": 50.0,\n" +
" \"fChagFre\": 0.0,\n" +
" \"fChagValue\": 0.0\n" +
" }\n" +
" },\n" +
" {\n" +
" \"channelFlag\": true,\n" +
" \"harmFlag\": false,\n" +
" \"inHarmFlag\": false,\n" +
" \"dipFlag\": false,\n" +
" \"flickerFlag\": false,\n" +
" \"channelType\": \"Uc\",\n" +
" \"fAmp\": 40.0,\n" +
" \"fPhase\": 120.0,\n" +
" \"harmList\": [],\n" +
" \"inharmList\": [],\n" +
" \"dipData\": {\n" +
" \"fTransValue\": 0.0,\n" +
" \"fPreTime\": 2.0,\n" +
" \"fRampIn\": 0.001,\n" +
" \"fRetainTime\": 0.0,\n" +
" \"fRampOut\": 0.001,\n" +
" \"fAfterTime\": 3.0\n" +
" },\n" +
" \"flickerData\": {\n" +
" \"waveFluType\": \"SQU\",\n" +
" \"waveType\": \"CPM\",\n" +
" \"fDutyCycle\": 50.0,\n" +
" \"fChagFre\": 0.0,\n" +
" \"fChagValue\": 0.0\n" +
" }\n" +
" },\n" +
" {\n" +
" \"channelFlag\": true,\n" +
" \"harmFlag\": false,\n" +
" \"inHarmFlag\": false,\n" +
" \"dipFlag\": false,\n" +
" \"flickerFlag\": false,\n" +
" \"channelType\": \"Ia\",\n" +
" \"fAmp\": 1.0,\n" +
" \"fPhase\": 0.0,\n" +
" \"harmList\": [],\n" +
" \"inharmList\": [],\n" +
" \"dipData\": {\n" +
" \"fTransValue\": 0.0,\n" +
" \"fPreTime\": 2.0,\n" +
" \"fRampIn\": 0.001,\n" +
" \"fRetainTime\": 0.0,\n" +
" \"fRampOut\": 0.001,\n" +
" \"fAfterTime\": 3.0\n" +
" },\n" +
" \"flickerData\": {\n" +
" \"waveFluType\": \"SQU\",\n" +
" \"waveType\": \"CPM\",\n" +
" \"fDutyCycle\": 50.0,\n" +
" \"fChagFre\": 0.0,\n" +
" \"fChagValue\": 0.0\n" +
" }\n" +
" },\n" +
" {\n" +
" \"channelFlag\": true,\n" +
" \"harmFlag\": false,\n" +
" \"inHarmFlag\": false,\n" +
" \"dipFlag\": false,\n" +
" \"flickerFlag\": false,\n" +
" \"channelType\": \"Ib\",\n" +
" \"fAmp\": 2.0,\n" +
" \"fPhase\": -120.0,\n" +
" \"harmList\": [],\n" +
" \"inharmList\": [],\n" +
" \"dipData\": {\n" +
" \"fTransValue\": 0.0,\n" +
" \"fPreTime\": 2.0,\n" +
" \"fRampIn\": 0.001,\n" +
" \"fRetainTime\": 0.0,\n" +
" \"fRampOut\": 0.001,\n" +
" \"fAfterTime\": 3.0\n" +
" },\n" +
" \"flickerData\": {\n" +
" \"waveFluType\": \"SQU\",\n" +
" \"waveType\": \"CPM\",\n" +
" \"fDutyCycle\": 50.0,\n" +
" \"fChagFre\": 0.0,\n" +
" \"fChagValue\": 0.0\n" +
" }\n" +
" },\n" +
" {\n" +
" \"channelFlag\": true,\n" +
" \"harmFlag\": false,\n" +
" \"inHarmFlag\": false,\n" +
" \"dipFlag\": false,\n" +
" \"flickerFlag\": false,\n" +
" \"channelType\": \"Ic\",\n" +
" \"fAmp\": 3.0,\n" +
" \"fPhase\": 120.0,\n" +
" \"harmList\": [],\n" +
" \"inharmList\": [],\n" +
" \"dipData\": {\n" +
" \"fTransValue\": 0.0,\n" +
" \"fPreTime\": 2.0,\n" +
" \"fRampIn\": 0.001,\n" +
" \"fRetainTime\": 0.0,\n" +
" \"fRampOut\": 0.001,\n" +
" \"fAfterTime\": 3.0\n" +
" },\n" +
" \"flickerData\": {\n" +
" \"waveFluType\": \"SQU\",\n" +
" \"waveType\": \"CPM\",\n" +
" \"fDutyCycle\": 50.0,\n" +
" \"fChagFre\": 0.0,\n" +
" \"fChagValue\": 0.0\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n";
SourceIssue issue = JSON.parseObject(is, SourceIssue.class);
/**
* 开始收到消息将消息进行存在
* 先将成功消息存储在一个set里
* 然后在判断所有的装置是否都是存在的
*/
// SocketDevResponseService service = new SocketDevResponseService(null, null);
//
// for (SocketDataMsg dataMsg : list) {
// service.test(null, dataMsg);
// }
// System.out.println();
}
private List<SourceCompareDev> devIsSource(DevData dev, SourceIssue issue) {
List<SourceCompareDev> info = new ArrayList<>();
@@ -590,6 +387,11 @@ public class SocketDevResponseService {
return compareDev;
}
public void xuClear() {
this.moniterIdList.clear();
this.devInfo.clear();
this.success.clear();
}
/**
* 初始化集合
@@ -611,6 +413,4 @@ public class SocketDevResponseService {
}
}

View File

@@ -71,7 +71,7 @@ public class SocketSourceResponseService {
case SUCCESS:
//todo 前端推送收到的消息暂未处理好
// webSocketHandler.sendMsgToUser(param.getUserPageId(), msg);
String s = param.getUserPageId() + "_dev";
String s = param.getUserPageId() + "_Dev";
//开始设备通讯检测
Channel channel = SocketManager.getChannelByUserId(s);
if(channel==null || !channel.isActive()){
@@ -114,19 +114,19 @@ public class SocketSourceResponseService {
socketMsg.setRequestId(socketDataMsg.getRequestId());
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
socketMsg.setData(dictDataEnumByCode.getMessage());
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
// webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
String s = param.getUserPageId() + "_Dev";
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue());
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_DATA_REQUEST_02.getValue());
List<PreDetection> pqDevList = iPqDevService.getDevInfo(param.getDevIds());
List<String> moniterIdList = pqDevList.stream().flatMap(x -> x.getMonitorList().stream())
.map(PreDetection.MonitorListDTO::getLineId)
.collect(Collectors.toList());
DevPhaseSequenceParam phaseSequenceParam=new DevPhaseSequenceParam();
phaseSequenceParam.setMoniterIdList(moniterIdList);
phaseSequenceParam.setDataType(Arrays.asList("平均值/电压有效值","平均值/电流有效值"));
phaseSequenceParam.setDataType(Arrays.asList("实时数据/电压有效值","实时数据/电流有效值"));
phaseSequenceParam.setReadCount(1);
phaseSequenceParam.setIgnoreCount(1);
socketMsg.setData(JSON.toJSONString(phaseSequenceParam));
@@ -138,7 +138,7 @@ public class SocketSourceResponseService {
socketMsg.setRequestId(socketDataMsg.getRequestId());
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
socketMsg.setData(dictDataEnumByCode.getMessage());
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
// webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
break;
}
}

View File

@@ -22,10 +22,12 @@ public enum SourceOperateCodeEnum {
* 终端 INIT_GATHER$01 INIT_GATHER采集初始化01 统计采集、02 暂态采集、03 实时采集
*/
DEV_INIT_GATHER_01("INIT_GATHER$01", "统计采集"),
DEV_INIT_GATHER_02("INIT_GATHER$02", "暂态采集"),
DEV_INIT_GATHER_03("INIT_GATHER$03", "实时采集"),
DEV_INIT_GATHER_02("INIT_GATHER$02", "实时采集"),
DEV_INIT_GATHER_03("INIT_GATHER$03", "暂态采集"),
DEV_DATA_REQUEST_01("DATA_REQUEST$01", "统计采集申请"),
DEV_DATA_REQUEST_02("DATA_REQUEST$02", "实时采集申请"),
DEV_DATA_REQUEST_03("DATA_REQUEST$03", "暂态采集申请"),
YJC_YTXJY("yjc_ytxjy", "预检测_源通讯检测"),

View File

@@ -1,5 +1,6 @@
package com.njcn.gather.detection.pojo.po;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.util.Map;
@@ -16,35 +17,42 @@ public class SourceCompareDev {
/**
* 装置名称
*/
@JSONField(ordinal = 1)
private String devName;
/**
* 装置ip
*/
@JSONField(ordinal = 2)
private String ip;
/**
* 装置通道
*/
@JSONField(ordinal = 3)
private String lineNum;
/**
* 描述
*/
@JSONField(ordinal = 3)
private String desc;
/**
* 是否合格
*/
@JSONField(ordinal = 4)
private Boolean isQualified;
/**
* 源数据
*/
@JSONField(ordinal = 5)
private Map<String, Float> sourceData;
/**
* 装置数据
*/
@JSONField(ordinal = 6)
private Map<String, Float> DevData;

View File

@@ -9,7 +9,7 @@ import lombok.Data;
* @date 2024/12/11 15:57
*/
@Data
public class SocketMsg {
public class SocketMsg<T> {
/**
* 请求id确保接收到响应时知晓是针对的哪次请求的应答
@@ -27,5 +27,5 @@ public class SocketMsg {
* 数据体传输前需要将对象、Array等转为String
*/
@JSONField(ordinal = 3)
private String data;
private T data;
}

View File

@@ -39,7 +39,6 @@ import java.util.concurrent.TimeUnit;
@Getter
public class NettyClient {
public static void socketClient(String ip, Integer port,String userPageId,SimpleChannelInboundHandler<String> handler) {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
@@ -49,6 +48,16 @@ public class NettyClient {
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
if(port==10086){
ch.pipeline()
//空闲状态的handler
// 添加LineBasedFrameDecoder来按行分割数据
// .addLast(new LineBasedFrameDecoder(10240))
// .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(handler);
}else{
ch.pipeline()
//空闲状态的handler
// 添加LineBasedFrameDecoder来按行分割数据
@@ -56,8 +65,9 @@ public class NettyClient {
// .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(handler)
;
.addLast(handler);
}
}
});
bootstrap.connect(ip, port).sync().addListener((ChannelFutureListener) ch -> {

View File

@@ -81,13 +81,14 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
SocketManager.addUser(param.getUserPageId()+"_dev",ctx.channel());
SocketManager.addUser(param.getUserPageId()+"_Dev",ctx.channel());
System.out.println("有通道接入" + ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
socketResponseService.xuClear();
// 处理异常,例如记录日志、关闭连接等
cause.printStackTrace();
// 根据异常类型进行不同的处理