From d8d516f63d26a95a13930244190f2b8c41df86bf Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Mon, 15 Dec 2025 17:11:34 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=8F=AC=E7=AE=97=E6=B3=95=20?= =?UTF-8?q?=E5=A4=A9=E6=95=B0=E9=80=BB=E8=BE=91=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/njcn/algorithm/ExecutionCenter.java | 14 +- .../executor/MeasurementExecutor.java | 2 +- .../line/IDataCrossingServiceImpl.java | 312 +++++++++--------- 3 files changed, 164 insertions(+), 164 deletions(-) diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java index 91df75f..639fd5f 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java @@ -151,7 +151,7 @@ public class ExecutionCenter extends BaseController { DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); long betweenDay = DateUtil.betweenDay(startDate, endDate, true); //递增日期执行算法链 - for (int i = 0; i < betweenDay; i++) { + for (int i = 0; i <= betweenDay; i++) { if (i != 0) { startDate = DateUtil.offsetDay(startDate, 1); } @@ -185,7 +185,7 @@ public class ExecutionCenter extends BaseController { DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); long betweenDay = DateUtil.betweenDay(startDate, endDate, true); //递增日期执行算法链 - for (int i = 0; i < betweenDay; i++) { + for (int i = 0; i <= betweenDay; i++) { if (i != 0) { startDate = DateUtil.offsetDay(startDate, 1); } @@ -220,7 +220,7 @@ public class ExecutionCenter extends BaseController { DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); long betweenDay = DateUtil.betweenDay(startDate, endDate, true); //递增日期执行算法链 - for (int i = 0; i < betweenDay; i++) { + for (int i = 0; i <= betweenDay; i++) { if (i != 0) { startDate = DateUtil.offsetDay(startDate, 1); } @@ -263,7 +263,7 @@ public class ExecutionCenter extends BaseController { DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATETIME_FORMAT); long betweenHour = DateUtil.between(startDate, endDate, DateUnit.HOUR); //递增日期执行算法链 - for (int i = 0; i < betweenHour; i++) { + for (int i = 0; i <= betweenHour; i++) { if (i != 0) { startDate = DateUtil.offsetHour(startDate, 1); } @@ -305,7 +305,7 @@ public class ExecutionCenter extends BaseController { DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); long betweenDay = DateUtil.betweenDay(startDate, endDate, true); //递增日期执行算法链 - for (int i = 0; i < betweenDay; i++) { + for (int i = 0; i <= betweenDay; i++) { if (i != 0) { startDate = DateUtil.offsetDay(startDate, 1); } @@ -344,7 +344,7 @@ public class ExecutionCenter extends BaseController { DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); long betweenDay = DateUtil.betweenDay(startDate, endDate, true); //递增日期执行算法链 - for (int i = 0; i < betweenDay; i++) { + for (int i = 0; i <= betweenDay; i++) { if (i != 0) { startDate = DateUtil.offsetDay(startDate, 1); } @@ -432,7 +432,7 @@ public class ExecutionCenter extends BaseController { DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); long betweenDay = DateUtil.betweenDay(startDate, endDate, true); //递增日期执行算法链 - for (int i = 0; i < betweenDay; i++) { + for (int i = 0; i <= betweenDay; i++) { if (i != 0) { startDate = DateUtil.offsetDay(startDate, 1); } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java index 4ba8acf..6937f3b 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java @@ -231,7 +231,7 @@ public class MeasurementExecutor extends BaseExecutor { dayDataService.dataVHandler(bindCmp.getRequestData()); } - @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataI", nodeType = NodeTypeEnum.COMMON) + @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataI", nodeType = NodeTypeEnum.COMMON) public boolean dataIToDayAccess(NodeComponent bindCmp) { return isAccess(bindCmp); } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java index e8c5d3e..0fda7d9 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java @@ -58,7 +58,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class); @Value("${line.num}") - private Integer NUM = 100; + private Integer NUM; @Resource private DataVFeignClient dataVFeignClient; @Resource @@ -105,162 +105,162 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { System.out.println("已使用的内存: " + usedMemory / (1024 * 1024) + " MB"); System.out.println("第一次分析结束-----------------------------------------"); - logger.info("{},limitRate表转r_stat_limit_rate_d算法开始=====》", LocalDateTime.now()); - List result = new ArrayList<>(); - //远程接口获取分钟数据 - LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); - lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); - lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); - lineParam.setType(calculatedParam.getType()); - List lineIds = calculatedParam.getIdList(); - //获取所有监测点的限值 - List overLimitList = commTerminalGeneralClient.getOverLimitDataByIds(lineIds).getData(); - Map overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity())); - //添加异常数据时间点 - getAbnormalData(lineParam); - //以100个监测点分片处理 - List> pendingIds = ListUtils.partition(lineIds, NUM); - ArrayList phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C); - MemorySizeUtil.getNowMemory(); - pendingIds.forEach(list -> { - lineParam.setLineId(list); - //获取电压数据 - List dataVAllTime = dataVFeignClient.getRawData(lineParam).getData(); - //闪变数据 - List dataPltAllTime = dataPltFeignClient.getRawData(lineParam).getData(); - //谐波数据 - List dataVHarmList = dataHarmRateVFeignClient.getRawData(lineParam).getData(); - //间谐波数据 - List dataVInHarmList = dataInharmVFeignClient.getRawData(lineParam).getData(); - //电流数据 - List dataIList = dataIFeignClient.getRawData(lineParam).getData(); - /** - * 功能描述:获取influxDB -> data_v -> - * 总计算次数(用data_v中phasic_type=A,value_type=avg,quality_flag=0来参与统计) - */ - Map> allTime = dataVAllTime.stream() - .filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType())) - .filter(x -> InfluxDbSqlConstant.AVG_WEB.equalsIgnoreCase(x.getValueType())) - .collect(Collectors.groupingBy(DataVDto::getLineId)); - - /** - * 功能描述:获取influxDB -> data_plt -> - * 闪变总计算次数(用data_plt中phasic_type=A,value_type=avg,quality_flag=0来参与统计) - */ - //fixme 冀北现场 闪变原始表没有 value_type 这个参数 - Map> pltAllTime = dataPltAllTime.stream() - .filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType())) + logger.info("limitRate表转r_stat_limit_rate_d算法开始,执行日期为{}=====》", calculatedParam.getDataDate()); +// List result = new ArrayList<>(); +// //远程接口获取分钟数据 +// LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); +// lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); +// lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); +// lineParam.setType(calculatedParam.getType()); +// List lineIds = calculatedParam.getIdList(); +// //获取所有监测点的限值 +// List overLimitList = commTerminalGeneralClient.getOverLimitDataByIds(lineIds).getData(); +// Map overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity())); +// //添加异常数据时间点 +// getAbnormalData(lineParam); +// //以100个监测点分片处理 +// List> pendingIds = ListUtils.partition(lineIds, NUM); +// ArrayList phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C); +// MemorySizeUtil.getNowMemory(); +// pendingIds.forEach(list -> { +// lineParam.setLineId(list); +// //获取电压数据 +// List dataVAllTime = dataVFeignClient.getRawData(lineParam).getData(); +// //闪变数据 +// List dataPltAllTime = dataPltFeignClient.getRawData(lineParam).getData(); +// //谐波数据 +// List dataVHarmList = dataHarmRateVFeignClient.getRawData(lineParam).getData(); +// //间谐波数据 +// List dataVInHarmList = dataInharmVFeignClient.getRawData(lineParam).getData(); +// //电流数据 +// List dataIList = dataIFeignClient.getRawData(lineParam).getData(); +// /** +// * 功能描述:获取influxDB -> data_v -> +// * 总计算次数(用data_v中phasic_type=A,value_type=avg,quality_flag=0来参与统计) +// */ +// Map> allTime = dataVAllTime.stream() +// .filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType())) // .filter(x -> InfluxDbSqlConstant.AVG_WEB.equalsIgnoreCase(x.getValueType())) - .collect(Collectors.groupingBy(DataPltDto::getLineId)); - - /** - *功能描述:获取influxDB -> data_harmrate_v -> - * 2-25次谐波电压含有率 -> A相||B相||C相的日95%概率值 - */ - Map> harmRateV = dataVHarmList.stream() - .filter(x -> phase.contains(x.getPhasicType())) - .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) - .collect(Collectors.groupingBy(DataHarmDto::getLineId)); - - /** - * 功能描述:获取influxDB -> data_i -> 2-25次谐波电流 -> 日95%概率值 - */ - Map> dataI = dataIList.stream() - .filter(x -> phase.contains(x.getPhasicType())) - .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) - .collect(Collectors.groupingBy(DataIDto::getLineId)); - - - /** - * 功能描述:获取influxDB -> data_inharm_v -> 0.5-15.5次间谐波电压含有率 -> 日95%概率值 - */ - Map> inHarmV = dataVInHarmList.stream() - .filter(x -> phase.contains(x.getPhasicType())) - .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) - .collect(Collectors.groupingBy(DataHarmDto::getLineId)); - - /** - * 功能描述:获取influxDB -> data_v -> 电压总谐波畸变率 -> 日95%概率值 - */ - Map> dataVThd = dataVAllTime.stream() - .filter(x -> phase.contains(x.getPhasicType())) - .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) - .collect(Collectors.groupingBy(DataVDto::getLineId)); - - /** - * 功能描述:获取influxDB -> data_v -> 负序电压不平衡度 -> 最大值 && 日95%概率值 - */ - Map> dataVUnbalance = dataVAllTime.stream() - .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) - .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) || - InfluxDBTableConstant.MAX.equals(x.getValueType())) - .collect(Collectors.groupingBy(DataVDto::getLineId)); - - /** - * 功能描述:获取influxDB -> data_i -> 负序电流 -> 最大值 && 日95%概率值 - */ - Map> dataINeg = dataIList.stream() - .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) - .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) || - InfluxDBTableConstant.MAX.equals(x.getValueType())) - .collect(Collectors.groupingBy(DataIDto::getLineId)); - - /** - * 功能描述:获取influxDB -> data_v -> 频率偏差 -> 最大值 && 最小值 - */ - Map> dataVFreq = dataVAllTime.stream() - .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) - .filter(x -> InfluxDBTableConstant.MIN.equals(x.getValueType()) || - InfluxDBTableConstant.MAX.equals(x.getValueType())) - .collect(Collectors.groupingBy(DataVDto::getLineId)); - /** - * 功能描述:获取influxDB -> data_v -> 电压偏差 -> 最大值 - */ - Map> dataVDev = dataVAllTime.stream() - .filter(x -> phase.contains(x.getPhasicType())) - .filter(x -> InfluxDBTableConstant.MAX.equals(x.getValueType())) - .collect(Collectors.groupingBy(DataVDto::getLineId)); - - /** - * 功能描述:获取influxDB -> data_plt -> 长时间闪变 -> 注(取最大值原始算法去掉了,现没有根据最大值比较) - */ - Map> dataPlt = dataPltAllTime.stream() - .filter(x -> phase.contains(x.getPhasicType())) - .collect(Collectors.groupingBy(DataPltDto::getLineId)); - - for (String item : list) { - if(ObjectUtil.isNotNull(overLimitMap.get(item))){ - result.addAll(getData(calculatedParam.getDataDate(), - overLimitMap.get(item), - allTime.get(item), - pltAllTime.get(item), - harmRateV.get(item), - dataI.get(item), - inHarmV.get(item), - dataVThd.get(item), - dataVUnbalance.get(item), - dataINeg.get(item), - dataVFreq.get(item), - dataVDev.get(item), - dataPlt.get(item))); - } - } - }); - MemorySizeUtil.getNowMemory(); - if (CollUtil.isNotEmpty(result)) { - //存储数据 - List dataLimitRate = result.stream().map(DataLimitDetailDto::getDataLimitRate).filter(ObjectUtil::isNotNull).collect(Collectors.toList()); - if(CollUtil.isNotEmpty(dataLimitRate)){ - dataLimitRateFeignClient.batchInsertion(dataLimitRate); - } - } - if (CollUtil.isNotEmpty(result)) { - //存储数据 - List detail = result.stream().map(DataLimitDetailDto::getDataLimitRateDetail).filter(x -> ObjectUtil.isNotNull(x)).collect(Collectors.toList()); - if(CollUtil.isNotEmpty(detail)){ - dataLimitRateDetailFeignClient.batchInsertion(detail); - } - } +// .collect(Collectors.groupingBy(DataVDto::getLineId)); +// +// /** +// * 功能描述:获取influxDB -> data_plt -> +// * 闪变总计算次数(用data_plt中phasic_type=A,value_type=avg,quality_flag=0来参与统计) +// */ +// //fixme 冀北现场 闪变原始表没有 value_type 这个参数 +// Map> pltAllTime = dataPltAllTime.stream() +// .filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType())) +//// .filter(x -> InfluxDbSqlConstant.AVG_WEB.equalsIgnoreCase(x.getValueType())) +// .collect(Collectors.groupingBy(DataPltDto::getLineId)); +// +// /** +// *功能描述:获取influxDB -> data_harmrate_v -> +// * 2-25次谐波电压含有率 -> A相||B相||C相的日95%概率值 +// */ +// Map> harmRateV = dataVHarmList.stream() +// .filter(x -> phase.contains(x.getPhasicType())) +// .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) +// .collect(Collectors.groupingBy(DataHarmDto::getLineId)); +// +// /** +// * 功能描述:获取influxDB -> data_i -> 2-25次谐波电流 -> 日95%概率值 +// */ +// Map> dataI = dataIList.stream() +// .filter(x -> phase.contains(x.getPhasicType())) +// .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) +// .collect(Collectors.groupingBy(DataIDto::getLineId)); +// +// +// /** +// * 功能描述:获取influxDB -> data_inharm_v -> 0.5-15.5次间谐波电压含有率 -> 日95%概率值 +// */ +// Map> inHarmV = dataVInHarmList.stream() +// .filter(x -> phase.contains(x.getPhasicType())) +// .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) +// .collect(Collectors.groupingBy(DataHarmDto::getLineId)); +// +// /** +// * 功能描述:获取influxDB -> data_v -> 电压总谐波畸变率 -> 日95%概率值 +// */ +// Map> dataVThd = dataVAllTime.stream() +// .filter(x -> phase.contains(x.getPhasicType())) +// .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) +// .collect(Collectors.groupingBy(DataVDto::getLineId)); +// +// /** +// * 功能描述:获取influxDB -> data_v -> 负序电压不平衡度 -> 最大值 && 日95%概率值 +// */ +// Map> dataVUnbalance = dataVAllTime.stream() +// .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) +// .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) || +// InfluxDBTableConstant.MAX.equals(x.getValueType())) +// .collect(Collectors.groupingBy(DataVDto::getLineId)); +// +// /** +// * 功能描述:获取influxDB -> data_i -> 负序电流 -> 最大值 && 日95%概率值 +// */ +// Map> dataINeg = dataIList.stream() +// .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) +// .filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) || +// InfluxDBTableConstant.MAX.equals(x.getValueType())) +// .collect(Collectors.groupingBy(DataIDto::getLineId)); +// +// /** +// * 功能描述:获取influxDB -> data_v -> 频率偏差 -> 最大值 && 最小值 +// */ +// Map> dataVFreq = dataVAllTime.stream() +// .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) +// .filter(x -> InfluxDBTableConstant.MIN.equals(x.getValueType()) || +// InfluxDBTableConstant.MAX.equals(x.getValueType())) +// .collect(Collectors.groupingBy(DataVDto::getLineId)); +// /** +// * 功能描述:获取influxDB -> data_v -> 电压偏差 -> 最大值 +// */ +// Map> dataVDev = dataVAllTime.stream() +// .filter(x -> phase.contains(x.getPhasicType())) +// .filter(x -> InfluxDBTableConstant.MAX.equals(x.getValueType())) +// .collect(Collectors.groupingBy(DataVDto::getLineId)); +// +// /** +// * 功能描述:获取influxDB -> data_plt -> 长时间闪变 -> 注(取最大值原始算法去掉了,现没有根据最大值比较) +// */ +// Map> dataPlt = dataPltAllTime.stream() +// .filter(x -> phase.contains(x.getPhasicType())) +// .collect(Collectors.groupingBy(DataPltDto::getLineId)); +// +// for (String item : list) { +// if(ObjectUtil.isNotNull(overLimitMap.get(item))){ +// result.addAll(getData(calculatedParam.getDataDate(), +// overLimitMap.get(item), +// allTime.get(item), +// pltAllTime.get(item), +// harmRateV.get(item), +// dataI.get(item), +// inHarmV.get(item), +// dataVThd.get(item), +// dataVUnbalance.get(item), +// dataINeg.get(item), +// dataVFreq.get(item), +// dataVDev.get(item), +// dataPlt.get(item))); +// } +// } +// }); +// MemorySizeUtil.getNowMemory(); +// if (CollUtil.isNotEmpty(result)) { +// //存储数据 +// List dataLimitRate = result.stream().map(DataLimitDetailDto::getDataLimitRate).filter(ObjectUtil::isNotNull).collect(Collectors.toList()); +// if(CollUtil.isNotEmpty(dataLimitRate)){ +// dataLimitRateFeignClient.batchInsertion(dataLimitRate); +// } +// } +// if (CollUtil.isNotEmpty(result)) { +// //存储数据 +// List detail = result.stream().map(DataLimitDetailDto::getDataLimitRateDetail).filter(x -> ObjectUtil.isNotNull(x)).collect(Collectors.toList()); +// if(CollUtil.isNotEmpty(detail)) { +// dataLimitRateDetailFeignClient.batchInsertion(detail); +// } +// } System.gc(); }