refactor(device): 重构设备监测点信息缓存逻辑并优化数据处理

- 引入 DeviceMessageFeignClient 替代原有的 Redis 缓存监测点信息逻辑
- 移除 CsDeviceServiceImpl 中重复的监测点信息缓存代码,统一调用远程服务
- 将 phase 默认值从 "M" 修改为 "T",更新所有相关数据处理逻辑
- 优化 RtServiceImpl 中 CT 和 PT 变比计算逻辑,支持双变比处理
- 更新实时数据分析中的相别标识符,从 "M" 改为 "T"
- 在 StatServiceImpl 中完善数据标签处理和质量标志设置逻辑
This commit is contained in:
xy
2026-05-20 14:16:36 +08:00
parent 6983cd39fe
commit 2eeabddf5c
6 changed files with 76 additions and 139 deletions

View File

@@ -78,6 +78,8 @@ public class RtServiceImpl implements IRtService {
AppAutoDataMessage.DataArray item = appAutoDataMessage.getMsg().getDataArray().get(0);
//获取设备类型
CsEquipmentDeliveryPO po1 =equipmentFeignClient.getDevByLineId(lineId).getData();
Float ct = po.getCtRatio().floatValue() / (po.getCt2Ratio() == null ? 1.0f:po.getCt2Ratio().floatValue());
Float pt = po.getPtRatio().floatValue() / (po.getPt2Ratio() == null ? 1.0f:po.getPt2Ratio().floatValue());
//fixme 这边先根据数据集的名称来返回对应实体,这边感觉不太合适,后期有好方案再调整
//基础数据
if (dataSet.getName().contains("Ds$Pqd$Rt$Basic$")) {
@@ -88,8 +90,8 @@ public class RtServiceImpl implements IRtService {
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType(),po1.getDevAccessMethod());
baseRealDataSet.setUserId(userId);
baseRealDataSet.setLineId(lineId);
baseRealDataSet.setPt(po.getPtRatio().floatValue());
baseRealDataSet.setCt(po.getCtRatio().floatValue());
baseRealDataSet.setPt(pt);
baseRealDataSet.setCt(ct);
baseRealDataSet.setDataLevel(dataSet.getDataLevel());
long timestamp = item.getDataTimeSec() - 8*3600;
baseRealDataSet.setDataTime(getTime(timestamp));
@@ -104,8 +106,8 @@ public class RtServiceImpl implements IRtService {
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType(),po1.getDevAccessMethod());
baseRealDataSet.setUserId(userId);
baseRealDataSet.setLineId(lineId);
baseRealDataSet.setPt(po.getPtRatio().floatValue());
baseRealDataSet.setCt(po.getCtRatio().floatValue());
baseRealDataSet.setPt(pt);
baseRealDataSet.setCt(ct);
baseRealDataSet.setDataLevel(dataSet.getDataLevel());
long timestamp = item.getDataTimeSec();
baseRealDataSet.setDataTime(getTime(timestamp));
@@ -123,8 +125,8 @@ public class RtServiceImpl implements IRtService {
HarmRealDataSet harmRealDataSet = harmData(dataArrayList,item,dataSet.getDataLevel(),po.getCtRatio());
harmRealDataSet.setUserId(userId);
harmRealDataSet.setLineId(lineId);
harmRealDataSet.setPt(po.getPtRatio().floatValue());
harmRealDataSet.setCt(po.getCtRatio().floatValue());
harmRealDataSet.setPt(pt);
harmRealDataSet.setCt(ct);
harmRealDataSet.setDataLevel(dataSet.getDataLevel());
if (ObjectUtil.isNotNull(po.getLineNo())) {
timestamp = item.getDataTimeSec();
@@ -223,9 +225,9 @@ public class RtServiceImpl implements IRtService {
public BaseRealDataSet channelData(Map<String,Float> map,Integer conType) {
BaseRealDataSet baseRealDataSet = new BaseRealDataSet();
//频率
baseRealDataSet.setFreq(map.get("Pq_FreqM"));
baseRealDataSet.setFreq(map.get("Pq_FreqT"));
//频率偏差
baseRealDataSet.setFreqDev(map.get("Pq_FreqDevM"));
baseRealDataSet.setFreqDev(map.get("Pq_FreqDevT"));
//判断监测点的接线方式,不同接线方式电压有效值取值不同
//星型-相电压 角形、V型-线电压
//电压有效值
@@ -295,43 +297,43 @@ public class RtServiceImpl implements IRtService {
baseRealDataSet.setIThdB(map.get("Pq_ThdIB"));
baseRealDataSet.setIThdC(map.get("Pq_ThdIC"));
//电压不平衡度
baseRealDataSet.setVUnbalance(map.get("Pq_UnbalNegUM"));
baseRealDataSet.setVUnbalance(map.get("Pq_UnbalNegUT"));
//电流不平衡度
baseRealDataSet.setIUnbalance(map.get("Pq_UnbalNegIM"));
baseRealDataSet.setIUnbalance(map.get("Pq_UnbalNegIT"));
//有功功率
baseRealDataSet.setPA(map.get("Pq_PA"));
baseRealDataSet.setPB(map.get("Pq_PB"));
baseRealDataSet.setPC(map.get("Pq_PC"));
baseRealDataSet.setPTot(map.get("Pq_TotPM"));
baseRealDataSet.setPTot(map.get("Pq_TotPT"));
//无功功率
baseRealDataSet.setQA(map.get("Pq_QA"));
baseRealDataSet.setQB(map.get("Pq_QB"));
baseRealDataSet.setQC(map.get("Pq_QC"));
baseRealDataSet.setQTot(map.get("Pq_TotQM"));
baseRealDataSet.setQTot(map.get("Pq_TotQT"));
//视在功率
baseRealDataSet.setSA(map.get("Pq_SA"));
baseRealDataSet.setSB(map.get("Pq_SB"));
baseRealDataSet.setSC(map.get("Pq_SC"));
baseRealDataSet.setSTot(map.get("Pq_TotSM"));
baseRealDataSet.setSTot(map.get("Pq_TotST"));
//视在功率因数
baseRealDataSet.setPfA(map.get("Pq_PFA"));
baseRealDataSet.setPfB(map.get("Pq_PFB"));
baseRealDataSet.setPfC(map.get("Pq_PFC"));
baseRealDataSet.setPfTot(map.get("Pq_TotPFM"));
baseRealDataSet.setPfTot(map.get("Pq_TotPFT"));
//位移功率因数
baseRealDataSet.setDpfA(map.get("Pq_DFA"));
baseRealDataSet.setDpfB(map.get("Pq_DFB"));
baseRealDataSet.setDpfC(map.get("Pq_DFC"));
baseRealDataSet.setDpfTot(map.get("Pq_TotDFM"));
baseRealDataSet.setDpfTot(map.get("Pq_TotDFT"));
return baseRealDataSet;
}
public BaseRealDataSet channelData2(Map<String,Float> map) {
BaseRealDataSet baseRealDataSet = new BaseRealDataSet();
//频率
baseRealDataSet.setFreq(map.get("Pq_FreqM"));
baseRealDataSet.setFreq(map.get("Pq_FreqT"));
//频率偏差
baseRealDataSet.setFreqDev(map.get("Pq_FreqDevM"));
baseRealDataSet.setFreqDev(map.get("Pq_FreqDevT"));
//判断监测点的接线方式,不同接线方式电压有效值取值不同
//星型-相电压 角形、V型-线电压
//电压有效值
@@ -371,34 +373,34 @@ public class RtServiceImpl implements IRtService {
baseRealDataSet.setIThdB(map.get("Pq_ThdIB"));
baseRealDataSet.setIThdC(map.get("Pq_ThdIC"));
//电压不平衡度
baseRealDataSet.setVUnbalance(map.get("Pq_UnbalNegUM"));
baseRealDataSet.setVUnbalance(map.get("Pq_UnbalNegUT"));
//电流不平衡度
baseRealDataSet.setIUnbalance(map.get("Pq_UnbalNegIM"));
baseRealDataSet.setIUnbalance(map.get("Pq_UnbalNegIT"));
//有功功率
baseRealDataSet.setPA(map.get("Pq_PA"));
baseRealDataSet.setPB(map.get("Pq_PB"));
baseRealDataSet.setPC(map.get("Pq_PC"));
baseRealDataSet.setPTot(map.get("Pq_TotPM"));
baseRealDataSet.setPTot(map.get("Pq_TotPT"));
//无功功率
baseRealDataSet.setQA(map.get("Pq_QA"));
baseRealDataSet.setQB(map.get("Pq_QB"));
baseRealDataSet.setQC(map.get("Pq_QC"));
baseRealDataSet.setQTot(map.get("Pq_TotQM"));
baseRealDataSet.setQTot(map.get("Pq_TotQT"));
//视在功率
baseRealDataSet.setSA(map.get("Pq_SA"));
baseRealDataSet.setSB(map.get("Pq_SB"));
baseRealDataSet.setSC(map.get("Pq_SC"));
baseRealDataSet.setSTot(map.get("Pq_TotSM"));
baseRealDataSet.setSTot(map.get("Pq_TotST"));
//视在功率因数
baseRealDataSet.setPfA(map.get("Pq_PFA"));
baseRealDataSet.setPfB(map.get("Pq_PFB"));
baseRealDataSet.setPfC(map.get("Pq_PFC"));
baseRealDataSet.setPfTot(map.get("Pq_TotPFM"));
baseRealDataSet.setPfTot(map.get("Pq_TotPFT"));
//位移功率因数
baseRealDataSet.setDpfA(map.get("Pq_DFA"));
baseRealDataSet.setDpfB(map.get("Pq_DFB"));
baseRealDataSet.setDpfC(map.get("Pq_DFC"));
baseRealDataSet.setDpfTot(map.get("Pq_TotDFM"));
baseRealDataSet.setDpfTot(map.get("Pq_TotDFT"));
return baseRealDataSet;
}

View File

@@ -11,6 +11,7 @@ import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsCommunicateFeignClient;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
import com.njcn.csdevice.api.DeviceMessageFeignClient;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
@@ -61,6 +62,7 @@ public class StatServiceImpl implements IStatService {
private final CsDeviceFeignClient csDeviceFeignClient;
private final DeviceMessageFeignClient deviceMessageFeignClient;
private final CsCommunicateFeignClient csCommunicateFeignClient;
private final CsLineFeignClient csLineFeignClient;
@Override
@Transactional(rollbackFor = Exception.class)
@@ -82,7 +84,7 @@ public class StatServiceImpl implements IStatService {
String lineId = null;
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId());
if (Objects.isNull(object1)){
deviceMessageFeignClient.getLineInfo(appAutoDataMessage.getId());
deviceMessageFeignClient.getLineInfo(appAutoDataMessage.getId(),null);
}
//获取当前设备信息判断装置型号,来筛选监测点
List<CsEquipmentDeliveryPO> poList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DEVICE_LIST),CsEquipmentDeliveryPO.class);
@@ -109,6 +111,7 @@ public class StatServiceImpl implements IStatService {
//获取当前设备信息
if (CollectionUtil.isNotEmpty(list)) {
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
List<String> recordList = new ArrayList<>();
for (AppAutoDataMessage.DataArray item : list) {
switch (item.getDataAttr()) {
@@ -141,7 +144,7 @@ public class StatServiceImpl implements IStatService {
} else {
dataArrayList = objectToList(object);
}
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess(),code,po.getDevAccessMethod());
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess(),code,po.getDevAccessMethod(),map);
recordList.addAll(result);
//获取时间
boolean timeFlag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD");
@@ -190,7 +193,7 @@ public class StatServiceImpl implements IStatService {
/**
* influxDB数据组装
*/
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType,String accessMethod) {
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType,String accessMethod, Map<String,String> map) {
List<String> records = new ArrayList<String>();
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
if (CollectionUtil.isEmpty(floats)){
@@ -199,14 +202,10 @@ public class StatServiceImpl implements IStatService {
if (!Objects.equals(dataArrayList.size(),floats.size())){
throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH);
}
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD");
//fixme 捂脸设备上送的是北京时间,时序数据库录入时 需要utc时间减去8小时
long originalTimeSec = flag ? item.getDataTimeSec() : item.getDataTimeSec() - 8 * 3600;
if (originalTimeSec < 0) {
System.out.println("originalTimeSec==:" + originalTimeSec);
}
for (int i = 0; i < dataArrayList.size(); i++) {
String tableName = map.get(dataArrayList.get(i).getName());
@@ -227,7 +226,11 @@ public class StatServiceImpl implements IStatService {
tags.put(InfluxDBTableConstant.LINE_ID,lineId);
tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod.toUpperCase());
tags.put(InfluxDBTableConstant.QUALITY_FLAG,"0");
if (Objects.isNull(item.getDataTag())) {
tags.put(InfluxDBTableConstant.QUALITY_FLAG,"0");
} else {
tags.put(InfluxDBTableConstant.QUALITY_FLAG,String.valueOf(item.getDataTag()));
}
Map<String,Object> fields = new HashMap<>();
//这边特殊处理如果数据为3.14159则将数据置为null
if (Objects.isNull(dataArrayList.get(i).getInfluxDbName())) {
@@ -237,7 +240,6 @@ public class StatServiceImpl implements IStatService {
}
fields.put(InfluxDBTableConstant.CL_DID,clDid.toString());
fields.put(InfluxDBTableConstant.PROCESS,process.toString());
fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
Point point = influxDbUtils.pointBuilder(tableName, adjustedTimeSec, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
@@ -258,4 +260,5 @@ public class StatServiceImpl implements IStatService {
}
return urlList;
}
}

View File

@@ -51,7 +51,7 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
List<WaveTimeDto> list = new ArrayList<>();
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
if (Objects.isNull(object1)){
deviceMessageFeignClient.getLineInfo(appEventMessage.getId());
deviceMessageFeignClient.getLineInfo(appEventMessage.getId(),null);
}
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();

View File

@@ -96,7 +96,7 @@ public class EventServiceImpl implements IEventService {
}
//判断监测点是否存在
if (Objects.isNull(object1)){
deviceMessageFeignClient.getLineInfo(appEventMessage.getId());
deviceMessageFeignClient.getLineInfo(appEventMessage.getId(),null);
}
//获取装置id
CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData();