From b418855d02b44d8010e050a813142f3526aef642 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Wed, 29 Apr 2026 13:57:22 +0800 Subject: [PATCH] =?UTF-8?q?refactor(realtime):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=AE=9E=E6=97=B6=E6=95=B0=E6=8D=AE=E5=88=86=E6=9E=90=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E8=AE=BE=E5=A4=87=E6=8E=A5=E5=85=A5=E5=A4=84=E7=90=86?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 移除CldDevRunFlagConsumer中的注释代码,启用设备状态翻转功能 - 添加channelData2方法支持第二种设备类型的数据处理 - 在统计服务中新增通信记录功能,记录设备上线状态 --- .../njcn/rt/service/impl/RtServiceImpl.java | 95 ++++++++++++++++++- .../stat/service/impl/StatServiceImpl.java | 21 ++-- .../consumer/CldDevRunFlagConsumer.java | 2 +- 3 files changed, 103 insertions(+), 15 deletions(-) diff --git a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java index adf38d1..d3c4fc7 100644 --- a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java +++ b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java @@ -11,8 +11,10 @@ import com.njcn.common.utils.PubUtils; import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.DataArrayFeignClient; import com.njcn.csdevice.api.DataSetFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.pojo.po.CsDataArray; import com.njcn.csdevice.pojo.po.CsDataSet; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.mq.message.AppAutoDataMessage; import com.njcn.redis.utils.RedisUtil; @@ -50,6 +52,7 @@ public class RtServiceImpl implements IRtService { private final ChannelObjectUtil channelObjectUtil; private final MqttPublisher publisher; private final RedisSetUtil redisSetUtil; + private final EquipmentFeignClient equipmentFeignClient; @Override public void analysis(AppAutoDataMessage appAutoDataMessage) { @@ -73,12 +76,14 @@ public class RtServiceImpl implements IRtService { } //根据dataArray解析数据 AppAutoDataMessage.DataArray item = appAutoDataMessage.getMsg().getDataArray().get(0); + //获取设备类型 + CsEquipmentDeliveryPO po1 =equipmentFeignClient.getDevByLineId(lineId).getData(); //fixme 这边先根据数据集的名称来返回对应实体,这边感觉不太合适,后期有好方案再调整 //基础数据 if (dataSet.getName().contains("Ds$Pqd$Rt$Basic$")) { //用户Id String userId = redisUtil.getObjectByKey("rtDataUserId:"+lineId).toString(); - BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType()); + BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType(),po1.getDevAccessMethod()); baseRealDataSet.setUserId(userId); baseRealDataSet.setLineId(lineId); baseRealDataSet.setPt(po.getPtRatio().floatValue()); @@ -93,7 +98,7 @@ public class RtServiceImpl implements IRtService { if (ObjectUtil.isNotNull(redisObject)) { Set userSet = redisSetUtil.convertToSet(redisObject); userSet.forEach(userId->{ - BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType()); + BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType(),po1.getDevAccessMethod()); baseRealDataSet.setUserId(userId); baseRealDataSet.setLineId(lineId); baseRealDataSet.setPt(po.getPtRatio().floatValue()); @@ -101,7 +106,7 @@ public class RtServiceImpl implements IRtService { baseRealDataSet.setDataLevel(dataSet.getDataLevel()); long timestamp = item.getDataTimeSec(); baseRealDataSet.setDataTime(getTime(timestamp)); - publisher.send("/Web/RealData/" + userId, new Gson().toJson(baseRealDataSet), 1, false); + publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false); }); } } @@ -200,9 +205,13 @@ public class RtServiceImpl implements IRtService { return dataMap; } - public BaseRealDataSet assembleData(List dataArrayList,AppAutoDataMessage.DataArray dataArray,Integer conType) { + public BaseRealDataSet assembleData(List dataArrayList,AppAutoDataMessage.DataArray dataArray,Integer conType,String accessMethod) { Map dataMap = getData(dataArrayList,dataArray); - return channelData(dataMap,conType); + if (Objects.equals("CLD",accessMethod)) { + return channelData(dataMap,conType); + } else { + return channelData2(dataMap); + } } public BaseRealDataSet channelData(Map map,Integer conType) { @@ -311,6 +320,82 @@ public class RtServiceImpl implements IRtService { return baseRealDataSet; } + public BaseRealDataSet channelData2(Map map) { + BaseRealDataSet baseRealDataSet = new BaseRealDataSet(); + //频率 + baseRealDataSet.setFreq(map.get("Pq_FreqM")); + //频率偏差 + baseRealDataSet.setFreqDev(map.get("Pq_FreqDevM")); + //判断监测点的接线方式,不同接线方式电压有效值取值不同 + //星型-相电压 角形、V型-线电压 + //电压有效值 + baseRealDataSet.setVRmsA(map.get("Pq_RmsUA")); + baseRealDataSet.setVRmsB(map.get("Pq_RmsUB")); + baseRealDataSet.setVRmsC(map.get("Pq_RmsUC")); + //基波电压幅值 + baseRealDataSet.setV1A(map.get("Pq_RmsFundUA")); + baseRealDataSet.setV1B(map.get("Pq_RmsFundUB")); + baseRealDataSet.setV1C(map.get("Pq_RmsFundUC")); + //电流有效值 + baseRealDataSet.setIRmsA(map.get("Pq_RmsIA")); + baseRealDataSet.setIRmsB(map.get("Pq_RmsIB")); + baseRealDataSet.setIRmsC(map.get("Pq_RmsIC")); + //基波电流幅值 + baseRealDataSet.setI1A(map.get("Pq_RmsFundIA")); + baseRealDataSet.setI1B(map.get("Pq_RmsFundIB")); + baseRealDataSet.setI1C(map.get("Pq_RmsFundIC")); + //电压偏差 + baseRealDataSet.setVDevA(map.get("Pq_UDevA")); + baseRealDataSet.setVDevB(map.get("Pq_UDevB")); + baseRealDataSet.setVDevC(map.get("Pq_UDevC")); + //基波电压相位 + baseRealDataSet.setV1AngA(map.get("Pq_FundUAngA")); + baseRealDataSet.setV1AngB(map.get("Pq_FundUAngB")); + baseRealDataSet.setV1AngC(map.get("Pq_FundUAngC")); + //基波电流相位 + baseRealDataSet.setI1AngA(map.get("Pq_FundIAngA")); + baseRealDataSet.setI1AngB(map.get("Pq_FundIAngB")); + baseRealDataSet.setI1AngC(map.get("Pq_FundIAngC")); + //电压总谐波畸变率 + baseRealDataSet.setVThdA(map.get("Pq_ThdUA")); + baseRealDataSet.setVThdB(map.get("Pq_ThdUB")); + baseRealDataSet.setVThdC(map.get("Pq_ThdUC")); + //电流总谐波畸变率 + baseRealDataSet.setIThdA(map.get("Pq_ThdIA")); + baseRealDataSet.setIThdB(map.get("Pq_ThdIB")); + baseRealDataSet.setIThdC(map.get("Pq_ThdIC")); + //电压不平衡度 + baseRealDataSet.setVUnbalance(map.get("Pq_UnbalNegUM")); + //电流不平衡度 + baseRealDataSet.setIUnbalance(map.get("Pq_UnbalNegIM")); + //有功功率 + baseRealDataSet.setPA(map.get("Pq_PA")); + baseRealDataSet.setPB(map.get("Pq_PB")); + baseRealDataSet.setPC(map.get("Pq_PC")); + baseRealDataSet.setPTot(map.get("Pq_TotPM")); + //无功功率 + baseRealDataSet.setQA(map.get("Pq_QA")); + baseRealDataSet.setQB(map.get("Pq_QB")); + baseRealDataSet.setQC(map.get("Pq_QC")); + baseRealDataSet.setQTot(map.get("Pq_TotQM")); + //视在功率 + baseRealDataSet.setSA(map.get("Pq_SA")); + baseRealDataSet.setSB(map.get("Pq_SB")); + baseRealDataSet.setSC(map.get("Pq_SC")); + baseRealDataSet.setSTot(map.get("Pq_TotSM")); + //视在功率因数 + baseRealDataSet.setPfA(map.get("Pq_PFA")); + baseRealDataSet.setPfB(map.get("Pq_PFB")); + baseRealDataSet.setPfC(map.get("Pq_PFC")); + baseRealDataSet.setPfTot(map.get("Pq_TotPFM")); + //位移功率因数 + baseRealDataSet.setDpfA(map.get("Pq_DFA")); + baseRealDataSet.setDpfB(map.get("Pq_DFB")); + baseRealDataSet.setDpfC(map.get("Pq_DFC")); + baseRealDataSet.setDpfTot(map.get("Pq_TotDFM")); + return baseRealDataSet; + } + public HarmRealDataSet harmData(List dataArrayList, AppAutoDataMessage.DataArray dataArray, String dataLevel, Double ct) { HarmRealDataSet harmRealDataSet = new HarmRealDataSet(); List harmDataList = new ArrayList<>(); diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java index d0c8e6e..143cb1a 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -1,6 +1,7 @@ package com.njcn.stat.service.impl; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.date.DatePattern; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.njcn.access.api.CsDeviceFeignClient; import com.njcn.access.api.CsLineLatestDataFeignClient; @@ -9,13 +10,13 @@ import com.njcn.access.pojo.po.CsLineLatestData; import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.utils.PubUtils; -import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.CsCommunicateFeignClient; import com.njcn.csdevice.api.DataArrayFeignClient; import com.njcn.csdevice.api.DeviceMessageFeignClient; +import com.njcn.csdevice.pojo.dto.PqsCommunicateDto; import com.njcn.csdevice.pojo.param.DataArrayParam; import com.njcn.csdevice.pojo.po.CsDataArray; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; -import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.influx.pojo.constant.InfluxDBTableConstant; import com.njcn.influx.utils.InfluxDbUtils; import com.njcn.mq.message.AppAutoDataMessage; @@ -23,9 +24,7 @@ import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.stat.enums.StatResponseEnum; import com.njcn.stat.service.IStatService; -import com.njcn.system.api.DicDataFeignClient; import com.njcn.system.enums.DicDataEnum; -import com.njcn.system.pojo.po.DictData; import com.njcn.system.pojo.po.SysDicTreePO; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -38,6 +37,7 @@ import org.springframework.transaction.annotation.Transactional; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.TimeUnit; @@ -60,11 +60,7 @@ public class StatServiceImpl implements IStatService { private final CsLineLatestDataFeignClient csLineLatestDataFeignClient; private final CsDeviceFeignClient csDeviceFeignClient; private final DeviceMessageFeignClient deviceMessageFeignClient; - private static final Map PHASE_MAPPING = new HashMap() {{ - put("AB", "A"); - put("BC", "B"); - put("CA", "C"); - }}; + private final CsCommunicateFeignClient csCommunicateFeignClient; @Override @Transactional(rollbackFor = Exception.class) @@ -166,6 +162,13 @@ public class StatServiceImpl implements IStatService { //判断设备运行状态 if (!Objects.isNull(po.getRunStatus()) && po.getRunStatus() == 1) { csDeviceFeignClient.updateRunStatus(appAutoDataMessage.getId(), AccessEnum.ONLINE.getCode()); + //记录设备上线 + PqsCommunicateDto dto = new PqsCommunicateDto(); + dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + dto.setDevId(appAutoDataMessage.getId()); + dto.setType(1); + dto.setDescription("通讯正常"); + csCommunicateFeignClient.insertion(dto); } } System.gc(); diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java index 03426ad..b882757 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java @@ -49,7 +49,7 @@ public class CldDevRunFlagConsumer extends EnhanceConsumerMessageHandler