新增数据质量清洗算法

This commit is contained in:
xy
2025-05-22 16:18:24 +08:00
parent 228b694322
commit 412a67f6fd
77 changed files with 1217 additions and 385 deletions

View File

@@ -38,6 +38,19 @@ public class MeasurementExecutor extends BaseExecutor {
@Resource
private IPollutionService pollutionService;
/**
 * Data quality cleaning: access check for the LiteFlow node {@code dataQualityClean}.
 *
 * @param bindCmp the node component bound to this flow step
 * @return whatever {@code isAccess(bindCmp)} decides for this component
 */
@LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataQualityClean", nodeType = NodeTypeEnum.COMMON)
public boolean dataQualityCleanAccess(NodeComponent bindCmp) {
    final boolean accessible = isAccess(bindCmp);
    return accessible;
}
/**
 * Data quality cleaning: process step of the LiteFlow node {@code dataQualityClean}.
 * Hands the component's request data to {@code dataCleanService.dataQualityCleanHandler}.
 *
 * @param bindCmp the node component bound to this flow step; its request data is the handler's input
 */
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "dataQualityClean", nodeType = NodeTypeEnum.COMMON)
public void dataQualityCleanProcess(NodeComponent bindCmp) {
dataCleanService.dataQualityCleanHandler(bindCmp.getRequestData());
}
/**
* 数据清洗 电压表
* dataV表

View File

@@ -7,6 +7,14 @@ import com.njcn.algorithm.pojo.bo.CalculatedParam;
*/
public interface IDataCleanService {
/**
 * Data quality cleaning.
 * Tags data according to when transient events occurred.
 * Takes the time of each transient event, plus its duration and the statistics interval,
 * as the start and end of the window to be excluded.
 *
 * @param calculatedParam calculation parameters for this run
 */
void dataQualityCleanHandler(CalculatedParam calculatedParam);
/***
* dataV数据清洗
* 不标记原始数据,将异常数据查询出来重新存储至详情表中

View File

@@ -6,6 +6,7 @@ import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.njcn.algorithm.service.line.IDataCleanService;
import com.njcn.algorithm.utils.MemorySizeUtil;
import com.njcn.dataProcess.api.*;
import com.njcn.dataProcess.dto.*;
import com.njcn.dataProcess.enums.DataCleanEnum;
import com.njcn.dataProcess.param.DataCleanParam;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
@@ -14,18 +15,25 @@ import com.njcn.dataProcess.pojo.po.PqDataVerify;
import com.njcn.dataProcess.util.DataCommonUtils;
import com.njcn.dataProcess.util.TimeUtils;
import com.njcn.device.pq.api.LineFeignClient;
import com.njcn.device.pq.pojo.vo.LineDetailDataVO;
import com.njcn.device.pq.pojo.vo.LineDetailVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -40,6 +48,8 @@ public class DataCleanServiceImpl implements IDataCleanService {
// Class logger.
private static final Logger logger = LoggerFactory.getLogger(DataCleanServiceImpl.class);
// Second-resolution timestamp format ("yyyy-MM-dd HH:mm:ss") bound to the JVM's default time zone.
private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
// Bound to the "line.num" configuration property.
// NOTE(review): the 100 initializer presumably only holds until injection happens — confirm whether a
// property default ("${line.num:100}") was intended instead.
@Value("${line.num}")
private Integer NUM = 100;
@@ -52,10 +62,18 @@ public class DataCleanServiceImpl implements IDataCleanService {
// Feign clients for the individual measurement data sets handled by this service.
@Resource
private DataInharmVFeignClient dataInharmVFeignClient;
// NOTE(review): dataQualityCleanHandler does not reference this client; its "dataInHarmI" step goes
// through dataHarmRateIFeignClient instead — confirm which client was intended.
@Resource
private DataInharmIFeignClient dataInharmIFeignClient;
@Resource
private DataHarmRateVFeignClient dataHarmRateVFeignClient;
@Resource
private DataHarmRateIFeignClient dataHarmRateIFeignClient;
@Resource
private DataHarmpowerPFeignClient dataHarmpowerPFeignClient;
@Resource
private DataHarmpowerQFeignClient dataHarmpowerQFeignClient;
@Resource
private DataHarmpowerSFeignClient dataHarmpowerSFeignClient;
@Resource
private DataHarmphasicVFeignClient dataHarmphasicVFeignClient;
@Resource
private DataFlucFeignClient dataFlucFeignClient;
@@ -67,8 +85,251 @@ public class DataCleanServiceImpl implements IDataCleanService {
private PqReasonableRangeFeignClient pqReasonableRangeFeignClient;
// Client used to fetch monitoring-point (line) details.
@Resource
private LineFeignClient lineFeignClient;
// Client used to fetch transient events.
@Resource
private RmpEventDetailFeignClient rmpEventDetailFeignClient;
// NOTE(review): this is the only client injected with @Autowired; the sibling clients use @Resource.
@Autowired
private DataHarmphasicIFeignClient dataHarmphasicIFeignClient;
/**
 * Data quality cleaning: flags the measurement data recorded around transient events as abnormal.
 *
 * <p>Loads the line details for {@code calculatedParam.getIdList()}, then the transient events of
 * those lines for the day of {@code calculatedParam.getDataDate()}. For each event, every data set
 * handled by {@link #markAbnormalAroundEvent} is read for the window
 * [event start, event start + event duration + statistics interval], its rows get
 * {@code abnormalFlag = 1}, and the rows are written back through the matching Feign client.
 *
 * <p>Events whose line details, statistics interval or start time are missing are logged and
 * skipped, so that one incomplete record no longer aborts the remaining events.
 *
 * <p>NOTE(review): every write here goes through a Feign client; confirm whether
 * {@code @Transactional} has any effect on those remote writes.
 *
 * @param calculatedParam supplies the line ids ({@code getIdList()}) and the day to process
 *                        ({@code getDataDate()})
 */
@Override
@Transactional(rollbackFor = Exception.class)
public void dataQualityCleanHandler(CalculatedParam calculatedParam) {
    MemorySizeUtil.getNowMemory();
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    logger.info("{},数据质量清洗算法执行=====》", LocalDateTime.now());

    // Fetch the monitoring points; their statistics interval sizes the exclusion window.
    @SuppressWarnings("unchecked")
    List<String> listOfString = (List<String>) (List<?>) calculatedParam.getIdList();
    List<LineDetailDataVO> lineDetailDataVOS = lineFeignClient.getLineDetailList(listOfString).getData();
    if (CollUtil.isEmpty(lineDetailDataVOS)) {
        logger.error("监测点集合为空,无法计算!");
        return;
    }
    // Keep the first entry on a duplicate line id; the two-argument toMap would throw instead.
    Map<String, LineDetailDataVO> lineMap = lineDetailDataVOS.stream()
            .collect(Collectors.toMap(LineDetailDataVO::getLineId, Function.identity(), (first, second) -> first));

    // Fetch the transient events of these monitoring points for the whole day.
    LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
    lineParam.setLineId(calculatedParam.getIdList());
    lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
    lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
    List<RmpEventDetailDTO> eventList = rmpEventDetailFeignClient.getRawData(lineParam).getData();
    if (CollUtil.isEmpty(eventList)) {
        return;
    }

    for (RmpEventDetailDTO event : eventList) {
        LineDetailDataVO line = lineMap.get(event.getMeasurementPointId());
        Integer timeInterval = line == null ? null : line.getTimeInterval();
        LocalDateTime eventStart = event.getStartTime();
        if (timeInterval == null || eventStart == null) {
            // The exclusion window cannot be computed without these values.
            logger.warn("监测点:{},缺少台账信息、统计间隔或暂态事件时间,跳过数据质量清洗", event.getMeasurementPointId());
            continue;
        }
        markAbnormalAroundEvent(event, eventStart, timeInterval, formatter);
    }
}

/**
 * Flags every data set of one monitoring point as abnormal for the window that follows one transient event.
 *
 * @param event           the transient event; supplies the monitoring point id and the duration
 * @param eventStart      start time of the event (non-null), used as the window start
 * @param intervalMinutes statistics interval of the monitoring point, treated as minutes
 * @param formatter       formatter used to render the window bounds for the query parameter
 */
private void markAbnormalAroundEvent(RmpEventDetailDTO event, LocalDateTime eventStart, int intervalMinutes,
        DateTimeFormatter formatter) {
    // Event duration, converted from seconds to milliseconds.
    long lastTime = (long) (event.getDuration() * 1000);
    ZoneId zone = ZoneId.systemDefault();
    // Window start: the event time itself.
    String startTime = eventStart.format(formatter);
    // Window end: event time + duration + one statistics interval (long arithmetic avoids int overflow).
    Instant windowEnd = eventStart.atZone(zone).toInstant().plusMillis(lastTime + intervalMinutes * 60_000L);
    String endTime = LocalDateTime.ofInstant(windowEnd, zone).format(formatter);

    LineCountEvaluateParam param = new LineCountEvaluateParam();
    param.setLineId(Collections.singletonList(event.getMeasurementPointId()));
    param.setStartTime(startTime);
    param.setEndTime(endTime);
    param.setDataType(false);
    logger.info("监测点:{},数据剔除初始时间:{},结束时间:{}", event.getMeasurementPointId(), startTime, endTime);

    // dataV
    List<DataVDto> dataV = dataVFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataV)) {
        for (DataVDto row : dataV) {
            row.setAbnormalFlag(1);
        }
        dataVFeignClient.addInfluxDbList(dataV);
        logger.info("修改dataV数据量:{}", dataV.size());
    }

    // dataI
    List<DataIDto> dataI = dataIFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataI)) {
        for (DataIDto row : dataI) {
            row.setAbnormalFlag(1);
        }
        dataIFeignClient.addInfluxDbList(dataI);
        logger.info("修改dataI数据量:{}", dataI.size());
    }

    // dataFlicker
    List<DataFlickerDto> dataFlicker = dataFlickerFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataFlicker)) {
        List<DataFlickerDTO> dataFlickerDTOList = new ArrayList<>();
        for (DataFlickerDto row : dataFlicker) {
            DataFlickerDTO dto = new DataFlickerDTO();
            BeanUtils.copyProperties(row, dto);
            dto.setTimeid(LocalDateTime.parse(row.getMinTime(), DATE_TIME_FORMATTER));
            dto.setAbnormalFlag(1);
            dataFlickerDTOList.add(dto);
        }
        dataFlickerFeignClient.batchInsertion(dataFlickerDTOList);
        logger.info("修改dataFlicker数据量:{}", dataFlicker.size());
    }

    // dataFluc
    List<DataFlucDto> dataFluc = dataFlucFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataFluc)) {
        List<DataFlucDTO> dataFlucDTOList = new ArrayList<>();
        for (DataFlucDto row : dataFluc) {
            DataFlucDTO dto = new DataFlucDTO();
            BeanUtils.copyProperties(row, dto);
            dto.setTimeid(LocalDateTime.parse(row.getMinTime(), DATE_TIME_FORMATTER));
            dto.setAbnormalFlag(1);
            dataFlucDTOList.add(dto);
        }
        dataFlucFeignClient.batchInsertion(dataFlucDTOList);
        logger.info("修改dataFluc数据量:{}", dataFluc.size());
    }

    // dataHarmPhasicI
    List<DataHarmPhasicIDto> dataHarmPhasicI = dataHarmphasicIFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataHarmPhasicI)) {
        List<DataHarmphasicIDTO> dataHarmphasicIDTOList = new ArrayList<>();
        for (DataHarmPhasicIDto row : dataHarmPhasicI) {
            DataHarmphasicIDTO dto = new DataHarmphasicIDTO();
            BeanUtils.copyProperties(row, dto);
            dto.setTimeid(LocalDateTime.parse(row.getMinTime(), DATE_TIME_FORMATTER));
            dto.setAbnormalFlag(1);
            dataHarmphasicIDTOList.add(dto);
        }
        dataHarmphasicIFeignClient.batchInsertion(dataHarmphasicIDTOList);
        logger.info("修改dataHarmPhasicI数据量:{}", dataHarmPhasicI.size());
    }

    // dataHarmPhasicV
    List<DataHarmDto> dataHarmPhasicV = dataHarmphasicVFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataHarmPhasicV)) {
        List<DataHarmphasicVDTO> dataHarmphasicVDTOList = new ArrayList<>();
        for (DataHarmDto row : dataHarmPhasicV) {
            DataHarmphasicVDTO dto = new DataHarmphasicVDTO();
            BeanUtils.copyProperties(row, dto);
            dto.setTimeid(LocalDateTime.parse(row.getMinTime(), DATE_TIME_FORMATTER));
            dto.setAbnormalFlag(1);
            dataHarmphasicVDTOList.add(dto);
        }
        dataHarmphasicVFeignClient.batchInsertion(dataHarmphasicVDTOList);
        logger.info("修改dataHarmPhasicV数据量:{}", dataHarmPhasicV.size());
    }

    // dataHarmPowerP
    List<DataPowerPDto> dataPowerP = dataHarmpowerPFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataPowerP)) {
        List<DataHarmpowerPDTO> dataHarmpowerPDTOList = new ArrayList<>();
        for (DataPowerPDto row : dataPowerP) {
            DataHarmpowerPDTO dto = new DataHarmpowerPDTO();
            BeanUtils.copyProperties(row, dto);
            dto.setTimeid(LocalDateTime.parse(row.getMinTime(), DATE_TIME_FORMATTER));
            dto.setAbnormalFlag(1);
            dataHarmpowerPDTOList.add(dto);
        }
        dataHarmpowerPFeignClient.batchInsertion(dataHarmpowerPDTOList);
        logger.info("修改 dataHarmPowerP数据量:{}", dataPowerP.size());
    }

    // dataHarmPowerQ
    List<DataHarmPowerQDto> dataHarmPowerQ = dataHarmpowerQFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataHarmPowerQ)) {
        List<DataHarmpowerQDTO> dataHarmpowerQDTOList = new ArrayList<>();
        for (DataHarmPowerQDto row : dataHarmPowerQ) {
            DataHarmpowerQDTO dto = new DataHarmpowerQDTO();
            BeanUtils.copyProperties(row, dto);
            dto.setTimeid(LocalDateTime.parse(row.getMinTime(), DATE_TIME_FORMATTER));
            dto.setAbnormalFlag(1);
            dataHarmpowerQDTOList.add(dto);
        }
        dataHarmpowerQFeignClient.batchInsertion(dataHarmpowerQDTOList);
        logger.info("修改dataHarmPowerQ数据量:{}", dataHarmPowerQ.size());
    }

    // dataHarmPowerS
    List<DataHarmPowerSDto> dataHarmPowerS = dataHarmpowerSFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataHarmPowerS)) {
        List<DataHarmpowerSDTO> dataHarmpowerSDTOList = new ArrayList<>();
        for (DataHarmPowerSDto row : dataHarmPowerS) {
            DataHarmpowerSDTO dto = new DataHarmpowerSDTO();
            BeanUtils.copyProperties(row, dto);
            dto.setTimeid(LocalDateTime.parse(row.getMinTime(), DATE_TIME_FORMATTER));
            dto.setAbnormalFlag(1);
            dataHarmpowerSDTOList.add(dto);
        }
        dataHarmpowerSFeignClient.batchInsertion(dataHarmpowerSDTOList);
        logger.info("修改dataHarmPowerS数据量:{}", dataHarmPowerS.size());
    }

    // dataHarmRateI
    List<DataHarmRateIDto> dataHarmRateI = dataHarmRateIFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataHarmRateI)) {
        for (DataHarmRateIDto row : dataHarmRateI) {
            row.setAbnormalFlag(1);
        }
        dataHarmRateIFeignClient.addInfluxDbList(dataHarmRateI);
        logger.info("修改dataHarmRateI数据量:{}", dataHarmRateI.size());
    }

    // dataHarmRateV
    List<DataHarmDto> dataHarmRateV = dataHarmRateVFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataHarmRateV)) {
        List<DataHarmrateVDTO> dataHarmrateVDTOList = new ArrayList<>();
        for (DataHarmDto row : dataHarmRateV) {
            DataHarmrateVDTO dto = new DataHarmrateVDTO();
            BeanUtils.copyProperties(row, dto);
            dto.setTimeid(LocalDateTime.parse(row.getMinTime(), DATE_TIME_FORMATTER));
            dto.setAbnormalFlag(1);
            dataHarmrateVDTOList.add(dto);
        }
        dataHarmRateVFeignClient.batchInsertion(dataHarmrateVDTOList);
        logger.info("修改dataHarmRateV数据量:{}", dataHarmRateV.size());
    }

    // dataInHarmI
    // NOTE(review): this step is labelled dataInHarmI but reads and writes through
    // dataHarmRateIFeignClient and logs "dataHarmRateI", i.e. it repeats the dataHarmRateI step above.
    // It looks like a copy-paste slip and dataInharmIFeignClient was probably intended. Behaviour is
    // left unchanged because that client's API is not visible here — TODO confirm and fix.
    List<DataHarmRateIDto> dataInHarmI = dataHarmRateIFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataInHarmI)) {
        for (DataHarmRateIDto row : dataInHarmI) {
            row.setAbnormalFlag(1);
        }
        dataHarmRateIFeignClient.addInfluxDbList(dataInHarmI);
        logger.info("修改dataHarmRateI数据量:{}", dataInHarmI.size());
    }

    // dataInHarmV
    List<DataHarmDto> dataInHarmV = dataInharmVFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataInHarmV)) {
        List<DataInharmVDTO> dataInharmVDTOList = new ArrayList<>();
        for (DataHarmDto row : dataInHarmV) {
            DataInharmVDTO dto = new DataInharmVDTO();
            BeanUtils.copyProperties(row, dto);
            dto.setTimeid(LocalDateTime.parse(row.getMinTime(), DATE_TIME_FORMATTER));
            dto.setAbnormalFlag(1);
            dataInharmVDTOList.add(dto);
        }
        dataInharmVFeignClient.batchInsertion(dataInharmVDTOList);
        logger.info("修改dataInHarmV数据量:{}", dataInHarmV.size());
    }

    // dataPlt
    List<DataPltDto> dataPlt = dataPltFeignClient.getRawData(param).getData();
    if (CollUtil.isNotEmpty(dataPlt)) {
        List<DataPltDTO> dataPltDTOList = new ArrayList<>();
        for (DataPltDto row : dataPlt) {
            DataPltDTO pltDTO = new DataPltDTO();
            BeanUtils.copyProperties(row, pltDTO);
            pltDTO.setTimeid(LocalDateTime.parse(row.getMinTime(), DATE_TIME_FORMATTER));
            pltDTO.setAbnormalFlag(1);
            dataPltDTOList.add(pltDTO);
        }
        dataPltFeignClient.batchInsertion(dataPltDTOList);
        logger.info("修改dataPlt数据量:{}", dataPlt.size());
    }
}
@Override
public void dataVCleanHandler(CalculatedParam calculatedParam) {
MemorySizeUtil.getNowMemory();

View File

@@ -1,9 +1,7 @@
package com.njcn.algorithm.utils;
import org.apache.commons.lang.SerializationUtils;
import org.springframework.stereotype.Component;
import java.lang.instrument.Instrumentation;
import java.time.LocalDateTime;
import java.util.List;
@@ -39,3 +37,4 @@ public class MemorySizeUtil {
}
}