新增数据清洗算法(文件方式存储)

This commit is contained in:
xy
2025-06-05 17:41:13 +08:00
parent 56aa40211d
commit 6179e7f434
12 changed files with 941 additions and 24 deletions

View File

@@ -51,6 +51,20 @@ public class MeasurementExecutor extends BaseExecutor {
dataCleanService.dataQualityCleanHandler(bindCmp.getRequestData());
}
/**
* 数据清洗
* @author xy
*/
@LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataClean", nodeType = NodeTypeEnum.COMMON)
public boolean dataCleanAccess(NodeComponent bindCmp) {
return isAccess(bindCmp);
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "dataClean", nodeType = NodeTypeEnum.COMMON)
public void dataCleanProcess(NodeComponent bindCmp) {
dataCleanService.dataCleanHandler(bindCmp.getRequestData());
}
/**
* 数据清洗 电压表
* dataV表

View File

@@ -15,6 +15,12 @@ public interface IDataCleanService {
*/
void dataQualityCleanHandler(CalculatedParam calculatedParam);
/**
* 原始数据表统一清洗
* @param calculatedParam
*/
void dataCleanHandler(CalculatedParam calculatedParam);
/***
* dataV数据清洗
* 不标记原始数据,将异常数据查询出来重新存储至详情表中

View File

@@ -1,22 +1,30 @@
package com.njcn.algorithm.serviceimpl.line;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.njcn.algorithm.service.line.IDataCleanService;
import com.njcn.algorithm.utils.MemorySizeUtil;
import com.njcn.dataProcess.api.*;
import com.njcn.dataProcess.dto.DataCleanJsonDTO;
import com.njcn.dataProcess.dto.RmpEventDetailDTO;
import com.njcn.dataProcess.enums.DataCleanEnum;
import com.njcn.dataProcess.param.DataCleanParam;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.*;
import com.njcn.dataProcess.pojo.po.PqDataVerify;
import com.njcn.dataProcess.pojo.po.PqDataVerifyBak;
import com.njcn.dataProcess.util.DataCommonUtils;
import com.njcn.dataProcess.util.TimeUtils;
import com.njcn.device.pq.api.LineFeignClient;
import com.njcn.device.pq.pojo.vo.LineDetailDataVO;
import com.njcn.device.pq.pojo.vo.LineDetailVO;
import com.njcn.oss.constant.OssPath;
import com.njcn.oss.utils.FileStorageUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
@@ -28,8 +36,10 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
@@ -88,7 +98,10 @@ public class DataCleanServiceImpl implements IDataCleanService {
private RmpEventDetailFeignClient rmpEventDetailFeignClient;
@Autowired
private DataHarmphasicIFeignClient dataHarmphasicIFeignClient;
@Resource
private PqDataVerifyNewFeignClient pqDataVerifyNewFeignClient;
@Resource
private FileStorageUtil fileStorageUtil;
@Override
@Transactional(rollbackFor = Exception.class)
@@ -279,10 +292,450 @@ public class DataCleanServiceImpl implements IDataCleanService {
}
}
@Override
public void dataCleanHandler(CalculatedParam calculatedParam) {
MemorySizeUtil.getNowMemory();
logger.info("{},原始表数据清洗=====》", LocalDateTime.now());
//获取标准
Map<String, List<PqReasonableRangeDto>> map = getStandardData();
//获取监测点台账信息
List<String> lineList = calculatedParam.getIdList();
List<LineDetailVO.Detail> lineDetail = lineFeignClient.getLineDetailByIds(lineList).getData();
if (CollUtil.isEmpty(lineDetail)) {
logger.error("监测点集合为空,无法计算!");
return;
}
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
lineDetail.forEach(item->{
List<Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>>> resultData = new ArrayList<>();
PqDataVerifyBak bak = new PqDataVerifyBak();
bak.setLineId(item.getLineId());
bak.setTime(LocalDate.parse(calculatedParam.getDataDate(), DateTimeFormatter.ofPattern(DatePattern.NORM_DATE_PATTERN)));
lineParam.setLineId(Collections.singletonList(item.getLineId()));
//dataV
List<DataVDto> data1 = dataVFeignClient.getRawData(lineParam).getData();
//dataI
List<DataIDto> data2 = dataIFeignClient.getRawData(lineParam).getData();
//dataPlt
List<DataPltDto> data3 = dataPltFeignClient.getRawData(lineParam).getData();
//dataInHarmV
List<DataHarmDto> data4 = dataInharmVFeignClient.getRawData(lineParam).getData();
//dataHarmRateV
List<DataHarmDto> data5 = dataHarmRateVFeignClient.getRawData(lineParam).getData();
//dataHarmPowerP
List<DataPowerPDto> data6 = dataHarmpowerPFeignClient.getRawData(lineParam).getData();
//dataHarmPhasicV
List<DataHarmDto> data7 = dataHarmphasicVFeignClient.getRawData(lineParam).getData();
//dataFluc
List<DataFlucDto> data8 = dataFlucFeignClient.getRawData(lineParam).getData();
//dataFlicker
List<DataFlickerDto> data9 = dataFlickerFeignClient.getRawData(lineParam).getData();
if (CollUtil.isNotEmpty(data1)) {
logger.info("{}数据清洗dataV集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(), MemorySizeUtil.getObjectSize(data1));
List<PqDataVerify> result = new ArrayList<>();
List<PqReasonableRangeDto> list = map.get(DataCleanEnum.DataV.getCode());
Map<String, PqReasonableRangeDto> pqReasonableRangeDtoMap = list.stream().collect(Collectors.toMap(PqReasonableRangeDto::getIndexCode, Function.identity()));
data1.forEach(data->{
List<PqDataVerify> pqDataVerifies = judgeDataV(pqReasonableRangeDtoMap,item,data);
result.addAll(pqDataVerifies);
});
if (CollUtil.isNotEmpty(result)) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> dtoMap = handleDataV(result,bak);
if (CollUtil.isNotEmpty(dtoMap)) {
resultData.add(dtoMap);
}
}
}
if (CollUtil.isNotEmpty(data2)) {
logger.info("{}数据清洗dataI集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(), MemorySizeUtil.getObjectSize(data2));
List<PqDataVerify> result = new ArrayList<>();
List<PqReasonableRangeDto> list = map.get(DataCleanEnum.DataI.getCode());
Map<String, PqReasonableRangeDto> pqReasonableRangeDtoMap = list.stream().collect(Collectors.toMap(PqReasonableRangeDto::getIndexCode, Function.identity()));
data2.forEach(data->{
List<PqDataVerify> pqDataVerifies = judgeDataI(pqReasonableRangeDtoMap,item,data);
result.addAll(pqDataVerifies);
});
if (CollUtil.isNotEmpty(result)) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> dtoMap = handleDataI(result,bak);
if (CollUtil.isNotEmpty(dtoMap)) {
resultData.add(dtoMap);
}
}
}
if (CollUtil.isNotEmpty(data3)) {
logger.info("{}数据清洗dataPlt集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(), MemorySizeUtil.getObjectSize(data3));
List<PqDataVerify> result = new ArrayList<>();
List<PqReasonableRangeDto> list = map.get(DataCleanEnum.DataPlt.getCode());
Map<String, PqReasonableRangeDto> pqReasonableRangeDtoMap = list.stream().collect(Collectors.toMap(PqReasonableRangeDto::getIndexCode, Function.identity()));
data3.forEach(data->{
List<PqDataVerify> pqDataVerifies = judgeDataPlt(pqReasonableRangeDtoMap,data);
result.addAll(pqDataVerifies);
});
if (CollUtil.isNotEmpty(result)) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> dtoMap = handleDataPlt(result,bak);
if (CollUtil.isNotEmpty(dtoMap)) {
resultData.add(dtoMap);
}
}
}
if (CollUtil.isNotEmpty(data4)) {
logger.info("{}数据清洗dataInHarmV集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(), MemorySizeUtil.getObjectSize(data4));
List<PqDataVerify> result = new ArrayList<>();
List<PqReasonableRangeDto> list = map.get(DataCleanEnum.DataInHarmV.getCode());
Map<String, PqReasonableRangeDto> pqReasonableRangeDtoMap = list.stream().collect(Collectors.toMap(PqReasonableRangeDto::getIndexCode, Function.identity()));
data4.forEach(data->{
List<PqDataVerify> pqDataVerifies = judgeDataInHarmV(pqReasonableRangeDtoMap,data);
result.addAll(pqDataVerifies);
});
if (CollUtil.isNotEmpty(result)) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> dtoMap = handleInHarmV(result,bak);
if (CollUtil.isNotEmpty(dtoMap)) {
resultData.add(dtoMap);
}
}
}
if (CollUtil.isNotEmpty(data5)) {
logger.info("{}数据清洗dataHarmRateV集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(), MemorySizeUtil.getObjectSize(data5));
List<PqDataVerify> result = new ArrayList<>();
List<PqReasonableRangeDto> list = map.get(DataCleanEnum.DataHarmRateV.getCode());
Map<String, PqReasonableRangeDto> pqReasonableRangeDtoMap = list.stream().collect(Collectors.toMap(PqReasonableRangeDto::getIndexCode, Function.identity()));
data5.forEach(data->{
List<PqDataVerify> pqDataVerifies = judgeDataHarmRateV(pqReasonableRangeDtoMap,data);
result.addAll(pqDataVerifies);
});
if (CollUtil.isNotEmpty(result)) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> dtoMap = handleHarmRateV(result,bak);
if (CollUtil.isNotEmpty(dtoMap)) {
resultData.add(dtoMap);
}
}
}
if (CollUtil.isNotEmpty(data6)) {
logger.info("{}数据清洗dataHarmPowerP集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(), MemorySizeUtil.getObjectSize(data6));
List<PqDataVerify> result = new ArrayList<>();
List<PqReasonableRangeDto> list = map.get(DataCleanEnum.DataHarmPowerP.getCode());
Map<String, PqReasonableRangeDto> pqReasonableRangeDtoMap = list.stream().collect(Collectors.toMap(PqReasonableRangeDto::getIndexCode, Function.identity()));
data6.forEach(data->{
List<PqDataVerify> pqDataVerifies = judgeDataHarmPowerP(pqReasonableRangeDtoMap,data);
result.addAll(pqDataVerifies);
});
if (CollUtil.isNotEmpty(result)) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> dtoMap = handleHarmPowerP(result,bak);
if (CollUtil.isNotEmpty(dtoMap)) {
resultData.add(dtoMap);
}
}
}
if (CollUtil.isNotEmpty(data7)) {
logger.info("{}数据清洗dataHarmPhasicV集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(), MemorySizeUtil.getObjectSize(data7));
List<PqDataVerify> result = new ArrayList<>();
List<PqReasonableRangeDto> list = map.get(DataCleanEnum.DataHarmPhasicV.getCode());
Map<String, PqReasonableRangeDto> pqReasonableRangeDtoMap = list.stream().collect(Collectors.toMap(PqReasonableRangeDto::getIndexCode, Function.identity()));
data7.forEach(data->{
List<PqDataVerify> pqDataVerifies = judgeDataHarmPhasicV(pqReasonableRangeDtoMap,data);
result.addAll(pqDataVerifies);
});
if (CollUtil.isNotEmpty(result)) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> dtoMap = handleHarmPhasicV(result,bak);
if (CollUtil.isNotEmpty(dtoMap)) {
resultData.add(dtoMap);
}
}
}
if (CollUtil.isNotEmpty(data8)) {
logger.info("{}数据清洗dataFluc集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(), MemorySizeUtil.getObjectSize(data8));
List<PqDataVerify> result = new ArrayList<>();
List<PqReasonableRangeDto> list = map.get(DataCleanEnum.DataFluc.getCode());
Map<String, PqReasonableRangeDto> pqReasonableRangeDtoMap = list.stream().collect(Collectors.toMap(PqReasonableRangeDto::getIndexCode, Function.identity()));
data8.forEach(data->{
List<PqDataVerify> pqDataVerifies = judgeDataFluc(pqReasonableRangeDtoMap,data);
result.addAll(pqDataVerifies);
});
if (CollUtil.isNotEmpty(result)) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> dtoMap = handleDataFluc(result,bak);
if (CollUtil.isNotEmpty(dtoMap)) {
resultData.add(dtoMap);
}
}
}
if (CollUtil.isNotEmpty(data9)) {
logger.info("{}数据清洗dataFlicker集合大小为>>>>>>>>>>>>{}",lineParam.getStartTime(), MemorySizeUtil.getObjectSize(data9));
List<PqDataVerify> result = new ArrayList<>();
List<PqReasonableRangeDto> list = map.get(DataCleanEnum.DataFlicker.getCode());
Map<String, PqReasonableRangeDto> pqReasonableRangeDtoMap = list.stream().collect(Collectors.toMap(PqReasonableRangeDto::getIndexCode, Function.identity()));
data9.forEach(data->{
List<PqDataVerify> pqDataVerifies = judgeDataFlicker(pqReasonableRangeDtoMap,data);
result.addAll(pqDataVerifies);
});
if (CollUtil.isNotEmpty(result)) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> dtoMap = handleDataFlicker(result,bak);
if (CollUtil.isNotEmpty(dtoMap)) {
resultData.add(dtoMap);
}
}
}
if (CollUtil.isNotEmpty(resultData)) {
//存储文件
InputStream reportStream = IoUtil.toStream(new Gson().toJson(resultData), CharsetUtil.UTF_8);
String fileName = fileStorageUtil.uploadStreamSpecifyName(
reportStream
, OssPath.DATA_CLEAN + calculatedParam.getDataDate() + "/"
,item.getLineId() + ".txt");
//存储数据
bak.setFilePath(fileName);
}
pqDataVerifyNewFeignClient.insertData(bak);
});
System.gc();
}
public Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> handleDataV(List<PqDataVerify> result, PqDataVerifyBak bak) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> map = new HashMap<>();
Map<String,List<PqDataVerify>> codeMap = result.stream().collect(Collectors.groupingBy(PqDataVerify::getIndexCode));
//频率
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list1 = handleData(codeMap.get(DataCleanEnum.Freq.getCode()));
if (CollUtil.isNotEmpty(list1)) {
bak.setFreq(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.Freq.getCode(),list1);
}
//频率偏差
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list2 = handleData(codeMap.get(DataCleanEnum.FreqDev.getCode()));
if (CollUtil.isNotEmpty(list2)) {
bak.setFreqDev(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.FreqDev.getCode(),list2);
}
//相电压有效值
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list3 = handleData(codeMap.get(DataCleanEnum.RmsV.getCode()));
if (CollUtil.isNotEmpty(list3)) {
bak.setVRms(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.RmsV.getCode(),list3);
}
//正序电压
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list4 = handleData(codeMap.get(DataCleanEnum.VPos.getCode()));
if (CollUtil.isNotEmpty(list4)) {
bak.setVPos(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.VPos.getCode(),list4);
}
//负序电压
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list5 = handleData(codeMap.get(DataCleanEnum.VNeg.getCode()));
if (CollUtil.isNotEmpty(list5)) {
bak.setVNeg(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.VNeg.getCode(),list5);
}
//零序电压
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list6 = handleData(codeMap.get(DataCleanEnum.VZero.getCode()));
if (CollUtil.isNotEmpty(list6)) {
bak.setVZero(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.VZero.getCode(),list6);
}
//电压不平衡度
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list7 = handleData(codeMap.get(DataCleanEnum.VUnbalance.getCode()));
if (CollUtil.isNotEmpty(list7)) {
bak.setVUnbalance(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.VUnbalance.getCode(),list7);
}
//线电压有效值
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list8 = handleData(codeMap.get(DataCleanEnum.RmsLvr.getCode()));
if (CollUtil.isNotEmpty(list8)) {
bak.setRmsLvr(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.RmsLvr.getCode(),list8);
}
//电压正偏差
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list9 = handleData(codeMap.get(DataCleanEnum.VuDev.getCode()));
if (CollUtil.isNotEmpty(list9)) {
bak.setVuDev(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.VuDev.getCode(),list9);
}
//电压负偏差
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list10 = handleData(codeMap.get(DataCleanEnum.VlDev.getCode()));
if (CollUtil.isNotEmpty(list10)) {
bak.setVlDev(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.VlDev.getCode(),list10);
}
//电压总谐波畸变率
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list11 = handleData(codeMap.get(DataCleanEnum.VThd.getCode()));
if (CollUtil.isNotEmpty(list11)) {
bak.setVThd(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.VThd.getCode(),list11);
}
//相(线)电压基波有效值
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list12 = handleData(codeMap.get(DataCleanEnum.V_Data.getCode()));
if (CollUtil.isNotEmpty(list12)) {
bak.setV(1);
map.put(DataCleanEnum.DataV.getCode() + "-" + DataCleanEnum.V_Data.getCode(),list12);
}
return map;
}
public Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> handleDataI(List<PqDataVerify> result, PqDataVerifyBak bak) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> map = new HashMap<>();
Map<String,List<PqDataVerify>> codeMap = result.stream().collect(Collectors.groupingBy(PqDataVerify::getIndexCode));
//电流有效值
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list = handleData(codeMap.get(DataCleanEnum.RmsI.getCode()));
if (CollUtil.isNotEmpty(list)) {
bak.setIRms(1);
map.put(DataCleanEnum.DataI.getCode() + "-" + DataCleanEnum.RmsI.getCode(),list);
}
return map;
}
public Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> handleDataPlt(List<PqDataVerify> result, PqDataVerifyBak bak) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> map = new HashMap<>();
Map<String,List<PqDataVerify>> codeMap = result.stream().collect(Collectors.groupingBy(PqDataVerify::getIndexCode));
//长时闪变
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list = handleData(codeMap.get(DataCleanEnum.Plt.getCode()));
if (CollUtil.isNotEmpty(list)) {
bak.setPlt(1);
map.put(DataCleanEnum.DataPlt.getCode() + "-" + DataCleanEnum.Plt.getCode(),list);
}
return map;
}
public Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> handleInHarmV(List<PqDataVerify> result, PqDataVerifyBak bak) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> map = new HashMap<>();
Map<String,List<PqDataVerify>> codeMap = result.stream().collect(Collectors.groupingBy(PqDataVerify::getIndexCode));
//间谐波电压含有率
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list = handleHarmData(codeMap.get(DataCleanEnum.V_InHarm.getCode()));
if (CollUtil.isNotEmpty(list)) {
bak.setVInharm(1);
map.put(DataCleanEnum.DataInHarmV.getCode() + "-" + DataCleanEnum.V_InHarm.getCode(),list);
}
return map;
}
public Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> handleHarmRateV(List<PqDataVerify> result, PqDataVerifyBak bak) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> map = new HashMap<>();
Map<String,List<PqDataVerify>> codeMap = result.stream().collect(Collectors.groupingBy(PqDataVerify::getIndexCode));
//谐波电压含有率
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list = handleHarmData(codeMap.get(DataCleanEnum.V_Rate.getCode()));
if (CollUtil.isNotEmpty(list)) {
bak.setVHarm(1);
map.put(DataCleanEnum.DataHarmRateV.getCode() + "-" + DataCleanEnum.V_Rate.getCode(),list);
}
return map;
}
public Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> handleHarmPowerP(List<PqDataVerify> result, PqDataVerifyBak bak) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> map = new HashMap<>();
Map<String,List<PqDataVerify>> codeMap = result.stream().collect(Collectors.groupingBy(PqDataVerify::getIndexCode));
//功率因素
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list = handleData(codeMap.get(DataCleanEnum.Pf.getCode()));
if (CollUtil.isNotEmpty(list)) {
bak.setPf(1);
map.put(DataCleanEnum.DataHarmPowerP.getCode() + "-" + DataCleanEnum.Pf.getCode(),list);
}
return map;
}
public Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> handleHarmPhasicV(List<PqDataVerify> result, PqDataVerifyBak bak) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> map = new HashMap<>();
Map<String,List<PqDataVerify>> codeMap = result.stream().collect(Collectors.groupingBy(PqDataVerify::getIndexCode));
//谐波电压相角
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list = handleHarmData(codeMap.get(DataCleanEnum.V.getCode()));
if (CollUtil.isNotEmpty(list)) {
bak.setVPhasic(1);
map.put(DataCleanEnum.DataHarmPhasicV.getCode() + "-" + DataCleanEnum.V.getCode(),list);
}
//谐波电压基波相角
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list2 = handleData(codeMap.get(DataCleanEnum.V1.getCode()));
if (CollUtil.isNotEmpty(list2)) {
bak.setV1Phasic(1);
map.put(DataCleanEnum.DataHarmPhasicV.getCode() + "-" + DataCleanEnum.V1.getCode(),list2);
}
return map;
}
public Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> handleDataFluc(List<PqDataVerify> result, PqDataVerifyBak bak) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> map = new HashMap<>();
Map<String,List<PqDataVerify>> codeMap = result.stream().collect(Collectors.groupingBy(PqDataVerify::getIndexCode));
//电压波动
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list = handleData(codeMap.get(DataCleanEnum.Fluc.getCode()));
if (CollUtil.isNotEmpty(list)) {
bak.setFluc(1);
map.put(DataCleanEnum.DataFluc.getCode() + "-" + DataCleanEnum.Fluc.getCode(),list);
}
return map;
}
public Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> handleDataFlicker(List<PqDataVerify> result, PqDataVerifyBak bak) {
Map<String,List<DataCleanJsonDTO.DataHarmCleanJsonDTO>> map = new HashMap<>();
Map<String,List<PqDataVerify>> codeMap = result.stream().collect(Collectors.groupingBy(PqDataVerify::getIndexCode));
//短时闪变
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> list = handleData(codeMap.get(DataCleanEnum.Pst.getCode()));
if (CollUtil.isNotEmpty(list)) {
bak.setPst(1);
map.put(DataCleanEnum.DataFlicker.getCode() + "-" + DataCleanEnum.Pst.getCode(),list);
}
return map;
}
public List<DataCleanJsonDTO.DataHarmCleanJsonDTO> handleData(List<PqDataVerify> list) {
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> dataList = new ArrayList<>();
if (CollUtil.isNotEmpty(list)) {
DataCleanJsonDTO.DataHarmCleanJsonDTO dto = new DataCleanJsonDTO.DataHarmCleanJsonDTO();
dto.setTargetName(list.get(0).getIndexName());
List<DataCleanJsonDTO.DataHarmCleanNormalJsonDTO> list1 = new ArrayList<>();
Map<String,List<PqDataVerify>> phaseMap = list.stream().collect(Collectors.groupingBy(PqDataVerify::getPhasicType));
phaseMap.forEach((k,v)->{
Map<String, List<PqDataVerify>> map11 = v.stream().collect(Collectors.groupingBy(PqDataVerify::getValueType));
map11.forEach((k2,v2)->{
DataCleanJsonDTO.DataHarmCleanNormalJsonDTO dataCleanJsonDTO = new DataCleanJsonDTO.DataHarmCleanNormalJsonDTO();
dataCleanJsonDTO.setPhasic(k);
dataCleanJsonDTO.setValueType(k2);
dataCleanJsonDTO.setTime(v2.stream().map(dt -> dt.getTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_TIME_PATTERN))).collect(Collectors.toList()));
dataCleanJsonDTO.setValue(v2.stream().map(PqDataVerify::getAbnormalValue).collect(Collectors.toList()));
list1.add(dataCleanJsonDTO);
});
dto.setList(list1);
});
dataList.add(dto);
}
return dataList;
}
public List<DataCleanJsonDTO.DataHarmCleanJsonDTO> handleHarmData(List<PqDataVerify> list) {
List<DataCleanJsonDTO.DataHarmCleanJsonDTO> dataList = new ArrayList<>();
if (CollUtil.isNotEmpty(list)) {
Map<String,List<PqDataVerify>> indexNameMap = list.stream().collect(Collectors.groupingBy(PqDataVerify::getIndexName));
indexNameMap.forEach((k,v)->{
DataCleanJsonDTO.DataHarmCleanJsonDTO dto = new DataCleanJsonDTO.DataHarmCleanJsonDTO();
dto.setTargetName(k);
List<DataCleanJsonDTO.DataHarmCleanNormalJsonDTO> dataCleanJsonDTOList = new ArrayList<>();
Map<String,List<PqDataVerify>> phaseMap = v.stream().collect(Collectors.groupingBy(PqDataVerify::getPhasicType));
phaseMap.forEach((k2,v2)->{
Map<String, List<PqDataVerify>> valueMap = v2.stream().collect(Collectors.groupingBy(PqDataVerify::getValueType));
valueMap.forEach((k3,v3)->{
DataCleanJsonDTO.DataHarmCleanNormalJsonDTO dto1 = new DataCleanJsonDTO.DataHarmCleanNormalJsonDTO();
dto1.setPhasic(k2);
dto1.setValueType(k3);
dto1.setTime(v3.stream().map(dt -> dt.getTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_TIME_PATTERN))).collect(Collectors.toList()));
dto1.setValue(v3.stream().map(PqDataVerify::getAbnormalValue).collect(Collectors.toList()));
dataCleanJsonDTOList.add(dto1);
});
dto.setList(dataCleanJsonDTOList);
});
dataList.add(dto);
});
}
return dataList;
}
@Override
public void dataVCleanHandler(CalculatedParam calculatedParam) {
MemorySizeUtil.getNowMemory();
logger.info("{},dataV表异常数据算法执行=====》", LocalDateTime.now());
List<PqDataVerify> result = new ArrayList<>();
//获取标准
Map<String, PqReasonableRangeDto> map = getStandardData(DataCleanEnum.DataV.getCode());
@@ -607,6 +1060,20 @@ public class DataCleanServiceImpl implements IDataCleanService {
System.gc();
}
/**
* 获取标准数据的范围
*/
public Map<String, List<PqReasonableRangeDto>> getStandardData() {
Map<String, List<PqReasonableRangeDto>> pqReasonableRangeDtoMap = new HashMap<>();
DataCleanParam param = new DataCleanParam();
param.setSystemType(DataCleanEnum.Pqs.getCode());
param.setDataSource(DataCleanEnum.InfluxDB.getCode());
List<PqReasonableRangeDto> list = pqReasonableRangeFeignClient.getData(param).getData();
if (CollUtil.isNotEmpty(list)) {
pqReasonableRangeDtoMap = list.stream().collect(Collectors.groupingBy(PqReasonableRangeDto::getInfluxdbTableName));
}
return pqReasonableRangeDtoMap;
}
/**
* 获取标准数据的范围
@@ -616,7 +1083,9 @@ public class DataCleanServiceImpl implements IDataCleanService {
DataCleanParam param = new DataCleanParam();
param.setSystemType(DataCleanEnum.Pqs.getCode());
param.setDataSource(DataCleanEnum.InfluxDB.getCode());
param.setTableName(tableName);
if (ObjectUtil.isNotNull(tableName)) {
param.setTableName(tableName);
}
List<PqReasonableRangeDto> list = pqReasonableRangeFeignClient.getData(param).getData();
if (CollUtil.isNotEmpty(list)) {
pqReasonableRangeDtoMap = list.stream().collect(Collectors.toMap(PqReasonableRangeDto::getIndexCode, Function.identity()));
@@ -694,25 +1163,25 @@ public class DataCleanServiceImpl implements IDataCleanService {
}
//正序电压
// pqReasonableRangeDto = map.get(DataCleanEnum.VPos.getCode());
// phaseList = Arrays.asList(pqReasonableRangeDto.getPhaseType().split(","));
// if (phaseList.contains(dto.getPhasicType())) {
// if (dto.getVPos() < (pqReasonableRangeDto.getMinValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()))
// || dto.getVPos() > (pqReasonableRangeDto.getMaxValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()))) {
// //log.info("dataV-正序电压数据异常,已清洗!数据值:{},数据时间:{}", dto.getVPos(), dto.getMinTime());
// PqDataVerify pqDataVerify = getPqDataVerify(dto.getLineId()
// ,dto.getMinTime()
// ,dto.getValueType()
// ,dto.getPhasicType()
// ,pqReasonableRangeDto.getIndexCode()
// ,pqReasonableRangeDto.getIndexName()
// ,pqReasonableRangeDto.getInfluxdbTableName()
// ,dto.getVPos()
// ,pqReasonableRangeDto.getMinValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel())
// ,pqReasonableRangeDto.getMaxValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()));
// list.add(pqDataVerify);
// }
// }
pqReasonableRangeDto = map.get(DataCleanEnum.VPos.getCode());
phaseList = Arrays.asList(pqReasonableRangeDto.getPhaseType().split(","));
if (phaseList.contains(dto.getPhasicType())) {
if (dto.getVPos() < (pqReasonableRangeDto.getMinValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()))
|| dto.getVPos() > (pqReasonableRangeDto.getMaxValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()))) {
//log.info("dataV-正序电压数据异常,已清洗!数据值:{},数据时间:{}", dto.getVPos(), dto.getMinTime());
PqDataVerify pqDataVerify = getPqDataVerify(dto.getLineId()
,dto.getMinTime()
,dto.getValueType()
,dto.getPhasicType()
,pqReasonableRangeDto.getIndexCode()
,pqReasonableRangeDto.getIndexName()
,pqReasonableRangeDto.getInfluxdbTableName()
,dto.getVPos()
,pqReasonableRangeDto.getMinValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel())
,pqReasonableRangeDto.getMaxValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()));
list.add(pqDataVerify);
}
}
//负序电压
pqReasonableRangeDto = map.get(DataCleanEnum.VNeg.getCode());

View File

@@ -0,0 +1,27 @@
package com.njcn.dataProcess.api;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.dataProcess.api.fallback.PqDataVerifyNewFeignClientFallbackFactory;
import com.njcn.dataProcess.pojo.po.PqDataVerifyBak;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
/**
* @author xy
* @version 1.0.0
* @date 2025年02月13日 20:11
*/
@FeignClient(value = ServerInfo.PLATFORM_DATA_PROCESSING_BOOT, path = "/pqDataVerifyNew", fallbackFactory = PqDataVerifyNewFeignClientFallbackFactory.class, contextId = "pqDataVerifyNew")
public interface PqDataVerifyNewFeignClient {
@PostMapping("/insertDataBatch")
HttpResult<List<String>> insertDataBatch(@RequestBody List<PqDataVerifyBak> list);
@PostMapping("/insertData")
HttpResult<List<String>> insertData(@RequestBody PqDataVerifyBak pqDataVerifyBak);
}

View File

@@ -0,0 +1,52 @@
package com.njcn.dataProcess.api.fallback;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.dataProcess.api.PqDataVerifyNewFeignClient;
import com.njcn.dataProcess.pojo.po.PqDataVerifyBak;
import com.njcn.dataProcess.util.DataProcessingEnumUtil;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author xy
* @version 1.0.0
* @date 2025年02月13日 20:13
*/
@Slf4j
@Component
public class PqDataVerifyNewFeignClientFallbackFactory implements FallbackFactory<PqDataVerifyNewFeignClient> {
/**
* 输出远程请求接口异常日志
* @param cause RPC请求异常
*/
@Override
public PqDataVerifyNewFeignClient create(Throwable cause) {
//判断抛出异常是否为解码器抛出的业务异常
Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
if(cause.getCause() instanceof BusinessException){
BusinessException businessException = (BusinessException) cause.getCause();
exceptionEnum = DataProcessingEnumUtil.getExceptionEnum(businessException.getResult());
}
Enum<?> finalExceptionEnum = exceptionEnum;
return new PqDataVerifyNewFeignClient() {
@Override
public HttpResult<List<String>> insertDataBatch(List<PqDataVerifyBak> list) {
log.error("{}异常,降级处理,异常为:{}","批量存储清洗的异常数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<List<String>> insertData(PqDataVerifyBak pqDataVerifyBak) {
log.error("{}异常,降级处理,异常为:{}","单监测点存储清洗的异常数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -0,0 +1,42 @@
package com.njcn.dataProcess.dto;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.List;
import java.util.Map;
/**
* @author xy
*/
@Data
public class DataCleanJsonDTO {
Map<String,List<DataHarmCleanJsonDTO>> map;
@Data
public static class DataHarmCleanJsonDTO {
@ApiModelProperty("指标名称")
private String targetName;
private List<DataHarmCleanNormalJsonDTO> list;
}
@Data
public static class DataHarmCleanNormalJsonDTO {
@ApiModelProperty("时间集合")
private List<String> time;
@ApiModelProperty("数据集合")
private List<Double> value;
@ApiModelProperty("相别")
private String phasic;
@ApiModelProperty("数据类型")
private String valueType;
}
}

View File

@@ -48,7 +48,8 @@ public enum DataCleanEnum {
Fluc("fluc","电压波动"),
//DataHarmPhasicV
V("v","次谐波电压基波相角"),
V("v","次谐波电压相角"),
V1("v_1","谐波电压基波相角"),
//DataHarmRateV
V_Rate("v","次谐波电压含有率"),
@@ -66,8 +67,8 @@ public enum DataCleanEnum {
Plt("plt","长时闪变"),
//DataV
FreqDev("freq_dev","频率偏差"),
Freq("freq","频率"),
FreqDev("freq_dev","频率偏差"),
RmsV("rms","相电压有效值"),
VPos("v_pos","正序电压"),
VNeg("v_neg","负序电压"),

View File

@@ -0,0 +1,175 @@
package com.njcn.dataProcess.pojo.po;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.github.jeffreyning.mybatisplus.anno.MppMultiId;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.time.LocalDate;
/**
* <p>
*
* </p>
*
* @author xy
* @since 2025-02-17
*/
@Getter
@Setter
@TableName("pq_data_verify_bak")
public class PqDataVerifyBak implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 监测点id
*/
@MppMultiId
@TableField(value = "line_id")
private String lineId;
/**
* 异常数据时间
*/
@MppMultiId
@TableField(value = "time_id")
@JsonFormat(pattern = "yyyy-MM-dd",timezone="GMT+8")
private LocalDate time;
/**
* 频率指标
*/
@TableField(value = "freq")
private Integer freq = 0;
/**
* 频率偏差指标
*/
@TableField(value = "freq_dev")
private Integer freqDev = 0;
/**
* 相电压有效值
*/
@TableField(value = "v_rms")
private Integer vRms = 0;
/**
* 正序电压
*/
@TableField(value = "v_pos")
private Integer vPos = 0;
/**
* 负序电压
*/
@TableField(value = "v_neg")
private Integer vNeg = 0;
/**
* 零序电压
*/
@TableField(value = "v_zero")
private Integer vZero = 0;
/**
* 电压不平衡度
*/
@TableField(value = "v_unbalance")
private Integer vUnbalance = 0;
/**
* 线电压有效值
*/
@TableField(value = "rms_lvr")
private Integer rmsLvr = 0;
/**
* 电压正偏差
*/
@TableField(value = "vu_dev")
private Integer vuDev = 0;
/**
* 电压负偏差
*/
@TableField(value = "vl_Dev")
private Integer vlDev = 0;
/**
* 电压总谐波畸变率
*/
@TableField(value = "v_thd")
private Integer vThd = 0;
/**
* 相电压基波有效值
*/
@TableField(value = "v")
private Integer v = 0;
/**
* 电流有效值
*/
@TableField(value = "i_rms")
private Integer iRms = 0;
/**
* 长时闪变
*/
@TableField(value = "plt")
private Integer plt = 0;
/**
* 间谐波电压含有率
*/
@TableField(value = "v_inharm")
private Integer vInharm = 0;
/**
* 谐波电压含有率
*/
@TableField(value = "v_harm")
private Integer vHarm = 0;
/**
* 功率因数
*/
@TableField(value = "pf")
private Integer pf = 0;
/**
* 谐波电压相角
*/
@TableField(value = "v_phasic")
private Integer vPhasic = 0;
/**
* 谐波电压基波相角
*/
@TableField(value = "v1_phasic")
private Integer v1Phasic = 0;
/**
* 电压波动
*/
@TableField(value = "fluc")
private Integer fluc = 0;
/**
* 短时闪变
*/
@TableField(value = "pst")
private Integer pst = 0;
/**
* 文件路径
*/
@TableField(value = "path")
private String filePath;
}

View File

@@ -0,0 +1,56 @@
package com.njcn.dataProcess.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.constant.OperateType;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.dataProcess.pojo.po.PqDataVerifyBak;
import com.njcn.dataProcess.service.IPqDataVerifyNewService;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
/**
* <p>
* 前端控制器
* </p>
*
* @author xy
* @since 2025-02-17
*/
@RestController
@RequestMapping("/pqDataVerifyNew")
public class PqDataVerifyNewController extends BaseController {
@Resource
private IPqDataVerifyNewService pqDataVerifyNewService;
@OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD)
@PostMapping("/insertDataBatch")
@ApiOperation("存储清洗的异常数据")
public HttpResult<List<String>> insertDataBatch(@RequestBody List<PqDataVerifyBak> list) {
String methodDescribe = getMethodDescribe("insertDataBatch");
pqDataVerifyNewService.insertDataBatch(list);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD)
@PostMapping("/insertData")
@ApiOperation("存储清洗的异常数据")
public HttpResult<List<String>> insertData(@RequestBody PqDataVerifyBak pqDataVerifyBak) {
String methodDescribe = getMethodDescribe("insertData");
pqDataVerifyNewService.insertData(pqDataVerifyBak);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -0,0 +1,16 @@
package com.njcn.dataProcess.dao.relation.mapper;
import com.github.jeffreyning.mybatisplus.base.MppBaseMapper;
import com.njcn.dataProcess.pojo.po.PqDataVerifyBak;
/**
* <p>
* Mapper 接口
* </p>
*
* @author xy
* @since 2025-02-17
*/
public interface PqDataVerifyNewMapper extends MppBaseMapper<PqDataVerifyBak> {
}

View File

@@ -0,0 +1,28 @@
package com.njcn.dataProcess.service;
import com.github.jeffreyning.mybatisplus.service.IMppService;
import com.njcn.dataProcess.pojo.po.PqDataVerifyBak;
import java.util.List;
/**
* <p>
* 服务类
* </p>
*
* @author xy
* @since 2025-02-17
*/
public interface IPqDataVerifyNewService extends IMppService<PqDataVerifyBak> {
/**
* 异常数据插入
* @param list
*/
void insertDataBatch(List<PqDataVerifyBak> list);
/**
* 异常数据插入
*/
void insertData(PqDataVerifyBak pqDataVerifyBak);
}

View File

@@ -0,0 +1,31 @@
package com.njcn.dataProcess.service.impl.relation;
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
import com.njcn.dataProcess.dao.relation.mapper.PqDataVerifyNewMapper;
import com.njcn.dataProcess.pojo.po.PqDataVerifyBak;
import com.njcn.dataProcess.service.IPqDataVerifyNewService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* <p>
* 服务实现类
* </p>
*
* @author xy
* @since 2025-02-17
*/
@Service
public class PqDataVerifyNewServiceImpl extends MppServiceImpl<PqDataVerifyNewMapper, PqDataVerifyBak> implements IPqDataVerifyNewService {
@Override
public void insertDataBatch(List<PqDataVerifyBak> list) {
this.saveOrUpdateBatchByMultiId(list,1000);
}
@Override
public void insertData(PqDataVerifyBak pqDataVerifyBak) {
this.saveOrUpdateByMultiId(pqDataVerifyBak);
}
}