refactor(realtime): 优化实时数据分析服务设备接入处理逻辑

- 移除CldDevRunFlagConsumer中的注释代码,启用设备状态翻转功能
- 添加channelData2方法支持第二种设备类型的数据处理
- 在统计服务中新增通信记录功能,记录设备上线状态
This commit is contained in:
xy
2026-04-29 13:57:22 +08:00
parent 9774172c0b
commit b418855d02
3 changed files with 103 additions and 15 deletions

View File

@@ -11,8 +11,10 @@ import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
import com.njcn.csdevice.api.DataSetFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsDataSet;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.redis.utils.RedisUtil;
@@ -50,6 +52,7 @@ public class RtServiceImpl implements IRtService {
private final ChannelObjectUtil channelObjectUtil;
private final MqttPublisher publisher;
private final RedisSetUtil redisSetUtil;
private final EquipmentFeignClient equipmentFeignClient;
@Override
public void analysis(AppAutoDataMessage appAutoDataMessage) {
@@ -73,12 +76,14 @@ public class RtServiceImpl implements IRtService {
}
//根据dataArray解析数据
AppAutoDataMessage.DataArray item = appAutoDataMessage.getMsg().getDataArray().get(0);
//获取设备类型
CsEquipmentDeliveryPO po1 =equipmentFeignClient.getDevByLineId(lineId).getData();
//fixme 这边先根据数据集的名称来返回对应实体,这边感觉不太合适,后期有好方案再调整
//基础数据
if (dataSet.getName().contains("Ds$Pqd$Rt$Basic$")) {
//用户Id
String userId = redisUtil.getObjectByKey("rtDataUserId:"+lineId).toString();
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType());
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType(),po1.getDevAccessMethod());
baseRealDataSet.setUserId(userId);
baseRealDataSet.setLineId(lineId);
baseRealDataSet.setPt(po.getPtRatio().floatValue());
@@ -93,7 +98,7 @@ public class RtServiceImpl implements IRtService {
if (ObjectUtil.isNotNull(redisObject)) {
Set<String> userSet = redisSetUtil.convertToSet(redisObject);
userSet.forEach(userId->{
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType());
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType(),po1.getDevAccessMethod());
baseRealDataSet.setUserId(userId);
baseRealDataSet.setLineId(lineId);
baseRealDataSet.setPt(po.getPtRatio().floatValue());
@@ -101,7 +106,7 @@ public class RtServiceImpl implements IRtService {
baseRealDataSet.setDataLevel(dataSet.getDataLevel());
long timestamp = item.getDataTimeSec();
baseRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + userId, new Gson().toJson(baseRealDataSet), 1, false);
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false);
});
}
}
@@ -200,9 +205,13 @@ public class RtServiceImpl implements IRtService {
return dataMap;
}
public BaseRealDataSet assembleData(List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray dataArray,Integer conType) {
public BaseRealDataSet assembleData(List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray dataArray,Integer conType,String accessMethod) {
Map<String,Float> dataMap = getData(dataArrayList,dataArray);
return channelData(dataMap,conType);
if (Objects.equals("CLD",accessMethod)) {
return channelData(dataMap,conType);
} else {
return channelData2(dataMap);
}
}
public BaseRealDataSet channelData(Map<String,Float> map,Integer conType) {
@@ -311,6 +320,82 @@ public class RtServiceImpl implements IRtService {
return baseRealDataSet;
}
public BaseRealDataSet channelData2(Map<String,Float> map) {
BaseRealDataSet baseRealDataSet = new BaseRealDataSet();
//频率
baseRealDataSet.setFreq(map.get("Pq_FreqM"));
//频率偏差
baseRealDataSet.setFreqDev(map.get("Pq_FreqDevM"));
//判断监测点的接线方式,不同接线方式电压有效值取值不同
//星型-相电压 角形、V型-线电压
//电压有效值
baseRealDataSet.setVRmsA(map.get("Pq_RmsUA"));
baseRealDataSet.setVRmsB(map.get("Pq_RmsUB"));
baseRealDataSet.setVRmsC(map.get("Pq_RmsUC"));
//基波电压幅值
baseRealDataSet.setV1A(map.get("Pq_RmsFundUA"));
baseRealDataSet.setV1B(map.get("Pq_RmsFundUB"));
baseRealDataSet.setV1C(map.get("Pq_RmsFundUC"));
//电流有效值
baseRealDataSet.setIRmsA(map.get("Pq_RmsIA"));
baseRealDataSet.setIRmsB(map.get("Pq_RmsIB"));
baseRealDataSet.setIRmsC(map.get("Pq_RmsIC"));
//基波电流幅值
baseRealDataSet.setI1A(map.get("Pq_RmsFundIA"));
baseRealDataSet.setI1B(map.get("Pq_RmsFundIB"));
baseRealDataSet.setI1C(map.get("Pq_RmsFundIC"));
//电压偏差
baseRealDataSet.setVDevA(map.get("Pq_UDevA"));
baseRealDataSet.setVDevB(map.get("Pq_UDevB"));
baseRealDataSet.setVDevC(map.get("Pq_UDevC"));
//基波电压相位
baseRealDataSet.setV1AngA(map.get("Pq_FundUAngA"));
baseRealDataSet.setV1AngB(map.get("Pq_FundUAngB"));
baseRealDataSet.setV1AngC(map.get("Pq_FundUAngC"));
//基波电流相位
baseRealDataSet.setI1AngA(map.get("Pq_FundIAngA"));
baseRealDataSet.setI1AngB(map.get("Pq_FundIAngB"));
baseRealDataSet.setI1AngC(map.get("Pq_FundIAngC"));
//电压总谐波畸变率
baseRealDataSet.setVThdA(map.get("Pq_ThdUA"));
baseRealDataSet.setVThdB(map.get("Pq_ThdUB"));
baseRealDataSet.setVThdC(map.get("Pq_ThdUC"));
//电流总谐波畸变率
baseRealDataSet.setIThdA(map.get("Pq_ThdIA"));
baseRealDataSet.setIThdB(map.get("Pq_ThdIB"));
baseRealDataSet.setIThdC(map.get("Pq_ThdIC"));
//电压不平衡度
baseRealDataSet.setVUnbalance(map.get("Pq_UnbalNegUM"));
//电流不平衡度
baseRealDataSet.setIUnbalance(map.get("Pq_UnbalNegIM"));
//有功功率
baseRealDataSet.setPA(map.get("Pq_PA"));
baseRealDataSet.setPB(map.get("Pq_PB"));
baseRealDataSet.setPC(map.get("Pq_PC"));
baseRealDataSet.setPTot(map.get("Pq_TotPM"));
//无功功率
baseRealDataSet.setQA(map.get("Pq_QA"));
baseRealDataSet.setQB(map.get("Pq_QB"));
baseRealDataSet.setQC(map.get("Pq_QC"));
baseRealDataSet.setQTot(map.get("Pq_TotQM"));
//视在功率
baseRealDataSet.setSA(map.get("Pq_SA"));
baseRealDataSet.setSB(map.get("Pq_SB"));
baseRealDataSet.setSC(map.get("Pq_SC"));
baseRealDataSet.setSTot(map.get("Pq_TotSM"));
//视在功率因数
baseRealDataSet.setPfA(map.get("Pq_PFA"));
baseRealDataSet.setPfB(map.get("Pq_PFB"));
baseRealDataSet.setPfC(map.get("Pq_PFC"));
baseRealDataSet.setPfTot(map.get("Pq_TotPFM"));
//位移功率因数
baseRealDataSet.setDpfA(map.get("Pq_DFA"));
baseRealDataSet.setDpfB(map.get("Pq_DFB"));
baseRealDataSet.setDpfC(map.get("Pq_DFC"));
baseRealDataSet.setDpfTot(map.get("Pq_TotDFM"));
return baseRealDataSet;
}
public HarmRealDataSet harmData(List<CsDataArray> dataArrayList, AppAutoDataMessage.DataArray dataArray, String dataLevel, Double ct) {
HarmRealDataSet harmRealDataSet = new HarmRealDataSet();
List<HarmData> harmDataList = new ArrayList<>();