From 89df5f30390de4fe0adcbceee810e0148a02e485 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Tue, 11 Nov 2025 10:56:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E7=89=A9=E8=81=94=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E7=9B=B8=E5=85=B3=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/njcn/algorithm/pojo/bo/BaseParam.java | 2 +- .../LiteFlowAlgorithmFeignClient.java | 4 + ...owAlgorithmFeignClientFallbackFactory.java | 6 + algorithm/algorithm-boot/pom.xml | 5 + .../com/njcn/algorithm/ExecutionCenter.java | 48 ++++++- .../executor/MeasurementExecutor.java | 2 +- .../serviceimpl/line/DayDataServiceImpl.java | 30 ++-- .../line/IDataCrossingServiceImpl.java | 30 ++-- .../param/LineCountEvaluateParam.java | 2 + .../dataProcess/po/influx/DataFlicker.java | 6 + .../njcn/dataProcess/po/influx/DataFluc.java | 6 + .../po/influx/DataHarmphasicI.java | 6 + .../po/influx/DataHarmphasicV.java | 6 + .../dataProcess/po/influx/DataHarmpowerP.java | 6 + .../dataProcess/po/influx/DataHarmpowerQ.java | 6 + .../dataProcess/po/influx/DataHarmpowerS.java | 6 + .../dataProcess/po/influx/DataHarmrateI.java | 6 + .../dataProcess/po/influx/DataHarmrateV.java | 6 + .../com/njcn/dataProcess/po/influx/DataI.java | 6 + .../dataProcess/po/influx/DataInharmI.java | 6 + .../dataProcess/po/influx/DataInharmV.java | 6 + .../njcn/dataProcess/po/influx/DataPlt.java | 9 ++ .../com/njcn/dataProcess/po/influx/DataV.java | 7 + data-processing/data-processing-boot/pom.xml | 5 + .../influxdb/InfluxdbDataFlickerImpl.java | 90 +++++++++++- .../impl/influxdb/InfluxdbDataFlucImpl.java | 80 ++++++++++- .../influxdb/InfluxdbDataHarmRateIImpl.java | 75 +++++++++- .../influxdb/InfluxdbDataHarmRateVImpl.java | 84 ++++++++++- .../influxdb/InfluxdbDataHarmphasicIImpl.java | 75 +++++++++- .../influxdb/InfluxdbDataHarmphasicVImpl.java | 85 ++++++++++- .../influxdb/InfluxdbDataHarmpowerPImpl.java | 83 ++++++++++- .../influxdb/InfluxdbDataHarmpowerQImpl.java | 76 +++++++++- .../influxdb/InfluxdbDataHarmpowerSImpl.java | 76 +++++++++- .../impl/influxdb/InfluxdbDataIImpl.java | 95 ++++++++++++- .../influxdb/InfluxdbDataInharmIImpl.java | 72 +++++++++- .../influxdb/InfluxdbDataInharmVImpl.java | 80 ++++++++++- .../impl/influxdb/InfluxdbDataPltImpl.java | 83 ++++++++++- .../impl/influxdb/InfluxdbDataVImpl.java | 133 ++++++++++++++++-- 38 files changed, 1332 insertions(+), 77 deletions(-) diff --git a/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/bo/BaseParam.java b/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/bo/BaseParam.java index 23a3bf3..cd89d11 100644 --- a/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/bo/BaseParam.java +++ b/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/bo/BaseParam.java @@ -58,6 +58,6 @@ public class BaseParam implements Serializable { @ApiModelProperty(name = "idList",value = "索引集合") private List idList; - @ApiModelProperty(name = "type",value = "0:通用 1:省级平台 ") + @ApiModelProperty(name = "type",value = "0:通用 1:省级平台 2:物联平台") private Integer type; } diff --git a/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/liteflow/LiteFlowAlgorithmFeignClient.java b/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/liteflow/LiteFlowAlgorithmFeignClient.java index 7c337dd..aaaa3e2 100644 --- a/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/liteflow/LiteFlowAlgorithmFeignClient.java +++ b/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/liteflow/LiteFlowAlgorithmFeignClient.java @@ -42,4 +42,8 @@ public interface LiteFlowAlgorithmFeignClient { @ApiOperation("新能源专项分析算法执行链") @PostMapping("/specialAnalysis") void specialAnalysisExecutor(@RequestBody BaseParam baseParam); + + @ApiOperation("物联监测点算法执行链") + @PostMapping("/wlMeasurementPointExecutor") + void wlMeasurementPointExecutor(@RequestBody BaseParam baseParam); } diff --git a/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/liteflow/fallback/LiteFlowAlgorithmFeignClientFallbackFactory.java b/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/liteflow/fallback/LiteFlowAlgorithmFeignClientFallbackFactory.java index c7b64da..0732fc6 100644 --- a/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/liteflow/fallback/LiteFlowAlgorithmFeignClientFallbackFactory.java +++ b/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/liteflow/fallback/LiteFlowAlgorithmFeignClientFallbackFactory.java @@ -52,6 +52,12 @@ public class LiteFlowAlgorithmFeignClientFallbackFactory implements FallbackFact throw new BusinessException(finalExceptionEnum); } + @Override + public void wlMeasurementPointExecutor(BaseParam baseParam) { + log.error("{}异常,降级处理,异常为:{}", "物联监测点算法执行链: ", throwable.toString()); + throw new BusinessException(finalExceptionEnum); + } + @Override public void deviceExecutor(BaseParam baseParam) { log.error("{}异常,降级处理,异常为:{}", "装置算法执行链: ", throwable.toString()); diff --git a/algorithm/algorithm-boot/pom.xml b/algorithm/algorithm-boot/pom.xml index e8bada2..5876a43 100644 --- a/algorithm/algorithm-boot/pom.xml +++ b/algorithm/algorithm-boot/pom.xml @@ -104,6 +104,11 @@ pq-device-api ${project.version} + + com.njcn + cs-device-api + ${project.version} + com.njcn common-event diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java index adec388..a2ca81d 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java @@ -2,7 +2,10 @@ package com.njcn.algorithm; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.date.*; +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; import cn.hutool.core.text.StrPool; import cn.hutool.core.util.StrUtil; import com.njcn.algorithm.pojo.bo.BaseParam; @@ -13,8 +16,12 @@ import com.njcn.algorithm.utils.MemorySizeUtil; import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.device.biz.commApi.CommTerminalGeneralClient; -import com.njcn.device.biz.pojo.dto.*; +import com.njcn.device.biz.pojo.dto.DeptGetChildrenMoreDTO; +import com.njcn.device.biz.pojo.dto.DeptGetDeviceDTO; +import com.njcn.device.biz.pojo.dto.DeptGetSubStationDTO; +import com.njcn.device.biz.pojo.dto.LineDevGetDTO; import com.njcn.device.biz.pojo.param.DeptGetLineParam; import com.njcn.device.pq.api.DeptLineFeignClient; import com.njcn.user.api.DeptFeignClient; @@ -62,6 +69,8 @@ public class ExecutionCenter extends BaseController { private FlowExecutor flowExecutor; @Resource private DeptLineFeignClient deptLineFeignClient; + @Resource + private CsLineFeignClient csLineFeignClient; /*** * 1、校验非全链执行时,tagNames节点标签集合必须为非空,否则提示---无可执行节点 @@ -155,7 +164,40 @@ public class ExecutionCenter extends BaseController { liteflowResponse = flowExecutor.execute2Resp("measurement_point", calculatedParam); dealResponse(calculatedParam, liteflowResponse, methodDescribe); } - + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("物联监测点算法执行链") + @PostMapping("/wlMeasurementPointExecutor") + @Async("asyncExecutor") + public void wlMeasurementPointExecutor(@RequestBody BaseParam baseParam) { + String methodDescribe = getMethodDescribe("wlMeasurementPointExecutor"); + //手动判断参数是否合法, + CalculatedParam calculatedParam = judgeExecuteParam(baseParam); + // 测点索引 + if (CollectionUtils.isEmpty(calculatedParam.getIdList())) { + calculatedParam.setIdList(csLineFeignClient.getAllLine().getData()); + } + LiteflowResponse liteflowResponse; + if (baseParam.isRepair()) { + //补招时,起始日期、截止日期必填 + DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT); + DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT); + long betweenDay = DateUtil.betweenDay(startDate, endDate, true); + //递增日期执行算法链 + for (int i = 0; i < betweenDay; i++) { + if (i != 0) { + startDate = DateUtil.offsetDay(startDate, 1); + } + calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN)); + liteflowResponse = flowExecutor.execute2Resp("wl_measurement_point", calculatedParam); + dealResponse(calculatedParam, liteflowResponse, methodDescribe); + } + } else { + //非补招 + liteflowResponse = flowExecutor.execute2Resp("wl_measurement_point", calculatedParam); + dealResponse(calculatedParam, liteflowResponse, methodDescribe); + } } @OperateInfo(info = LogEnum.BUSINESS_COMMON) diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java index 6937f3b..4ba8acf 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java @@ -231,7 +231,7 @@ public class MeasurementExecutor extends BaseExecutor { dayDataService.dataVHandler(bindCmp.getRequestData()); } - @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataI", nodeType = NodeTypeEnum.COMMON) + @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataI", nodeType = NodeTypeEnum.COMMON) public boolean dataIToDayAccess(NodeComponent bindCmp) { return isAccess(bindCmp); } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java index 301a7ab..a81292e 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java @@ -76,6 +76,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -107,7 +108,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -125,6 +125,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -155,7 +156,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -174,6 +174,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -204,7 +205,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -223,6 +223,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -253,7 +254,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -271,6 +271,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -301,7 +302,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -319,6 +319,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -349,7 +350,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -367,6 +367,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -397,7 +398,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -415,6 +415,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -445,7 +446,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -463,6 +463,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -493,7 +494,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -511,6 +511,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -541,7 +542,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -559,6 +559,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -589,7 +590,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -607,6 +607,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -637,7 +638,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -655,6 +655,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -685,7 +686,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -704,6 +704,7 @@ public class DayDataServiceImpl implements IDayDataService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); pendingIds.forEach(list->{ @@ -734,7 +735,6 @@ public class DayDataServiceImpl implements IDayDataService { }); }); } - partList = null; }); if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -756,10 +756,8 @@ public class DayDataServiceImpl implements IDayDataService { Collectors.mapping(item->TimeUtils.LocalDateTimeToString(item.getTime()), Collectors.toList()) )); lineParam.setAbnormalTime(timeMap); - pqDataVerifies.clear(); } - } //指标处理 diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java index 8e6983d..8b4507e 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java @@ -11,6 +11,7 @@ import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.service.line.IDataCrossingService; import com.njcn.algorithm.utils.MemorySizeUtil; import com.njcn.common.utils.PubUtils; +import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.dataProcess.api.*; import com.njcn.dataProcess.constant.PhaseType; import com.njcn.dataProcess.enums.DataCleanEnum; @@ -38,7 +39,10 @@ import java.lang.reflect.Method; import java.math.BigDecimal; import java.math.RoundingMode; import java.time.LocalDateTime; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -77,6 +81,8 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { private DataLimitTargetFeignClient dataLimitTargetFeignClient; @Resource private DataLimitQualifiedFeignClient dataLimitQualifiedFeignClient; + @Resource + private CsLineFeignClient csLineFeignClient; @Override @@ -105,9 +111,10 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + lineParam.setType(calculatedParam.getType()); List lineIds = calculatedParam.getIdList(); //获取所有监测点的限值 - List overLimitList = commTerminalGeneralClient.getOverLimitDataByIds(lineIds).getData(); + List overLimitList = csLineFeignClient.getOverLimitData(lineIds).getData(); Map overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity())); //添加异常数据时间点 getAbnormalData(lineParam); @@ -120,7 +127,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { //获取电压数据 List dataVAllTime = dataVFeignClient.getRawData(lineParam).getData(); //闪变数据 - List dataFlickerAllTime = dataPltFeignClient.getRawData(lineParam).getData(); + List dataPltAllTime = dataPltFeignClient.getRawData(lineParam).getData(); //谐波数据 List dataVHarmList = dataHarmRateVFeignClient.getRawData(lineParam).getData(); //间谐波数据 @@ -133,15 +140,16 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { */ Map> allTime = dataVAllTime.stream() .filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType())) - .filter(x -> InfluxDbSqlConstant.AVG_WEB.equals(x.getValueType())) + .filter(x -> InfluxDbSqlConstant.AVG_WEB.equalsIgnoreCase(x.getValueType())) .collect(Collectors.groupingBy(DataVDto::getLineId)); /** * 功能描述:获取influxDB -> data_plt -> - * 闪变总计算次数(用data_plt中phasic_type=A,quality_flag=0来参与统计) + * 闪变总计算次数(用data_plt中phasic_type=A,value_type=avg,quality_flag=0来参与统计) */ - Map> flickerAllTime = dataFlickerAllTime.stream() + Map> pltAllTime = dataPltAllTime.stream() .filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType())) + .filter(x -> InfluxDbSqlConstant.AVG_WEB.equalsIgnoreCase(x.getValueType())) .collect(Collectors.groupingBy(DataPltDto::getLineId)); /** @@ -215,7 +223,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { /** * 功能描述:获取influxDB -> data_plt -> 长时间闪变 -> 注(取最大值原始算法去掉了,现没有根据最大值比较) */ - Map> dataPlt = dataFlickerAllTime.stream() + Map> dataPlt = dataPltAllTime.stream() .filter(x -> phase.contains(x.getPhasicType())) .collect(Collectors.groupingBy(DataPltDto::getLineId)); @@ -224,7 +232,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { result.addAll(getData(calculatedParam.getDataDate(), overLimitMap.get(item), allTime.get(item), - flickerAllTime.get(item), + pltAllTime.get(item), harmRateV.get(item), dataI.get(item), inHarmV.get(item), @@ -240,11 +248,10 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { MemorySizeUtil.getNowMemory(); if (CollUtil.isNotEmpty(result)) { //存储数据 - List dataLimitRate = result.stream().map(DataLimitDetailDto::getDataLimitRate).filter(x -> ObjectUtil.isNotNull(x)).collect(Collectors.toList()); + List dataLimitRate = result.stream().map(DataLimitDetailDto::getDataLimitRate).filter(ObjectUtil::isNotNull).collect(Collectors.toList()); if(CollUtil.isNotEmpty(dataLimitRate)){ dataLimitRateFeignClient.batchInsertion(dataLimitRate); } - } if (CollUtil.isNotEmpty(result)) { //存储数据 @@ -252,11 +259,8 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { if(CollUtil.isNotEmpty(detail)){ dataLimitRateDetailFeignClient.batchInsertion(detail); } - } - System.gc(); - } @Override diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/param/LineCountEvaluateParam.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/param/LineCountEvaluateParam.java index cc2b062..fc53744 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/param/LineCountEvaluateParam.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/param/LineCountEvaluateParam.java @@ -62,5 +62,7 @@ public class LineCountEvaluateParam extends BaseParam implements Serializable { */ private Boolean dataType = true; + //0:通用 1:省级平台 2:物联 + private Integer type; } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataFlicker.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataFlicker.java index 259ea75..46dca20 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataFlicker.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataFlicker.java @@ -33,6 +33,12 @@ public class DataFlicker { @Column(name = "phasic_type",tag = true) private String phasicType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + @Column(name = "fluc") private Double fluc=0.00; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataFluc.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataFluc.java index 233276c..2d5d882 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataFluc.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataFluc.java @@ -33,6 +33,12 @@ public class DataFluc { @Column(name = "phasic_type",tag = true) private String phasicType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + @Column(name = "fluc") private Double fluc=0.00; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmphasicI.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmphasicI.java index d6d2150..71c9883 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmphasicI.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmphasicI.java @@ -43,6 +43,12 @@ public class DataHarmphasicI { @Column(name = "value_type",tag = true) private String valueType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmphasicV.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmphasicV.java index 798323a..b317af3 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmphasicV.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmphasicV.java @@ -44,6 +44,12 @@ public class DataHarmphasicV { @Column(name = "value_type",tag = true) private String valueType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerP.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerP.java index a4f2357..1dd15f5 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerP.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerP.java @@ -44,6 +44,12 @@ public class DataHarmpowerP { @Column(name = "value_type",tag = true) private String valueType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerQ.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerQ.java index 7241d91..84cf03d 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerQ.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerQ.java @@ -44,6 +44,12 @@ public class DataHarmpowerQ { @Column(name = "value_type",tag = true) private String valueType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerS.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerS.java index d7d01c6..bbac6f4 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerS.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmpowerS.java @@ -44,6 +44,12 @@ public class DataHarmpowerS { @Column(name = "value_type",tag = true) private String valueType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmrateI.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmrateI.java index 2d3ceec..f599727 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmrateI.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmrateI.java @@ -43,6 +43,12 @@ public class DataHarmrateI { @Column(name = "value_type",tag = true) private String valueType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmrateV.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmrateV.java index 8550ce8..4e28e45 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmrateV.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataHarmrateV.java @@ -43,6 +43,12 @@ public class DataHarmrateV { @Column(name = "value_type",tag = true) private String valueType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataI.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataI.java index 7d90258..c8e4d7f 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataI.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataI.java @@ -44,6 +44,12 @@ public class DataI { @Column(name = "value_type",tag = true) private String valueType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataInharmI.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataInharmI.java index d27a79b..06a4b36 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataInharmI.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataInharmI.java @@ -44,6 +44,12 @@ public class DataInharmI { @Column(name = "value_type",tag = true) private String valueType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataInharmV.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataInharmV.java index ef03f1a..13fb55a 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataInharmV.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataInharmV.java @@ -44,6 +44,12 @@ public class DataInharmV { @Column(name = "value_type",tag = true) private String valueType; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataPlt.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataPlt.java index 1006643..7de189e 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataPlt.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataPlt.java @@ -34,9 +34,18 @@ public class DataPlt { @Column(name = "phasic_type",tag = true) private String phasicType; + @Column(name = "value_type",tag = true) + private String valueType; + @Column(name = "quality_flag",tag = true) private String qualityFlag="0"; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataV.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataV.java index 1891a87..650e230 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataV.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataV.java @@ -45,6 +45,13 @@ public class DataV { @Column(name = "quality_flag", tag = true) private String qualityFlag="0"; + @Column(name = "cl_did", tag = true) + private String cldid; + + @Column(name = "process", tag = true) + private String process; + + //是否是异常指标数据,0否1是 @Column(name = "abnormal_flag") private Integer abnormalFlag; diff --git a/data-processing/data-processing-boot/pom.xml b/data-processing/data-processing-boot/pom.xml index f59461a..32826c0 100644 --- a/data-processing/data-processing-boot/pom.xml +++ b/data-processing/data-processing-boot/pom.xml @@ -73,6 +73,11 @@ + + com.njcn + cs-device-api + ${project.version} + com.njcn.platform stat-api diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlickerImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlickerImpl.java index e39b631..8239691 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlickerImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlickerImpl.java @@ -4,6 +4,10 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.ObjectUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataFlickerMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataFlickerRelationMapper; import com.njcn.dataProcess.dto.DataFlickerDTO; @@ -20,10 +24,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -41,6 +47,17 @@ public class InfluxdbDataFlickerImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; + @Override public void batchInsertion(List dataFlickerDTOList) { @@ -64,7 +81,12 @@ public class InfluxdbDataFlickerImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam); + } else { + list = getMinuteData(lineParam); + } list.forEach(item -> { DataFlickerDto dto = new DataFlickerDto(); BeanUtils.copyProperties(item, dto); @@ -77,7 +99,12 @@ public class InfluxdbDataFlickerImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List dataIList = getMinuteData(lineParam); + List dataIList; + if (Objects.equals(lineParam.getType(), 2)) { + dataIList = getWlMinuteData(lineParam); + } else { + dataIList = getMinuteData(lineParam); + } if (CollectionUtil.isNotEmpty(dataIList)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -237,4 +264,63 @@ public class InfluxdbDataFlickerImpl extends MppServiceImpl getWlMinuteData(LineCountEvaluateParam lineParam) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineParam.getLineId())) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineParam.getLineId()).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + + lineParam.getLineId().forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataFlicker.class); + influxQueryWrapper.eq(DataFlicker::getLineId, lineId) + .eq(DataFlicker::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataFlicker::getLineId) + .select(DataFlicker::getPhasicType) + .between(DataFlicker::getTime, lineParam.getStartTime(), lineParam.getEndTime()) + .eq(DataFlicker::getQualityFlag, "0"); + if (CollUtil.isNotEmpty(lineParam.getPhasicType())) { + influxQueryWrapper.regular(DataFlicker::getPhasicType, lineParam.getPhasicType()); + } + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataFlicker::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataFlicker::getCldid,Integer.toString(po.getLineNo())); + } + //判断接线方式 (0-星型 1-角型 2-V型) 星型是相电压 角型或者v型是线电压 + if (Objects.equals(po.getConType(),0)) { + //相电压电压变动幅度 + influxQueryWrapper.select("Pq_Fluct","fluc"); + //相电压长时闪变 + influxQueryWrapper.select("Pq_Plt","plt"); + //相电压短时闪变 + influxQueryWrapper.select("Pq_Pst","pst"); + } else { + //线电压电压变动幅度 + influxQueryWrapper.select("Pq_LFluct","fluc"); + //相电压长时闪变 + influxQueryWrapper.select("Pq_LPlt","plt"); + //相电压短时闪变 + influxQueryWrapper.select("Pq_LPst","pst"); + } + result.addAll(dataFlickerMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlucImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlucImpl.java index d76ce29..8ebfab5 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlucImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataFlucImpl.java @@ -2,6 +2,10 @@ package com.njcn.dataProcess.service.impl.influxdb; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataFlucMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataFlucRelationMapper; import com.njcn.dataProcess.dto.DataFlucDTO; @@ -18,10 +22,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -39,6 +45,17 @@ public class InfluxdbDataFlucImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override public void batchInsertion(List dataFlucDTOList) { @@ -62,7 +79,12 @@ public class InfluxdbDataFlucImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } list.forEach(item->{ DataFlucDto dto = new DataFlucDto(); BeanUtils.copyProperties(item,dto); @@ -75,7 +97,12 @@ public class InfluxdbDataFlucImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List dataIList = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + List dataIList; + if (Objects.equals(lineParam.getType(), 2)) { + dataIList = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + dataIList = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } if (CollectionUtil.isNotEmpty(dataIList)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -191,4 +218,53 @@ public class InfluxdbDataFlucImpl extends MppServiceImpl getWlMinuteData(List lineList, String startTime, String endTime, Map> timeMap, Boolean dataType) { + //todo FLUCCF电压波动频度原先oracle表存储的是0,写死的,这边暂不取值 + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineList)) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineList).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineList.forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataFluc.class); + influxQueryWrapper.eq(DataFluc::getLineId, lineId) + .eq(DataFluc::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataFluc::getLineId) + .select(DataFluc::getPhasicType) + .between(DataFluc::getTime, startTime, endTime) + .eq(DataFluc::getQualityFlag, "0"); + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataFluc::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataFluc::getCldid,Integer.toString(po.getLineNo())); + } + //判断接线方式 (0-星型 1-角型 2-V型) 星型是相电压 角型或者v型是线电压 + if (Objects.equals(po.getConType(),0)) { + //相电压电压变动幅度 + influxQueryWrapper.select("Pq_Fluct","fluc"); + } else { + //线电压电压变动幅度 + influxQueryWrapper.select("Pq_LFluct","fluc"); + } + result.addAll(dataFlucMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateIImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateIImpl.java index df52831..1b43b80 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateIImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateIImpl.java @@ -4,15 +4,17 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataHarmRateIMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataHarmRateIRelationMapper; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.po.influx.DataHarmrateI; import com.njcn.dataProcess.po.influx.DataHarmrateV; -import com.njcn.dataProcess.po.influx.DataI; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataHarmRateIDto; -import com.njcn.dataProcess.pojo.dto.DataIDto; import com.njcn.dataProcess.pojo.po.RStatDataHarmRateID; import com.njcn.dataProcess.service.IDataHarmRateI; import com.njcn.dataProcess.util.TimeUtils; @@ -26,6 +28,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -37,11 +40,27 @@ public class InfluxdbDataHarmRateIImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override public List getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } list.forEach(item->{ DataHarmRateIDto dto = new DataHarmRateIDto(); BeanUtils.copyProperties(item,dto); @@ -54,7 +73,12 @@ public class InfluxdbDataHarmRateIImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + List data; + if (Objects.equals(lineParam.getType(), 2)) { + data = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } if (CollectionUtil.isNotEmpty(data)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -225,4 +249,47 @@ public class InfluxdbDataHarmRateIImpl extends MppServiceImpl getWlMinuteData(List lineList, String startTime, String endTime, Map> timeMap, Boolean dataType) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineList)) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineList).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineList.forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmrateI.class); + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmRI_", "i_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + influxQueryWrapper.eq(DataHarmrateI::getLineId, lineId) + .eq(DataHarmrateI::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataHarmrateI::getLineId) + .select(DataHarmrateI::getPhasicType) + .select(DataHarmrateI::getValueType) + //电流基波有效值 + .select("Pq_RmsFundI_","i_1") + .between(DataHarmrateI::getTime, startTime, endTime) + .eq(DataHarmrateI::getQualityFlag, "0"); + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataHarmrateI::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataHarmrateI::getCldid,Integer.toString(po.getLineNo())); + } + result.addAll(dataHarmRateIMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java index 2d650cd..71768e3 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java @@ -4,6 +4,10 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.constant.InfluxDBTableConstant; import com.njcn.dataProcess.dao.imapper.DataHarmRateVMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataHarmRateVRelationMapper; @@ -14,7 +18,6 @@ import com.njcn.dataProcess.po.influx.DataV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataHarmDto; import com.njcn.dataProcess.pojo.dto.DataHarmRateVDto; -import com.njcn.dataProcess.pojo.dto.DataVDto; import com.njcn.dataProcess.pojo.po.RStatDataHarmRateVD; import com.njcn.dataProcess.service.IDataHarmRateV; import com.njcn.dataProcess.util.TimeUtils; @@ -25,10 +28,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -41,11 +46,27 @@ public class InfluxdbDataHarmRateVImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override public List getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam); + } else { + list = getMinuteData(lineParam);; + } list.forEach(item->{ DataHarmDto dto = new DataHarmDto(); BeanUtils.copyProperties(item,dto); @@ -77,7 +98,12 @@ public class InfluxdbDataHarmRateVImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List data = getMinuteData(lineParam); + List data; + if (Objects.equals(lineParam.getType(), 2)) { + data = getWlMinuteData(lineParam); + } else { + data = getMinuteData(lineParam);; + } if (CollectionUtil.isNotEmpty(data)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -292,4 +318,56 @@ public class InfluxdbDataHarmRateVImpl extends MppServiceImpl getWlMinuteData(LineCountEvaluateParam lineParam) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineParam.getLineId())) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineParam.getLineId()).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineParam.getLineId().forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmrateV.class); + influxQueryWrapper.eq(DataHarmrateV::getLineId, lineId) + .eq(DataHarmrateV::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataHarmrateV::getLineId) + .select(DataHarmrateV::getPhasicType) + .select(DataHarmrateV::getValueType) + .between(DataHarmrateV::getTime, lineParam.getStartTime(), lineParam.getEndTime()) + .eq(DataHarmrateV::getQualityFlag, "0"); + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataHarmrateV::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataHarmrateV::getCldid,Integer.toString(po.getLineNo())); + } + //判断接线方式 (0-星型 1-角型 2-V型) 星型是相电压 角型或者v型是线电压 + if (Objects.equals(po.getConType(),0)) { + //相电压基波有效值 + influxQueryWrapper.select("Pq_RmsFundU_","v_1"); + //2-50次 相电压谐波含有率 + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmU_", "v_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + } else { + //线电压基波有效值 + influxQueryWrapper.select("Pq_RmsFundLU_","v_1"); + //2-50次 线电压谐波含有率 + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmLU_", "v_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + } + result.addAll(dataHarmRateVMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicIImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicIImpl.java index 502e605..81ec48d 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicIImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicIImpl.java @@ -4,6 +4,10 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataHarmphasicIMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataHarmPhasicIRelationMapper; import com.njcn.dataProcess.dto.DataHarmphasicIDTO; @@ -21,10 +25,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -40,6 +46,17 @@ public class InfluxdbDataHarmphasicIImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override public void batchInsertion(List dataHarmphasicIDTOList) { @@ -62,7 +79,12 @@ public class InfluxdbDataHarmphasicIImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } list.forEach(item->{ DataHarmPhasicIDto dto = new DataHarmPhasicIDto(); BeanUtils.copyProperties(item,dto); @@ -75,7 +97,12 @@ public class InfluxdbDataHarmphasicIImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + List data; + if (Objects.equals(lineParam.getType(), 2)) { + data = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } if (CollectionUtil.isNotEmpty(data)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -245,4 +272,48 @@ public class InfluxdbDataHarmphasicIImpl extends MppServiceImpl getWlMinuteData(List lineList, String startTime, String endTime, Map> timeMap,Boolean dataType) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineList)) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineList).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineList.forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmphasicI.class); + //谐波电流幅值相角 + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmIAng_", "i_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + influxQueryWrapper.eq(DataHarmphasicI::getLineId, lineId) + .eq(DataHarmphasicI::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataHarmphasicI::getLineId) + .select(DataHarmphasicI::getPhasicType) + .select(DataHarmphasicI::getValueType) + //基波电流相角 + .select("Pq_FundIAng","i_1") + .between(DataHarmphasicI::getTime, startTime, endTime) + .eq(DataHarmphasicI::getQualityFlag, "0"); + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataHarmphasicI::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataHarmphasicI::getCldid,Integer.toString(po.getLineNo())); + } + result.addAll(dataHarmphasicIMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicVImpl.java index 4b856a4..0d30979 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmphasicVImpl.java @@ -4,10 +4,15 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataHarmphasicVMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataHarmPhasicVRelationMapper; import com.njcn.dataProcess.dto.DataHarmphasicVDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.po.influx.DataHarmphasicI; import com.njcn.dataProcess.po.influx.DataHarmphasicV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataHarmDto; @@ -22,10 +27,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -42,6 +49,17 @@ public class InfluxdbDataHarmphasicVImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override @@ -66,7 +84,12 @@ public class InfluxdbDataHarmphasicVImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } list.forEach(item->{ DataHarmDto dto = new DataHarmDto(); BeanUtils.copyProperties(item,dto); @@ -79,7 +102,12 @@ public class InfluxdbDataHarmphasicVImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + List data; + if (Objects.equals(lineParam.getType(), 2)) { + data = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } if (CollectionUtil.isNotEmpty(data)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -249,4 +277,57 @@ public class InfluxdbDataHarmphasicVImpl extends MppServiceImpl getWlMinuteData(List lineList, String startTime, String endTime, Map> timeMap, Boolean dataType) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineList)) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineList).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineList.forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmphasicV.class); + influxQueryWrapper.eq(DataHarmphasicV::getLineId, lineId) + .eq(DataHarmphasicV::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataHarmphasicV::getLineId) + .select(DataHarmphasicV::getPhasicType) + .select(DataHarmphasicV::getValueType) + .between(DataHarmphasicV::getTime, startTime, endTime) + .eq(DataHarmphasicV::getQualityFlag, "0"); + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataHarmphasicV::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataHarmphasicV::getCldid,Integer.toString(po.getLineNo())); + } + //判断接线方式 (0-星型 1-角型 2-V型) 星型是相电压 角型或者v型是线电压 + if (Objects.equals(po.getConType(),0)) { + //相电压基波有效值相角 + influxQueryWrapper.select("Pq_FundUAng","v_1"); + //2-50次 相电压谐波相角 + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmUAng_", "v_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + } else { + //线电压基波有效值相角 + influxQueryWrapper.select("Pq_FundLUAng","v_1"); + //2-50次 线电压谐波相角 + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmLUAng_", "v_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + } + result.addAll(dataHarmphasicVMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerPImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerPImpl.java index dbeb7b9..9790d08 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerPImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerPImpl.java @@ -4,10 +4,16 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataHarmpowerPMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataHarmPowerPRelationMapper; import com.njcn.dataProcess.dto.DataHarmpowerPDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.po.influx.DataHarmphasicI; +import com.njcn.dataProcess.po.influx.DataHarmphasicV; import com.njcn.dataProcess.po.influx.DataHarmpowerP; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataHarmPowerPDto; @@ -22,10 +28,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -42,6 +50,17 @@ public class InfluxdbDataHarmpowerPImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override @@ -65,7 +84,12 @@ public class InfluxdbDataHarmpowerPImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(), lineParam.getDataType()); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } list.forEach(item->{ DataPowerPDto dto = new DataPowerPDto(); BeanUtils.copyProperties(item,dto); @@ -78,7 +102,12 @@ public class InfluxdbDataHarmpowerPImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(), lineParam.getDataType()); + List data; + if (Objects.equals(lineParam.getType(), 2)) { + data = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } if (CollectionUtil.isNotEmpty(data)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -255,4 +284,54 @@ public class InfluxdbDataHarmpowerPImpl extends MppServiceImpl getWlMinuteData(List lineList, String startTime, String endTime, Map> timeMap, Boolean dataType) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineList)) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineList).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineList.forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmpowerP.class); + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmP_", "p_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + influxQueryWrapper.eq(DataHarmpowerP::getLineId, lineId) + .eq(DataHarmpowerP::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataHarmpowerP::getLineId) + .select(DataHarmpowerP::getPhasicType) + .select(DataHarmpowerP::getValueType) + //位移功率因数 + .select("Pq_DF","df") + //视在功率因素 + .select("Pq_PF","pf") + //总功功率 + .select("Pq_P","p") + //基波有功功率 + .select("Pq_FundP","p_1") + .between(DataHarmpowerP::getTime, startTime, endTime) + .eq(DataHarmpowerP::getQualityFlag, "0"); + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataHarmpowerP::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataHarmpowerP::getCldid,Integer.toString(po.getLineNo())); + } + result.addAll(dataHarmpowerPMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerQImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerQImpl.java index 13b54f4..bbc0568 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerQImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerQImpl.java @@ -4,6 +4,10 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataHarmpowerQMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataHarmPowerQRelationMapper; import com.njcn.dataProcess.dto.DataHarmpowerQDTO; @@ -21,10 +25,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -40,6 +46,17 @@ public class InfluxdbDataHarmpowerQImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override @@ -63,7 +80,12 @@ public class InfluxdbDataHarmpowerQImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } list.forEach(item->{ DataHarmPowerQDto dto = new DataHarmPowerQDto(); BeanUtils.copyProperties(item,dto); @@ -76,7 +98,12 @@ public class InfluxdbDataHarmpowerQImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + List data; + if (Objects.equals(lineParam.getType(), 2)) { + data = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } if (CollectionUtil.isNotEmpty(data)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -250,4 +277,49 @@ public class InfluxdbDataHarmpowerQImpl extends MppServiceImpl getWlMinuteData(List lineList, String startTime, String endTime, Map> timeMap, Boolean dataType) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineList)) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineList).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineList.forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmpowerQ.class); + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmQ_", "q_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + influxQueryWrapper.eq(DataHarmpowerQ::getLineId, lineId) + .eq(DataHarmpowerQ::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataHarmpowerQ::getLineId) + .select(DataHarmpowerQ::getPhasicType) + .select(DataHarmpowerQ::getValueType) + //总功功率 + .select("Pq_Q","q") + //基波有功功率 + .select("Pq_FundQ","q_1") + .between(DataHarmpowerQ::getTime, startTime, endTime) + .eq(DataHarmpowerQ::getQualityFlag, "0"); + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataHarmpowerQ::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataHarmpowerQ::getCldid,Integer.toString(po.getLineNo())); + } + result.addAll(dataHarmpowerQMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerSImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerSImpl.java index 8664432..74ac25a 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerSImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmpowerSImpl.java @@ -4,6 +4,10 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataHarmpowerSMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataHarmPowerSRelationMapper; import com.njcn.dataProcess.dto.DataHarmpowerSDTO; @@ -21,10 +25,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -40,6 +46,17 @@ public class InfluxdbDataHarmpowerSImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override @@ -63,7 +80,12 @@ public class InfluxdbDataHarmpowerSImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } list.forEach(item->{ DataHarmPowerSDto dto = new DataHarmPowerSDto(); BeanUtils.copyProperties(item,dto); @@ -76,7 +98,12 @@ public class InfluxdbDataHarmpowerSImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + List data; + if (Objects.equals(lineParam.getType(), 2)) { + data = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime(),lineParam.getDataType()); + } if (CollectionUtil.isNotEmpty(data)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -250,4 +277,49 @@ public class InfluxdbDataHarmpowerSImpl extends MppServiceImpl getWlMinuteData(List lineList, String startTime, String endTime, Map> timeMap,Boolean dataType) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineList)) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineList).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineList.forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmpowerS.class); + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmS_", "s_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + influxQueryWrapper.eq(DataHarmpowerS::getLineId, lineId) + .eq(DataHarmpowerS::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataHarmpowerS::getLineId) + .select(DataHarmpowerS::getPhasicType) + .select(DataHarmpowerS::getValueType) + //总功功率 + .select("Pq_S","s") + //基波有功功率 + .select("Pq_FundS","s_1") + .between(DataHarmpowerS::getTime, startTime, endTime) + .eq(DataHarmpowerS::getQualityFlag, "0"); + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataHarmpowerS::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataHarmpowerS::getCldid,Integer.toString(po.getLineNo())); + } + result.addAll(dataHarmpowerSMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataIImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataIImpl.java index c7363f0..c040535 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataIImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataIImpl.java @@ -4,16 +4,17 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.constant.InfluxDBTableConstant; import com.njcn.dataProcess.dao.imapper.DataIMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataIRelationMapper; import com.njcn.dataProcess.dto.DataIDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; -import com.njcn.dataProcess.po.influx.DataHarmrateV; import com.njcn.dataProcess.po.influx.DataI; -import com.njcn.dataProcess.po.influx.DataV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; -import com.njcn.dataProcess.pojo.dto.DataHarmDto; import com.njcn.dataProcess.pojo.dto.DataIDto; import com.njcn.dataProcess.pojo.po.RStatDataID; import com.njcn.dataProcess.service.IDataI; @@ -25,10 +26,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -46,6 +49,18 @@ public class InfluxdbDataIImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; + @Override public void batchInsertion(List dataIDTOList) { @@ -68,7 +83,12 @@ public class InfluxdbDataIImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteDataI(lineParam); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteDataI(lineParam); + } else { + list = getMinuteDataI(lineParam); + } list.forEach(item->{ DataIDto dto = new DataIDto(); BeanUtils.copyProperties(item,dto); @@ -81,7 +101,12 @@ public class InfluxdbDataIImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List dataIList = getMinuteDataI(lineParam); + List dataIList; + if (Objects.equals(lineParam.getType(), 2)) { + dataIList = getWlMinuteDataI(lineParam); + } else { + dataIList = getMinuteDataI(lineParam); + } if (CollectionUtil.isNotEmpty(dataIList)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -293,4 +318,64 @@ public class InfluxdbDataIImpl extends MppServiceImpl getWlMinuteDataI(LineCountEvaluateParam lineParam) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineParam.getLineId())) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineParam.getLineId()).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + + lineParam.getLineId().forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataI.class); + //2-50次 谐波电流有效值 + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmI_", "i_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + influxQueryWrapper.eq(DataI::getLineId, lineId) + .eq(DataI::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataI::getLineId) + .select(DataI::getPhasicType) + .select(DataI::getValueType) + //电流负序 + .select("Pq_SeqNegI","i_neg") + //电流正序 + .select("Pq_SeqPosI","i_pos") + //电流总谐波畸变率 + .select("Pq_ThdI","i_thd") + //电流负序不平衡度 + .select("Pq_UnbalNegI","i_unbalance") + //电流零序 + .select("Pq_SeqZeroI","i_zero") + //电流总有效值 + .select("Pq_RmsI","rms") + //电流基波有效值 + .select("Pq_RmsFundI","i_1") + .between(DataI::getTime, lineParam.getStartTime(), lineParam.getEndTime()) + .eq(DataI::getQualityFlag, "0"); + if (CollUtil.isNotEmpty(lineParam.getPhasicType())) { + influxQueryWrapper.regular(DataI::getPhasicType, lineParam.getPhasicType()); + } + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataI::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataI::getCldid,Integer.toString(po.getLineNo())); + } + result.addAll(dataIMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataInharmIImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataInharmIImpl.java index c9572d5..88a0762 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataInharmIImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataInharmIImpl.java @@ -4,6 +4,10 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataInharmIMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataInHarmIRelationMapper; import com.njcn.dataProcess.dto.DataInharmIDTO; @@ -21,10 +25,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -40,7 +46,17 @@ public class InfluxdbDataInharmIImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override public void batchInsertion(List dataInharmIDTOList) { @@ -63,7 +79,12 @@ public class InfluxdbDataInharmIImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + list = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + } list.forEach(item->{ DataInHarmIDto dto = new DataInHarmIDto(); BeanUtils.copyProperties(item,dto); @@ -76,7 +97,12 @@ public class InfluxdbDataInharmIImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + List data; + if (Objects.equals(lineParam.getType(), 2)) { + data = getWlMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + } else { + data = getMinuteData(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),lineParam.getAbnormalTime(),lineParam.getDataType()); + } if (CollectionUtil.isNotEmpty(data)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -247,4 +273,46 @@ public class InfluxdbDataInharmIImpl extends MppServiceImpl getWlMinuteData(List lineList, String startTime, String endTime, Map> timeMap, Boolean dataType) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineList)) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineList).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineList.forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataInharmI.class); + //2-50次 间谐波电流有效值 + influxQueryWrapper.samePrefixAndSuffix("Pq_InHarmIAmp_", "i_", HarmonicTimesUtil.harmonicTimesList(1, 50, 1)); + influxQueryWrapper.eq(DataInharmI::getLineId, lineId) + .eq(DataInharmI::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataInharmI::getLineId) + .select(DataInharmI::getPhasicType) + .select(DataInharmI::getValueType) + .between(DataInharmI::getTime, startTime, endTime) + .eq(DataInharmI::getQualityFlag, "0"); + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataInharmI::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataInharmI::getCldid,Integer.toString(po.getLineNo())); + } + result.addAll(dataInharmIMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataInharmVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataInharmVImpl.java index c53e8c4..ff93711 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataInharmVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataInharmVImpl.java @@ -4,6 +4,10 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataInharmVMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataInHarmVRelationMapper; import com.njcn.dataProcess.dto.DataInharmVDTO; @@ -22,10 +26,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; @@ -43,7 +49,17 @@ public class InfluxdbDataInharmVImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override public void batchInsertion(List dataInharmVDTOList) { @@ -66,7 +82,12 @@ public class InfluxdbDataInharmVImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteData(lineParam); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteData(lineParam); + } else { + list = getMinuteData(lineParam); + } list.forEach(item->{ DataHarmDto dto = new DataHarmDto(); BeanUtils.copyProperties(item,dto); @@ -79,7 +100,12 @@ public class InfluxdbDataInharmVImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List data = getMinuteData(lineParam); + List data; + if (Objects.equals(lineParam.getType(), 2)) { + data = getWlMinuteData(lineParam); + } else { + data = getMinuteData(lineParam); + } if (CollectionUtil.isNotEmpty(data)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -252,4 +278,54 @@ public class InfluxdbDataInharmVImpl extends MppServiceImpl getWlMinuteData(LineCountEvaluateParam lineParam) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineParam.getLineId())) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineParam.getLineId()).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineParam.getLineId().forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataInharmV.class); + //2-50次 间谐波电流有效值 + influxQueryWrapper.eq(DataInharmV::getLineId, lineId) + .eq(DataInharmV::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataInharmV::getLineId) + .select(DataInharmV::getPhasicType) + .select(DataInharmV::getValueType) + .between(DataInharmV::getTime, lineParam.getStartTime(), lineParam.getEndTime()) + .eq(DataInharmV::getQualityFlag, "0"); + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataInharmV::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataInharmV::getCldid,Integer.toString(po.getLineNo())); + } + //判断接线方式 (0-星型 1-角型 2-V型) 星型是相电压 角型或者v型是线电压 + if (Objects.equals(po.getConType(),0)) { + //0.5-49.5次 间谐波相电压有效值 + influxQueryWrapper.samePrefixAndSuffix("Pq_InHarmURV_", "v_", HarmonicTimesUtil.harmonicTimesList(1, 50, 1)); + } else { + //0.5-49.5次 间谐波线电压有效值 + influxQueryWrapper.samePrefixAndSuffix("Pq_InHarmLURV_", "v_", HarmonicTimesUtil.harmonicTimesList(1, 50, 1)); + } + result.addAll(dataInharmVMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataPltImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataPltImpl.java index 42caac9..a8e0423 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataPltImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataPltImpl.java @@ -3,6 +3,10 @@ package com.njcn.dataProcess.service.impl.influxdb; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.dao.imapper.DataPltMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataPltRelationMapper; import com.njcn.dataProcess.dto.DataPltDTO; @@ -20,10 +24,12 @@ import org.apache.commons.collections4.ListUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -40,6 +46,17 @@ public class InfluxdbDataPltImpl extends MppServiceImpl PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Override public void batchInsertion(List dataPltDTOList) { @@ -63,7 +80,12 @@ public class InfluxdbDataPltImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteDataPlt(lineParam); + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteDataPlt(lineParam); + } else { + list = getMinuteDataPlt(lineParam); + } list.forEach(item->{ DataPltDto dto = new DataPltDto(); BeanUtils.copyProperties(item,dto); @@ -76,7 +98,12 @@ public class InfluxdbDataPltImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List data = getMinuteDataPlt(lineParam); + List data; + if (Objects.equals(lineParam.getType(), 2)) { + data = getWlMinuteDataPlt(lineParam); + } else { + data = getMinuteDataPlt(lineParam); + } if (CollectionUtil.isNotEmpty(data)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -199,4 +226,56 @@ public class InfluxdbDataPltImpl extends MppServiceImpl getWlMinuteDataPlt(LineCountEvaluateParam lineParam) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineParam.getLineId())) { + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineParam.getLineId()).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + + lineParam.getLineId().forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataPlt.class); + influxQueryWrapper.eq(DataPlt::getLineId, lineId) + .eq(DataPlt::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataPlt::getLineId) + .select(DataPlt::getPhasicType) + .select(DataPlt::getValueType) + .between(DataPlt::getTime, lineParam.getStartTime(), lineParam.getEndTime()) + .eq(DataPlt::getQualityFlag, "0"); + if (CollUtil.isNotEmpty(lineParam.getPhasicType())) { + influxQueryWrapper.regular(DataPlt::getPhasicType, lineParam.getPhasicType()); + } + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataPlt::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataPlt::getCldid,Integer.toString(po.getLineNo())); + } + //判断接线方式 (0-星型 1-角型 2-V型) 星型是相电压 角型或者v型是线电压 + if (Objects.equals(po.getConType(),0)) { + //相电压长时闪变 + influxQueryWrapper.select("Pq_Plt","plt"); + } else { + //线电压长时闪变 + influxQueryWrapper.select("Pq_LPlt","plt"); + } + result.addAll(dataPltMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java index 2d45776..e42530f 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java @@ -6,6 +6,10 @@ import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.ObjectUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.dataProcess.constant.InfluxDBTableConstant; import com.njcn.dataProcess.constant.PhaseType; import com.njcn.dataProcess.dao.imapper.DataVMapper; @@ -34,6 +38,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -46,6 +51,17 @@ import java.util.stream.Collectors; public class InfluxdbDataVImpl extends MppServiceImpl implements IDataV { private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); + @Resource + private CsLineFeignClient csLineFeignClient; + @Resource + private EquipmentFeignClient equipmentFeignClient; + + private static final Map PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + put("M", "T"); + }}; @Resource private DataVMapper dataVMapper; @@ -122,14 +138,20 @@ public class InfluxdbDataVImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List list = getMinuteDataV(lineParam); - list.forEach(item -> { - DataVDto dto = new DataVDto(); - BeanUtils.copyProperties(item, dto); - dto.setMinTime(DATE_TIME_FORMATTER.format(item.getTime())); - result.add(dto); - }); - + List list; + if (Objects.equals(lineParam.getType(), 2)) { + list = getWlMinuteDataV(lineParam); + } else { + list = getMinuteDataV(lineParam); + } + if (CollectionUtil.isNotEmpty(list)) { + list.forEach(item -> { + DataVDto dto = new DataVDto(); + BeanUtils.copyProperties(item, dto); + dto.setMinTime(DATE_TIME_FORMATTER.format(item.getTime())); + result.add(dto); + }); + } return result; } @@ -205,7 +227,12 @@ public class InfluxdbDataVImpl extends MppServiceImpl getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List dataVList = getMinuteDataV(lineParam); + List dataVList; + if (Objects.equals(lineParam.getType(), 2)) { + dataVList = getWlMinuteDataV(lineParam); + } else { + dataVList = getMinuteDataV(lineParam); + } if (CollectionUtil.isNotEmpty(dataVList)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -403,7 +430,7 @@ public class InfluxdbDataVImpl extends MppServiceImpl getMinuteDataV(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class); - influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, "", HarmonicTimesUtil.harmonicTimesList(1, 50, 1)); + influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, InfluxDbSqlConstant.V, HarmonicTimesUtil.harmonicTimesList(1, 50, 1)); influxQueryWrapper.regular(DataV::getLineId, lineParam.getLineId()) .select(DataV::getLineId) .select(DataV::getPhasicType) @@ -439,6 +466,90 @@ public class InfluxdbDataVImpl extends MppServiceImpl getWlMinuteDataV(LineCountEvaluateParam lineParam) { + List result = new ArrayList<>(); + if (CollectionUtil.isNotEmpty(lineParam.getLineId())) { + //fixme 这边查询的数据可以缓存起来,因为日表计算时可以使用 + //获取监测点信息 + List line = csLineFeignClient.queryLineById(lineParam.getLineId()).getData(); + Map lineMap = line.stream().collect(Collectors.toMap(CsLinePO::getLineId, Function.identity())); + //获取设备信息 + List devList = line.stream().map(CsLinePO::getDeviceId).distinct().collect(Collectors.toList()); + List dev = equipmentFeignClient.queryDeviceById(devList).getData(); + Map devsMap = dev.stream().collect(Collectors.toMap(CsEquipmentDeliveryDTO::getId, Function.identity())); + lineParam.getLineId().forEach(lineId -> { + String devId = lineMap.get(lineId).getDeviceId(); + CsLinePO po = lineMap.get(lineId); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class); + influxQueryWrapper.eq(DataV::getLineId, lineId) + .eq(DataV::getProcess,Integer.toString(devsMap.get(devId).getProcess())) + .select(DataV::getLineId) + .select(DataV::getPhasicType) + .select(DataV::getValueType) + //频率 + .select("Pq_Freq","freq") + //频率偏差 + .select("Pq_FreqDev","freq_dev") + //相电压有效值 + .select("Pq_RmsU","rms") + //线电压有效值 + .select("Pq_RmsLU","rms_lvr") + //电压负序 + .select("Pq_SeqNegU","v_neg") + //电压正序 + .select("Pq_SeqPosU","v_pos") + //电压零序 + .select("Pq_SeqZeroU","v_zero") + //电压负序不平衡度 + .select("Pq_UnbalNegU","v_unbalance") + .between(DataV::getTime, lineParam.getStartTime(), lineParam.getEndTime()) + .eq(DataV::getQualityFlag, "0"); + if (CollUtil.isNotEmpty(lineParam.getPhasicType())) { + influxQueryWrapper.regular(DataV::getPhasicType, lineParam.getPhasicType()); + } + if (Objects.isNull(po.getLineNo())) { + influxQueryWrapper.eq(DataV::getCldid,Integer.toString(po.getClDid())); + } else { + influxQueryWrapper.eq(DataV::getCldid,Integer.toString(po.getLineNo())); + } + //判断接线方式 (0-星型 1-角型 2-V型) 星型是相电压 角型或者v型是线电压 + if (Objects.equals(po.getConType(),0)) { + //相电压偏差 + influxQueryWrapper.select("Pq_UDev","vu_dev"); + //相电压谐波总畸变率 + influxQueryWrapper.select("Pq_ThdU","v_thd"); + //相电压基波有效值 + influxQueryWrapper.select("Pq_RmsFundU","v_1"); + //2-50次 相电压谐波有效值 + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmUV_", "v_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + } else { + //线电压偏差 + influxQueryWrapper.select("Pq_LUDev","vu_dev"); + //线电压谐波总畸变率 + influxQueryWrapper.select("Pq_ThdLU","v_thd"); + //线电压基波有效值 + influxQueryWrapper.select("Pq_RmsFundLU","v_1"); + //2-50次 线电压谐波有效值 + influxQueryWrapper.samePrefixAndSuffix("Pq_HarmLUV_", "v_", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + } + result.addAll(dataVMapper.selectByQueryWrapper(influxQueryWrapper)); + }); + if (CollectionUtil.isNotEmpty(result)) { + result.forEach(item -> { + String newType = PHASE_MAPPING.get(item.getPhasicType()); + if (newType != null) { + item.setPhasicType(newType); + } + }); + } + } + return result; + } + private void quality(List result, InfluxQueryWrapper influxQueryWrapper, LineCountEvaluateParam lineParam) { List dataList; List list = dataVMapper.selectByQueryWrapper(influxQueryWrapper);