补召算法 天数逻辑处理

This commit is contained in:
xy
2025-12-15 17:11:34 +08:00
parent 074c020b10
commit d8d516f63d
3 changed files with 164 additions and 164 deletions

View File

@@ -151,7 +151,7 @@ public class ExecutionCenter extends BaseController {
DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
long betweenDay = DateUtil.betweenDay(startDate, endDate, true); long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
//递增日期执行算法链 //递增日期执行算法链
for (int i = 0; i < betweenDay; i++) { for (int i = 0; i <= betweenDay; i++) {
if (i != 0) { if (i != 0) {
startDate = DateUtil.offsetDay(startDate, 1); startDate = DateUtil.offsetDay(startDate, 1);
} }
@@ -185,7 +185,7 @@ public class ExecutionCenter extends BaseController {
DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
long betweenDay = DateUtil.betweenDay(startDate, endDate, true); long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
//递增日期执行算法链 //递增日期执行算法链
for (int i = 0; i < betweenDay; i++) { for (int i = 0; i <= betweenDay; i++) {
if (i != 0) { if (i != 0) {
startDate = DateUtil.offsetDay(startDate, 1); startDate = DateUtil.offsetDay(startDate, 1);
} }
@@ -220,7 +220,7 @@ public class ExecutionCenter extends BaseController {
DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
long betweenDay = DateUtil.betweenDay(startDate, endDate, true); long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
//递增日期执行算法链 //递增日期执行算法链
for (int i = 0; i < betweenDay; i++) { for (int i = 0; i <= betweenDay; i++) {
if (i != 0) { if (i != 0) {
startDate = DateUtil.offsetDay(startDate, 1); startDate = DateUtil.offsetDay(startDate, 1);
} }
@@ -263,7 +263,7 @@ public class ExecutionCenter extends BaseController {
DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATETIME_FORMAT); DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATETIME_FORMAT);
long betweenHour = DateUtil.between(startDate, endDate, DateUnit.HOUR); long betweenHour = DateUtil.between(startDate, endDate, DateUnit.HOUR);
//递增日期执行算法链 //递增日期执行算法链
for (int i = 0; i < betweenHour; i++) { for (int i = 0; i <= betweenHour; i++) {
if (i != 0) { if (i != 0) {
startDate = DateUtil.offsetHour(startDate, 1); startDate = DateUtil.offsetHour(startDate, 1);
} }
@@ -305,7 +305,7 @@ public class ExecutionCenter extends BaseController {
DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
long betweenDay = DateUtil.betweenDay(startDate, endDate, true); long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
//递增日期执行算法链 //递增日期执行算法链
for (int i = 0; i < betweenDay; i++) { for (int i = 0; i <= betweenDay; i++) {
if (i != 0) { if (i != 0) {
startDate = DateUtil.offsetDay(startDate, 1); startDate = DateUtil.offsetDay(startDate, 1);
} }
@@ -344,7 +344,7 @@ public class ExecutionCenter extends BaseController {
DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
long betweenDay = DateUtil.betweenDay(startDate, endDate, true); long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
//递增日期执行算法链 //递增日期执行算法链
for (int i = 0; i < betweenDay; i++) { for (int i = 0; i <= betweenDay; i++) {
if (i != 0) { if (i != 0) {
startDate = DateUtil.offsetDay(startDate, 1); startDate = DateUtil.offsetDay(startDate, 1);
} }
@@ -432,7 +432,7 @@ public class ExecutionCenter extends BaseController {
DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
long betweenDay = DateUtil.betweenDay(startDate, endDate, true); long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
//递增日期执行算法链 //递增日期执行算法链
for (int i = 0; i < betweenDay; i++) { for (int i = 0; i <= betweenDay; i++) {
if (i != 0) { if (i != 0) {
startDate = DateUtil.offsetDay(startDate, 1); startDate = DateUtil.offsetDay(startDate, 1);
} }

View File

@@ -231,7 +231,7 @@ public class MeasurementExecutor extends BaseExecutor {
dayDataService.dataVHandler(bindCmp.getRequestData()); dayDataService.dataVHandler(bindCmp.getRequestData());
} }
@LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataI", nodeType = NodeTypeEnum.COMMON) @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataI", nodeType = NodeTypeEnum.COMMON)
public boolean dataIToDayAccess(NodeComponent bindCmp) { public boolean dataIToDayAccess(NodeComponent bindCmp) {
return isAccess(bindCmp); return isAccess(bindCmp);
} }

View File

@@ -58,7 +58,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class);
@Value("${line.num}") @Value("${line.num}")
private Integer NUM = 100; private Integer NUM;
@Resource @Resource
private DataVFeignClient dataVFeignClient; private DataVFeignClient dataVFeignClient;
@Resource @Resource
@@ -105,162 +105,162 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
System.out.println("已使用的内存: " + usedMemory / (1024 * 1024) + " MB"); System.out.println("已使用的内存: " + usedMemory / (1024 * 1024) + " MB");
System.out.println("第一次分析结束-----------------------------------------"); System.out.println("第一次分析结束-----------------------------------------");
logger.info("{},limitRate表转r_stat_limit_rate_d算法开始=====》", LocalDateTime.now()); logger.info("limitRate表转r_stat_limit_rate_d算法开始,执行日期为{}=====》", calculatedParam.getDataDate());
List<DataLimitDetailDto> result = new ArrayList<>(); // List<DataLimitDetailDto> result = new ArrayList<>();
//远程接口获取分钟数据 // //远程接口获取分钟数据
LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); // LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); // lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); // lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
lineParam.setType(calculatedParam.getType()); // lineParam.setType(calculatedParam.getType());
List<String> lineIds = calculatedParam.getIdList(); // List<String> lineIds = calculatedParam.getIdList();
//获取所有监测点的限值 // //获取所有监测点的限值
List<Overlimit> overLimitList = commTerminalGeneralClient.getOverLimitDataByIds(lineIds).getData(); // List<Overlimit> overLimitList = commTerminalGeneralClient.getOverLimitDataByIds(lineIds).getData();
Map<String, Overlimit> overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity())); // Map<String, Overlimit> overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity()));
//添加异常数据时间点 // //添加异常数据时间点
getAbnormalData(lineParam); // getAbnormalData(lineParam);
//以100个监测点分片处理 // //以100个监测点分片处理
List<List<String>> pendingIds = ListUtils.partition(lineIds, NUM); // List<List<String>> pendingIds = ListUtils.partition(lineIds, NUM);
ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C); // ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C);
MemorySizeUtil.getNowMemory(); // MemorySizeUtil.getNowMemory();
pendingIds.forEach(list -> { // pendingIds.forEach(list -> {
lineParam.setLineId(list); // lineParam.setLineId(list);
//获取电压数据 // //获取电压数据
List<DataVDto> dataVAllTime = dataVFeignClient.getRawData(lineParam).getData(); // List<DataVDto> dataVAllTime = dataVFeignClient.getRawData(lineParam).getData();
//闪变数据 // //闪变数据
List<DataPltDto> dataPltAllTime = dataPltFeignClient.getRawData(lineParam).getData(); // List<DataPltDto> dataPltAllTime = dataPltFeignClient.getRawData(lineParam).getData();
//谐波数据 // //谐波数据
List<DataHarmDto> dataVHarmList = dataHarmRateVFeignClient.getRawData(lineParam).getData(); // List<DataHarmDto> dataVHarmList = dataHarmRateVFeignClient.getRawData(lineParam).getData();
//间谐波数据 // //间谐波数据
List<DataHarmDto> dataVInHarmList = dataInharmVFeignClient.getRawData(lineParam).getData(); // List<DataHarmDto> dataVInHarmList = dataInharmVFeignClient.getRawData(lineParam).getData();
//电流数据 // //电流数据
List<DataIDto> dataIList = dataIFeignClient.getRawData(lineParam).getData(); // List<DataIDto> dataIList = dataIFeignClient.getRawData(lineParam).getData();
/** // /**
* 功能描述:获取influxDB -> data_v -> // * 功能描述:获取influxDB -> data_v ->
* 总计算次数(用data_v中phasic_type=A,value_type=avg,quality_flag=0来参与统计) // * 总计算次数(用data_v中phasic_type=A,value_type=avg,quality_flag=0来参与统计)
*/ // */
Map<String, List<DataVDto>> allTime = dataVAllTime.stream() // Map<String, List<DataVDto>> allTime = dataVAllTime.stream()
.filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType())) // .filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType()))
.filter(x -> InfluxDbSqlConstant.AVG_WEB.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataVDto::getLineId));
/**
* 功能描述:获取influxDB -> data_plt ->
* 闪变总计算次数(用data_plt中phasic_type=A,value_type=avg,quality_flag=0来参与统计)
*/
//fixme 冀北现场 闪变原始表没有 value_type 这个参数
Map<String, List<DataPltDto>> pltAllTime = dataPltAllTime.stream()
.filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType()))
// .filter(x -> InfluxDbSqlConstant.AVG_WEB.equalsIgnoreCase(x.getValueType())) // .filter(x -> InfluxDbSqlConstant.AVG_WEB.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataPltDto::getLineId)); // .collect(Collectors.groupingBy(DataVDto::getLineId));
//
/** // /**
*功能描述:获取influxDB -> data_harmrate_v -> // * 功能描述:获取influxDB -> data_plt ->
* 2-25次谐波电压含有率 -> A相||B相||C相的日95%概率值 // * 闪变总计算次数(用data_plt中phasic_type=A,value_type=avg,quality_flag=0来参与统计)
*/ // */
Map<String, List<DataHarmDto>> harmRateV = dataVHarmList.stream() // //fixme 冀北现场 闪变原始表没有 value_type 这个参数
.filter(x -> phase.contains(x.getPhasicType())) // Map<String, List<DataPltDto>> pltAllTime = dataPltAllTime.stream()
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) // .filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType()))
.collect(Collectors.groupingBy(DataHarmDto::getLineId)); //// .filter(x -> InfluxDbSqlConstant.AVG_WEB.equalsIgnoreCase(x.getValueType()))
// .collect(Collectors.groupingBy(DataPltDto::getLineId));
/** //
* 功能描述:获取influxDB -> data_i -> 2-25次谐波电流 -> 日95%概率值 // /**
*/ // *功能描述:获取influxDB -> data_harmrate_v ->
Map<String, List<DataIDto>> dataI = dataIList.stream() // * 2-25次谐波电压含有率 -> A相||B相||C相的日95%概率值
.filter(x -> phase.contains(x.getPhasicType())) // */
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) // Map<String, List<DataHarmDto>> harmRateV = dataVHarmList.stream()
.collect(Collectors.groupingBy(DataIDto::getLineId)); // .filter(x -> phase.contains(x.getPhasicType()))
// .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()))
// .collect(Collectors.groupingBy(DataHarmDto::getLineId));
/** //
* 功能描述:获取influxDB -> data_inharm_v -> 0.5-15.5次间谐波电压含有率 -> 日95%概率值 // /**
*/ // * 功能描述:获取influxDB -> data_i -> 2-25次谐波电流 -> 日95%概率值
Map<String, List<DataHarmDto>> inHarmV = dataVInHarmList.stream() // */
.filter(x -> phase.contains(x.getPhasicType())) // Map<String, List<DataIDto>> dataI = dataIList.stream()
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) // .filter(x -> phase.contains(x.getPhasicType()))
.collect(Collectors.groupingBy(DataHarmDto::getLineId)); // .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()))
// .collect(Collectors.groupingBy(DataIDto::getLineId));
/** //
* 功能描述:获取influxDB -> data_v -> 电压总谐波畸变率 -> 日95%概率值 //
*/ // /**
Map<String, List<DataVDto>> dataVThd = dataVAllTime.stream() // * 功能描述:获取influxDB -> data_inharm_v -> 0.5-15.5次间谐波电压含有率 -> 日95%概率值
.filter(x -> phase.contains(x.getPhasicType())) // */
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) // Map<String, List<DataHarmDto>> inHarmV = dataVInHarmList.stream()
.collect(Collectors.groupingBy(DataVDto::getLineId)); // .filter(x -> phase.contains(x.getPhasicType()))
// .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()))
/** // .collect(Collectors.groupingBy(DataHarmDto::getLineId));
* 功能描述:获取influxDB -> data_v -> 负序电压不平衡度 -> 最大值 && 日95%概率值 //
*/ // /**
Map<String, List<DataVDto>> dataVUnbalance = dataVAllTime.stream() // * 功能描述:获取influxDB -> data_v -> 电压总谐波畸变率 -> 日95%概率值
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) // */
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) || // Map<String, List<DataVDto>> dataVThd = dataVAllTime.stream()
InfluxDBTableConstant.MAX.equals(x.getValueType())) // .filter(x -> phase.contains(x.getPhasicType()))
.collect(Collectors.groupingBy(DataVDto::getLineId)); // .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()))
// .collect(Collectors.groupingBy(DataVDto::getLineId));
/** //
* 功能描述:获取influxDB -> data_i -> 负序电流 -> 最大值 && 日95%概率值 // /**
*/ // * 功能描述:获取influxDB -> data_v -> 负序电压不平衡度 -> 最大值 && 日95%概率值
Map<String, List<DataIDto>> dataINeg = dataIList.stream() // */
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) // Map<String, List<DataVDto>> dataVUnbalance = dataVAllTime.stream()
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) || // .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType()))
InfluxDBTableConstant.MAX.equals(x.getValueType())) // .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) ||
.collect(Collectors.groupingBy(DataIDto::getLineId)); // InfluxDBTableConstant.MAX.equals(x.getValueType()))
// .collect(Collectors.groupingBy(DataVDto::getLineId));
/** //
* 功能描述:获取influxDB -> data_v -> 频率偏差 -> 最大值 && 最小值 // /**
*/ // * 功能描述:获取influxDB -> data_i -> 负序电流 -> 最大值 && 日95%概率值
Map<String, List<DataVDto>> dataVFreq = dataVAllTime.stream() // */
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) // Map<String, List<DataIDto>> dataINeg = dataIList.stream()
.filter(x -> InfluxDBTableConstant.MIN.equals(x.getValueType()) || // .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType()))
InfluxDBTableConstant.MAX.equals(x.getValueType())) // .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) ||
.collect(Collectors.groupingBy(DataVDto::getLineId)); // InfluxDBTableConstant.MAX.equals(x.getValueType()))
/** // .collect(Collectors.groupingBy(DataIDto::getLineId));
* 功能描述:获取influxDB -> data_v -> 电压偏差 -> 最大值 //
*/ // /**
Map<String, List<DataVDto>> dataVDev = dataVAllTime.stream() // * 功能描述:获取influxDB -> data_v -> 频率偏差 -> 最大值 && 最小值
.filter(x -> phase.contains(x.getPhasicType())) // */
.filter(x -> InfluxDBTableConstant.MAX.equals(x.getValueType())) // Map<String, List<DataVDto>> dataVFreq = dataVAllTime.stream()
.collect(Collectors.groupingBy(DataVDto::getLineId)); // .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType()))
// .filter(x -> InfluxDBTableConstant.MIN.equals(x.getValueType()) ||
/** // InfluxDBTableConstant.MAX.equals(x.getValueType()))
* 功能描述:获取influxDB -> data_plt -> 长时间闪变 -> 注(取最大值原始算法去掉了,现没有根据最大值比较) // .collect(Collectors.groupingBy(DataVDto::getLineId));
*/ // /**
Map<String, List<DataPltDto>> dataPlt = dataPltAllTime.stream() // * 功能描述:获取influxDB -> data_v -> 电压偏差 -> 最大值
.filter(x -> phase.contains(x.getPhasicType())) // */
.collect(Collectors.groupingBy(DataPltDto::getLineId)); // Map<String, List<DataVDto>> dataVDev = dataVAllTime.stream()
// .filter(x -> phase.contains(x.getPhasicType()))
for (String item : list) { // .filter(x -> InfluxDBTableConstant.MAX.equals(x.getValueType()))
if(ObjectUtil.isNotNull(overLimitMap.get(item))){ // .collect(Collectors.groupingBy(DataVDto::getLineId));
result.addAll(getData(calculatedParam.getDataDate(), //
overLimitMap.get(item), // /**
allTime.get(item), // * 功能描述:获取influxDB -> data_plt -> 长时间闪变 -> 注(取最大值原始算法去掉了,现没有根据最大值比较)
pltAllTime.get(item), // */
harmRateV.get(item), // Map<String, List<DataPltDto>> dataPlt = dataPltAllTime.stream()
dataI.get(item), // .filter(x -> phase.contains(x.getPhasicType()))
inHarmV.get(item), // .collect(Collectors.groupingBy(DataPltDto::getLineId));
dataVThd.get(item), //
dataVUnbalance.get(item), // for (String item : list) {
dataINeg.get(item), // if(ObjectUtil.isNotNull(overLimitMap.get(item))){
dataVFreq.get(item), // result.addAll(getData(calculatedParam.getDataDate(),
dataVDev.get(item), // overLimitMap.get(item),
dataPlt.get(item))); // allTime.get(item),
} // pltAllTime.get(item),
} // harmRateV.get(item),
}); // dataI.get(item),
MemorySizeUtil.getNowMemory(); // inHarmV.get(item),
if (CollUtil.isNotEmpty(result)) { // dataVThd.get(item),
//存储数据 // dataVUnbalance.get(item),
List<DataLimitRateDto> dataLimitRate = result.stream().map(DataLimitDetailDto::getDataLimitRate).filter(ObjectUtil::isNotNull).collect(Collectors.toList()); // dataINeg.get(item),
if(CollUtil.isNotEmpty(dataLimitRate)){ // dataVFreq.get(item),
dataLimitRateFeignClient.batchInsertion(dataLimitRate); // dataVDev.get(item),
} // dataPlt.get(item)));
} // }
if (CollUtil.isNotEmpty(result)) { // }
//存储数据 // });
List<DataLimitRateDetailDto> detail = result.stream().map(DataLimitDetailDto::getDataLimitRateDetail).filter(x -> ObjectUtil.isNotNull(x)).collect(Collectors.toList()); // MemorySizeUtil.getNowMemory();
if(CollUtil.isNotEmpty(detail)){ // if (CollUtil.isNotEmpty(result)) {
dataLimitRateDetailFeignClient.batchInsertion(detail); // //存储数据
} // List<DataLimitRateDto> dataLimitRate = result.stream().map(DataLimitDetailDto::getDataLimitRate).filter(ObjectUtil::isNotNull).collect(Collectors.toList());
} // if(CollUtil.isNotEmpty(dataLimitRate)){
// dataLimitRateFeignClient.batchInsertion(dataLimitRate);
// }
// }
// if (CollUtil.isNotEmpty(result)) {
// //存储数据
// List<DataLimitRateDetailDto> detail = result.stream().map(DataLimitDetailDto::getDataLimitRateDetail).filter(x -> ObjectUtil.isNotNull(x)).collect(Collectors.toList());
// if(CollUtil.isNotEmpty(detail)) {
// dataLimitRateDetailFeignClient.batchInsertion(detail);
// }
// }
System.gc(); System.gc();
} }