fix(data): 解决实时数据处理中的空指针异常和数据写入问题

- 在基础数据处理中添加redis对象空值检查,避免空指针异常
- 在谐波数据处理中添加redis对象空值检查,确保数据安全处理
- 将统计方法名称转换为大写以确保数据一致性
- 根据influxDbName字段决定数据字段名称,提高数据映射准确性
- 移除已注释的废弃代码块,清理代码结构
This commit is contained in:
xy
2026-05-18 18:36:42 +08:00
parent 15f84c1bc0
commit 6983cd39fe
2 changed files with 37 additions and 64 deletions

View File

@@ -82,16 +82,19 @@ public class RtServiceImpl implements IRtService {
//基础数据
if (dataSet.getName().contains("Ds$Pqd$Rt$Basic$")) {
//用户Id
String userId = redisUtil.getObjectByKey("rtDataUserId:"+lineId).toString();
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType(),po1.getDevAccessMethod());
baseRealDataSet.setUserId(userId);
baseRealDataSet.setLineId(lineId);
baseRealDataSet.setPt(po.getPtRatio().floatValue());
baseRealDataSet.setCt(po.getCtRatio().floatValue());
baseRealDataSet.setDataLevel(dataSet.getDataLevel());
long timestamp = item.getDataTimeSec() - 8*3600;
baseRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false);
Object redisObject = redisUtil.getObjectByKey("rtDataUserId:"+lineId);
if (ObjectUtil.isNotNull(redisObject)) {
String userId = redisObject.toString();
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType(),po1.getDevAccessMethod());
baseRealDataSet.setUserId(userId);
baseRealDataSet.setLineId(lineId);
baseRealDataSet.setPt(po.getPtRatio().floatValue());
baseRealDataSet.setCt(po.getCtRatio().floatValue());
baseRealDataSet.setDataLevel(dataSet.getDataLevel());
long timestamp = item.getDataTimeSec() - 8*3600;
baseRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false);
}
} else if (dataSet.getName().contains("实时数据") || dataSet.getName().contains("Ds$Pqd$Rt$01")) {
//用户Id
Object redisObject = redisUtil.getObjectByKey("rtDataUserId:"+lineId);
@@ -114,20 +117,23 @@ public class RtServiceImpl implements IRtService {
else {
long timestamp;
//用户Id
String userId = redisUtil.getObjectByKey("rtDataUserId:"+lineId).toString();
HarmRealDataSet harmRealDataSet = harmData(dataArrayList,item,dataSet.getDataLevel(),po.getCtRatio());
harmRealDataSet.setUserId(userId);
harmRealDataSet.setLineId(lineId);
harmRealDataSet.setPt(po.getPtRatio().floatValue());
harmRealDataSet.setCt(po.getCtRatio().floatValue());
harmRealDataSet.setDataLevel(dataSet.getDataLevel());
if (ObjectUtil.isNotNull(po.getLineNo())) {
timestamp = item.getDataTimeSec();
} else {
timestamp = item.getDataTimeSec() - 8*3600;
Object redisObject = redisUtil.getObjectByKey("rtDataUserId:"+lineId);
if (ObjectUtil.isNotNull(redisObject)) {
String userId = redisObject.toString();
HarmRealDataSet harmRealDataSet = harmData(dataArrayList,item,dataSet.getDataLevel(),po.getCtRatio());
harmRealDataSet.setUserId(userId);
harmRealDataSet.setLineId(lineId);
harmRealDataSet.setPt(po.getPtRatio().floatValue());
harmRealDataSet.setCt(po.getCtRatio().floatValue());
harmRealDataSet.setDataLevel(dataSet.getDataLevel());
if (ObjectUtil.isNotNull(po.getLineNo())) {
timestamp = item.getDataTimeSec();
} else {
timestamp = item.getDataTimeSec() - 8*3600;
}
harmRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(harmRealDataSet), 1, false);
}
harmRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(harmRealDataSet), 1, false);
}
}

View File

@@ -226,13 +226,17 @@ public class StatServiceImpl implements IStatService {
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.LINE_ID,lineId);
tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod);
tags.put(InfluxDBTableConstant.CL_DID,clDid.toString());
tags.put(InfluxDBTableConstant.PROCESS,process.toString());
tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod.toUpperCase());
tags.put(InfluxDBTableConstant.QUALITY_FLAG,"0");
Map<String,Object> fields = new HashMap<>();
//这边特殊处理如果数据为3.14159则将数据置为null
fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
if (Objects.isNull(dataArrayList.get(i).getInfluxDbName())) {
fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
} else {
fields.put(dataArrayList.get(i).getInfluxDbName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
}
fields.put(InfluxDBTableConstant.CL_DID,clDid.toString());
fields.put(InfluxDBTableConstant.PROCESS,process.toString());
fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
Point point = influxDbUtils.pointBuilder(tableName, adjustedTimeSec, TimeUnit.SECONDS, tags, fields);
@@ -243,43 +247,6 @@ public class StatServiceImpl implements IStatService {
return records;
}
// public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType,String accessMethod) {
// List<String> records = new ArrayList<String>();
// //解码
// List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
// if (CollectionUtil.isEmpty(floats)){
// throw new BusinessException(StatResponseEnum.AUTO_DATA_NULL);
// }
// //校验模板和解码数据数量能否对应上
// if (!Objects.equals(dataArrayList.size(),floats.size())){
// throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH);
// }
// Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
// for (int i = 0; i < dataArrayList.size(); i++) {
// String tableName = map.get(dataArrayList.get(i).getName());
// Map<String, String> tags = new HashMap<>();
// tags.put(InfluxDBTableConstant.LINE_ID,lineId);
// tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
// //todo 不清楚之前为啥要修改相别,这边按字典配置相别无法查询到数据,先改回来
// //tags.put(InfluxDBTableConstant.PHASIC_TYPE,Objects.isNull(PHASE_MAPPING.get(dataArrayList.get(i).getPhase()))?dataArrayList.get(i).getPhase():PHASE_MAPPING.get(dataArrayList.get(i).getPhase()));
// tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod);
// tags.put(InfluxDBTableConstant.CL_DID,clDid.toString());
// tags.put(InfluxDBTableConstant.PROCESS,process.toString());
// tags.put(InfluxDBTableConstant.QUALITY_FLAG,"0");
// Map<String,Object> fields = new HashMap<>();
// //这边特殊处理如果数据为3.14159则将数据置为null
// fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
// fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
// //fixme 设备上送的是北京时间,时序数据库录入时 需要utc时间减去8小时
// boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD");
// Point point = influxDbUtils.pointBuilder(tableName, flag?item.getDataTimeSec():item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
// BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
// batchPoints.point(point);
// records.add(batchPoints.lineProtocol());
// }
// return records;
// }
public List<CsDataArray> objectToList(Object object) {
List<CsDataArray> urlList = new ArrayList<>();
if (object != null) {