From dc178ff92078eaf7801cd7caf4305fab0f8fb014 Mon Sep 17 00:00:00 2001 From: chendaofei <857448963@qq.com> Date: Thu, 13 Mar 2025 10:16:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=AE=8C=E6=95=B4=E6=80=A7?= =?UTF-8?q?=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/line/IPollutionService.java | 11 + .../line/IDataIntegrityServiceImpl.java | 33 +- .../line/PollutionServiceImpl.java | 496 ++++++++++++++++++ .../api/DataHarmRateVFeignClient.java | 3 + .../api/DataIntegrityFeignClient.java | 26 + .../dataProcess/api/DataVFeignClient.java | 7 + ...taHarmRateVFeignClientFallbackFactory.java | 6 + ...taIntegrityFeignClientFallbackFactory.java | 50 ++ .../DataVFeignClientFallbackFactory.java | 15 + .../dataProcess/dto/MeasurementCountDTO.java | 29 + .../njcn/dataProcess/dto/PollutionDTO.java | 64 +++ .../param/LineCountEvaluateParam.java | 5 + .../dataProcess/pojo/po/RStatIntegrityD.java | 2 +- .../controller/DataHarmRateVController.java | 9 + .../controller/DataIController.java | 9 + .../controller/DataIntegrityController.java | 48 ++ .../controller/DataVController.java | 12 + .../dataProcess/dao/imapper/DataVMapper.java | 8 + .../dataProcess/service/IDataHarmRateV.java | 3 + .../dataProcess/service/IDataIntegrity.java | 2 +- .../com/njcn/dataProcess/service/IDataV.java | 7 + .../influxdb/InfluxdbDataHarmRateVImpl.java | 10 +- .../influxdb/InfluxdbDataIntegrityImpl.java | 2 +- .../impl/influxdb/InfluxdbDataVImpl.java | 18 + .../impl/relation/RStatIntegrityDImpl.java | 9 + .../relation/RelationDataHarmRateVImpl.java | 24 + .../relation/RelationDataIntegrityImpl.java | 5 +- .../impl/relation/RelationDataVImpl.java | 39 +- 28 files changed, 919 insertions(+), 33 deletions(-) create mode 100644 algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IPollutionService.java create mode 100644 algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java create mode 100644 data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataIntegrityFeignClient.java create mode 100644 data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataIntegrityFeignClientFallbackFactory.java create mode 100644 data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/MeasurementCountDTO.java create mode 100644 data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/PollutionDTO.java create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataIntegrityController.java create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RStatIntegrityDImpl.java diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IPollutionService.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IPollutionService.java new file mode 100644 index 0000000..16f6808 --- /dev/null +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IPollutionService.java @@ -0,0 +1,11 @@ +package com.njcn.algorithm.service.line; + +import com.njcn.algorithm.pojo.bo.CalculatedParam; + +/** + * 测点污区数据 + */ +public interface IPollutionService { + + void handleDay(CalculatedParam calculatedParam); +} diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java index 4250537..e458eeb 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java @@ -1,12 +1,16 @@ package com.njcn.algorithm.serviceimpl.line; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.LocalDateTimeUtil; import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.service.line.IDataIntegrityService; +import com.njcn.dataProcess.api.DataIntegrityFeignClient; +import com.njcn.dataProcess.api.DataVFeignClient; +import com.njcn.dataProcess.dto.MeasurementCountDTO; +import com.njcn.dataProcess.pojo.po.RStatIntegrityD; import com.njcn.device.biz.commApi.CommTerminalGeneralClient; import com.njcn.device.biz.pojo.dto.LineDevGetDTO; -import com.njcn.device.pq.pojo.po.RStatIntegrityD; import com.njcn.influx.constant.InfluxDbSqlConstant; import com.njcn.influx.pojo.bo.MeasurementCount; import com.njcn.influx.pojo.constant.InfluxDBTableConstant; @@ -33,6 +37,10 @@ public class IDataIntegrityServiceImpl implements IDataIntegrityService { @Resource private CommTerminalGeneralClient commTerminalGeneralClient; + private final DataVFeignClient dataVFeignClient; + + private final DataIntegrityFeignClient dataIntegrityFeignClient; + @Override public void dataIntegrity(CalculatedParam calculatedParam) { @@ -50,7 +58,7 @@ public class IDataIntegrityServiceImpl implements IDataIntegrityService { List> pendingIds = ListUtils.partition(lineIds,5); for (List pendingId : pendingIds) { List lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(pendingId).getData(); - List countList = this.getMeasurementCount(pendingId,beginDay,endDay); + List countList = dataVFeignClient.getMeasurementCount(pendingId,beginDay,endDay).getData(); poList.addAll( lineDevGetDTOList.stream() .map(item -> { @@ -68,27 +76,12 @@ public class IDataIntegrityServiceImpl implements IDataIntegrityService { .collect(Collectors.toList()) ); } + if(CollUtil.isNotEmpty(poList)){ + dataIntegrityFeignClient.batchInsertion(poList); + } } - /** - * 获取data_v中各个监测点的数据总数 - * @param lineIndex 监测点索引 - * @param startTime 起始时间 - * @param endTime 结束时间 - */ - public List getMeasurementCount(List lineIndex, String startTime, String endTime) { - InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class,MeasurementCount.class); - influxQueryWrapper.regular(DataV::getLineId, lineIndex) - .eq(DataV::getValueType, InfluxDbSqlConstant.MAX) - .eq(DataV::getPhaseType, InfluxDBTableConstant.PHASE_TYPE_A) - .count(DataV::getFreq) - .groupBy(DataV::getLineId) - .between(DataV::getTime, startTime, endTime); - //return dataVMapper.getMeasurementCount(influxQueryWrapper); - //TODO 调用插入数据库 - return null; - } } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java new file mode 100644 index 0000000..1463553 --- /dev/null +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java @@ -0,0 +1,496 @@ +package com.njcn.algorithm.serviceimpl.line; + +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.LocalDateTimeUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.njcn.algorithm.pojo.bo.CalculatedParam; +import com.njcn.algorithm.service.line.IPollutionService; +import com.njcn.dataProcess.api.DataHarmRateVFeignClient; +import com.njcn.dataProcess.api.DataVFeignClient; +import com.njcn.dataProcess.dto.DataVDTO; +import com.njcn.dataProcess.dto.PollutionDTO; +import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.pojo.dto.DataHarmDto; +import com.njcn.dataProcess.pojo.dto.DataVDto; +import com.njcn.device.biz.commApi.CommTerminalGeneralClient; +import com.njcn.device.biz.pojo.po.Overlimit; +import com.njcn.device.pq.pojo.dto.PublicDTO; +import com.njcn.system.api.DicDataFeignClient; +import com.njcn.system.enums.DicDataTypeEnum; +import com.njcn.system.pojo.po.DictData; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.LocalDate; +import java.util.*; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * @Author: cdf + * @CreateTime: 2025-03-12 + * @Description: 测点污区数据 + */ +@Service +@Slf4j +@RequiredArgsConstructor +public class PollutionServiceImpl implements IPollutionService { + + private final DicDataFeignClient dicDataFeignClient; + + private final CommTerminalGeneralClient commTerminalGeneralClient; + + private final DataVFeignClient dataVFeignClient; + + private final DataHarmRateVFeignClient dataHarmRateVFeignClient; + + @Override + public void handleDay(CalculatedParam calculatedParam) { + List pollutionList ; + + List dictData = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.POLLUTION_STATIS.getCode()).getData(); + LocalDate local = LocalDateTimeUtil.parseDate(calculatedParam.getDataDate()); + String beginDay =LocalDateTimeUtil.format(LocalDateTimeUtil.beginOfDay(LocalDateTimeUtil.parse(calculatedParam.getDataDate(), DatePattern.NORM_DATE_PATTERN)),DatePattern.NORM_DATETIME_PATTERN); + String endDay = LocalDateTimeUtil.format(LocalDateTimeUtil.endOfDay(LocalDateTimeUtil.parse(calculatedParam.getDataDate(), DatePattern.NORM_DATE_PATTERN)),DatePattern.NORM_DATETIME_PATTERN); + List idList = calculatedParam.getIdList(); + List overlimitList = commTerminalGeneralClient.getOverLimitDataByIds(idList).getData(); + Map limitMap = overlimitList.stream().collect(Collectors.toMap(Overlimit::getId,Function.identity())); + + + + //指标数据获取 + List harmonicVoltageList = getHarmonicVoltage(idList,limitMap,beginDay,endDay); + List harmonicCurrentList = getHarmonicCurrent(idList,limitMap,beginDay,endDay); + List frequencyDeviationList = getFrequencyDeviation(idList,limitMap,beginDay,endDay); + List voltageDeviationList = getVoltageDeviation(idList,limitMap,beginDay,endDay); + List threePhaseVoltageList = getThreePhaseVoltageUnbalance(idList,limitMap,beginDay,endDay); + List negativeSequenceList = getNegativeSequenceCurrent(idList,limitMap,beginDay,endDay); + List interHarmonicVoltageList = getInterharmonicVoltage(idList,limitMap,beginDay,endDay); + List voltageFlickerList = getVoltageFlicker(idList,limitMap,beginDay,endDay); + + List lineIdList = new ArrayList<>(); + idList.forEach(item->{ + PollutionDTO pollutionDTO = new PollutionDTO(); + pollutionDTO.setLineId(item); + lineIdList.add(pollutionDTO); + }); + pollutionList = processPollutionList(lineIdList,harmonicVoltageList,harmonicCurrentList,frequencyDeviationList,voltageDeviationList,threePhaseVoltageList,negativeSequenceList,interHarmonicVoltageList,voltageFlickerList); + insertPollutionDayMySql(pollutionList, dictData, local); + } + + + /** + * 谐波电压:取监测点最新的A、B、C三相数据,再取电压总谐波畸变率、各次谐波电压含有率(2~25次)中的最大值,作为结果 + */ + private List getHarmonicVoltage(List ids,Map limitMap,String beginTime,String endTime){ + List list = new ArrayList<>(); + + Map threePhase = getThreePhaseData(ids,limitMap,beginTime,endTime); + Map distortionRate = getDistortionRateData(ids,limitMap,beginTime,endTime); + for (String key : threePhase.keySet()) { + list.add(threePhase.get(key)); + } + for (String key : distortionRate.keySet()) { + list.add(distortionRate.get(key)); + } + Comparator comparator = Comparator.comparing(PublicDTO::getData); + Map> outMap = list.stream().collect(Collectors.groupingBy(PublicDTO::getId,Collectors.reducing(BinaryOperator.maxBy(comparator)))); + return process(outMap); + } + + /** + * 出参处理 + */ + private List process(Map> outMap){ + List outList = new ArrayList<>(); + for (String key : outMap.keySet()) { + outList.add(outMap.get(key).get()); + } + return outList; + } + + private Map getThreePhaseData(List ids,Map limitMap,String beginTime,String endTime){ + LineCountEvaluateParam lineCountEvaluateParam = new LineCountEvaluateParam(); + lineCountEvaluateParam.setLineId(ids); + lineCountEvaluateParam.setValueType(Stream.of("CP95").collect(Collectors.toList())); + lineCountEvaluateParam.setPhasicType(Stream.of("A","B","C").collect(Collectors.toList())); + lineCountEvaluateParam.setStartTime(beginTime); + lineCountEvaluateParam.setEndTime(endTime); + List threePhaseList = dataVFeignClient.getDataV(lineCountEvaluateParam).getData(); + Map> dataVMap = threePhaseList.stream().collect(Collectors.groupingBy(DataVDto::getLineId)); + + Map map = new HashMap<>(); + dataVMap.forEach((lineId,vList)->{ + PublicDTO publicDTO = new PublicDTO(); + if(limitMap.containsKey(lineId)){ + Overlimit overlimit = limitMap.get(lineId); + double val = vList.stream().mapToDouble(DataVDto::getVThd).max().getAsDouble(); + double vUnbalance = val/overlimit.getUaberrance(); + publicDTO.setId(lineId); + publicDTO.setData(vUnbalance); + map.put(lineId,publicDTO); + } + }); + return map; + } + + private Map getDistortionRateData(List ids,Map limitMap,String beginTime,String endTime){ + Map mapResult = new HashMap<>(); + + LineCountEvaluateParam lineCountEvaluateParam = new LineCountEvaluateParam(); + lineCountEvaluateParam.setLineId(ids); + lineCountEvaluateParam.setValueType(Stream.of("CP95").collect(Collectors.toList())); + lineCountEvaluateParam.setPhasicType(Stream.of("A","B","C").collect(Collectors.toList())); + lineCountEvaluateParam.setStartTime(beginTime); + lineCountEvaluateParam.setEndTime(endTime); + List distortionRateList = dataHarmRateVFeignClient.getHarmRateVData(lineCountEvaluateParam).getData(); + + + + Map> harmDataMap = distortionRateList.stream().collect(Collectors.groupingBy(DataHarmDto::getLineId)); + harmDataMap.forEach((lineId,vList)->{ + Overlimit overlimit = limitMap.get(lineId); + List temList = new ArrayList<>(); + for(DataHarmDto dayHarmrateV : vList){ + PublicDTO publicDTO = new PublicDTO(); + double v2 = dayHarmrateV.getV2()/overlimit.getUharm2(); + double v3 = dayHarmrateV.getV3()/overlimit.getUharm3(); + double v4 = dayHarmrateV.getV4()/overlimit.getUharm4(); + double v5 = dayHarmrateV.getV5()/overlimit.getUharm5(); + double v6 = dayHarmrateV.getV6()/overlimit.getUharm6(); + double v7 = dayHarmrateV.getV7()/overlimit.getUharm7(); + double v8 = dayHarmrateV.getV8()/overlimit.getUharm8(); + double v9 = dayHarmrateV.getV9()/overlimit.getUharm9(); + double v10 = dayHarmrateV.getV10()/overlimit.getUharm10(); + double v11 = dayHarmrateV.getV11()/overlimit.getUharm11(); + double v12 = dayHarmrateV.getV12()/overlimit.getUharm12(); + double v13 = dayHarmrateV.getV13()/overlimit.getUharm13(); + double v14 = dayHarmrateV.getV14()/overlimit.getUharm14(); + double v15 = dayHarmrateV.getV15()/overlimit.getUharm15(); + double v16 = dayHarmrateV.getV16()/overlimit.getUharm16(); + double v17 = dayHarmrateV.getV17()/overlimit.getUharm17(); + double v18 = dayHarmrateV.getV18()/overlimit.getUharm18(); + double v19 = dayHarmrateV.getV19()/overlimit.getUharm19(); + double v20 = dayHarmrateV.getV20()/overlimit.getUharm20(); + double v21 = dayHarmrateV.getV21()/overlimit.getUharm21(); + double v22 = dayHarmrateV.getV22()/overlimit.getUharm22(); + double v23 = dayHarmrateV.getV23()/overlimit.getUharm23(); + double v24 = dayHarmrateV.getV24()/overlimit.getUharm24(); + double v25 = dayHarmrateV.getV25()/overlimit.getUharm25(); + List data = Stream.of(v2,v3,v4,v5,v6,v7,v8,v9,v10,v11,v12,v13,v14,v15,v16,v17,v18,v19,v20,v21,v22,v23,v24,v25).collect(Collectors.toList()); + double result = data.stream().max(Comparator.comparing(Double::doubleValue)).get(); + publicDTO.setId(lineId); + publicDTO.setData(result); + temList.add(publicDTO); + } + PublicDTO publicDTO = new PublicDTO(); + publicDTO.setId(lineId); + publicDTO.setData(temList.stream().mapToDouble(PublicDTO::getData).max().getAsDouble()); + mapResult.put(lineId,publicDTO); + }); + + return mapResult; + } + + /** + * 谐波电流:各次谐波电流(2~25次),取各监测点最新的A、B、C三相数据。 + */ + private List getHarmonicCurrent(List ids,Map limitMap,String beginTime,String endTime){ + List list = dataIDService.list(new LambdaQueryWrapper() + .eq(RStatDataIDPO::getValueType, "CP95") + .in(RStatDataIDPO::getPhaseType, Arrays.asList("A", "B", "C")) + .ge(RStatDataIDPO::getTime, beginTime) + .le(RStatDataIDPO::getTime, endTime) + ); + List data; + PublicDTO publicDTO; + List lineData = new ArrayList<>(); + for (RStatDataIDPO dayI : list) { + for (Overlimit overlimit : overLimitList) { + if (Objects.equals(dayI.getLineId(),overlimit.getId())){ + double v2 = dayI.getI2()/overlimit.getIharm2(); + double v3 = dayI.getI3()/overlimit.getIharm3(); + double v4 = dayI.getI4()/overlimit.getIharm4(); + double v5 = dayI.getI5()/overlimit.getIharm5(); + double v6 = dayI.getI6()/overlimit.getIharm6(); + double v7 = dayI.getI7()/overlimit.getIharm7(); + double v8 = dayI.getI8()/overlimit.getIharm8(); + double v9 = dayI.getI9()/overlimit.getIharm9(); + double v10 = dayI.getI10()/overlimit.getIharm10(); + double v11 = dayI.getI11()/overlimit.getIharm11(); + double v12 = dayI.getI12()/overlimit.getIharm12(); + double v13 = dayI.getI13()/overlimit.getIharm13(); + double v14 = dayI.getI14()/overlimit.getIharm14(); + double v15 = dayI.getI15()/overlimit.getIharm15(); + double v16 = dayI.getI16()/overlimit.getIharm16(); + double v17 = dayI.getI17()/overlimit.getIharm17(); + double v18 = dayI.getI18()/overlimit.getIharm18(); + double v19 = dayI.getI19()/overlimit.getIharm19(); + double v20 = dayI.getI20()/overlimit.getIharm20(); + double v21 = dayI.getI21()/overlimit.getIharm21(); + double v22 = dayI.getI22()/overlimit.getIharm22(); + double v23 = dayI.getI23()/overlimit.getIharm23(); + double v24 = dayI.getI24()/overlimit.getIharm24(); + double v25 = dayI.getI25()/overlimit.getIharm25(); + data = Stream.of(v2,v3,v4,v5,v6,v7,v8,v9,v10,v11,v12,v13,v14,v15,v16,v17,v18,v19,v20,v21,v22,v23,v24,v25).collect(Collectors.toList()); + double result = data.stream().max(Comparator.comparing(Double::doubleValue)).get(); + publicDTO = new PublicDTO(); + publicDTO.setId(dayI.getLineId()); + publicDTO.setData(result); + lineData.add(publicDTO); + } + } + } + Comparator comparator = Comparator.comparing(PublicDTO::getData); + Map> outMap = lineData.stream().collect(Collectors.groupingBy(PublicDTO::getId,Collectors.reducing(BinaryOperator.maxBy(comparator)))); + return process(outMap); + } + + /** + * 频率偏差:各监测点最新的T相数据,取频率上下偏差的绝对值中的最大值。 + */ + private List getFrequencyDeviation(List overLimitList,String beginTime,String endTime){ + List threePhaseList = dataVDService.list(new QueryWrapper() + .select("line_id","abs(freq_dev) as freq_dev ") + .in("value_type", Arrays.asList("MIN","MAX")) + .in("phasic_type", Arrays.asList("T")) + .ge("time", beginTime) + .le("time",endTime) + ); + List data; + PublicDTO publicDTO; + List lineData = new ArrayList<>(); +// String sql = "SELECT line_id,abs(freq_dev) AS freq_dev FROM day_v where phasic_type = 'T' and (value_type = 'MIN' or value_type = 'MAX') "+ processDate(dataDate,Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)) +"group by line_id order by time desc limit 2 tz('Asia/Shanghai')"; +// InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); +// +// QueryResult sqlResult = influxDbUtils.query(sql); +// List list = resultMapper.toPOJO(sqlResult, DayVPO.class); + for (RStatDataVDPO dayV : threePhaseList) { + for (Overlimit overlimit : overLimitList) { + if (Objects.equals(dayV.getLineId(),overlimit.getId())){ + double freqDev = dayV.getFreqDev()/overlimit.getFreqDev(); + data = Stream.of(freqDev).collect(Collectors.toList()); + double result = data.stream().max(Comparator.comparing(Double::doubleValue)).get(); + publicDTO = new PublicDTO(); + publicDTO.setId(dayV.getLineId()); + publicDTO.setData(result); + lineData.add(publicDTO); + } + } + } + Comparator comparator = Comparator.comparing(PublicDTO::getData); + Map> outMap = lineData.stream().collect(Collectors.groupingBy(PublicDTO::getId,Collectors.reducing(BinaryOperator.maxBy(comparator)))); + return process(outMap); + } + + /** + * 电压偏差:各监测点最新的A、B、C三相数据,取电压上下偏差的绝对值中的最大值。 + */ + private List getVoltageDeviation(List overLimitList,String beginTime,String endTime){ + List list = dataVDService.list(new QueryWrapper() + .select("line_id","vu_dev","vl_dev","value_type") + .in("value_type", Arrays.asList("MIN","MAX")) + .in("phasic_type", Arrays.asList("A","B","C")) + .ge("time", beginTime) + .le("time", endTime) + ); + List data; + PublicDTO publicDTO; + List lineData = new ArrayList<>(); +// String sql = "SELECT line_id,vu_dev,vl_dev,value_type FROM day_v where (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') and (value_type = 'MIN' or value_type = 'MAX') "+ processDate(dataDate,Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)) +" group by line_id order by time desc limit 6 tz('Asia/Shanghai')"; +// QueryResult sqlResult = influxDbUtils.query(sql); +// InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); +// +// List list = resultMapper.toPOJO(sqlResult, DayVPO.class); + for (RStatDataVDPO dayV : list) { + for (Overlimit overlimit : overLimitList) { + if (Objects.equals(dayV.getLineId(),overlimit.getId())){ + double vlDev = Math.abs(dayV.getVlDev()/overlimit.getUvoltageDev()); + double vuDev = Math.abs(dayV.getVuDev()/overlimit.getVoltageDev()); + data = Stream.of(vuDev,vlDev).collect(Collectors.toList()); + double result = data.stream().max(Comparator.comparing(Double::doubleValue)).get(); + publicDTO = new PublicDTO(); + publicDTO.setId(dayV.getLineId()); + publicDTO.setData(result); + lineData.add(publicDTO); + } + } + } + Comparator comparator = Comparator.comparing(PublicDTO::getData); + Map> outMap = lineData.stream().collect(Collectors.groupingBy(PublicDTO::getId,Collectors.reducing(BinaryOperator.maxBy(comparator)))); + return process(outMap); + } + + /** + * 三相电压不平衡度:各监测点最新的T相数据。 + */ + private List getThreePhaseVoltageUnbalance(List overLimitList,String beginTime,String endTime){ + List list = dataVDService.list(new QueryWrapper() + .select("line_id","v_unbalance","value_type") + .in("value_type", Arrays.asList("CP95","MAX")) + .in("phasic_type", Arrays.asList("T")) + .ge("time", beginTime) + .le("time", endTime) + ); + List data; + PublicDTO publicDTO; + List lineData = new ArrayList<>(); +// String sql = "SELECT line_id,v_unbalance,value_type FROM day_v where phasic_type = 'T' and (value_type = 'CP95' or value_type = 'MAX') "+ processDate(dataDate,Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)) +" group by line_id order by time desc limit 2 tz('Asia/Shanghai')"; +// InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); +// +// QueryResult sqlResult = influxDbUtils.query(sql); +// List list = resultMapper.toPOJO(sqlResult, DayVPO.class); + for (RStatDataVDPO dayV : list) { + for (Overlimit overlimit : overLimitList) { + if (Objects.equals(dayV.getLineId(),overlimit.getId())){ + double vUnbalance = Math.abs(dayV.getVUnbalance()/overlimit.getUbalance()); + data = Stream.of(vUnbalance).collect(Collectors.toList()); + double result = data.stream().max(Comparator.comparing(Double::doubleValue)).get(); + publicDTO = new PublicDTO(); + publicDTO.setId(dayV.getLineId()); + publicDTO.setData(result); + lineData.add(publicDTO); + } + } + } + Comparator comparator = Comparator.comparing(PublicDTO::getData); + Map> outMap = lineData.stream().collect(Collectors.groupingBy(PublicDTO::getId,Collectors.reducing(BinaryOperator.maxBy(comparator)))); + return process(outMap); + } + + /** + * 负序电流:各监测点最新的T相数据。 + */ + private List getNegativeSequenceCurrent(List overLimitList,String beginTime,String endTime){ + List list = dataIDService.list(new QueryWrapper() + .select("line_id","i_neg","value_type") + .in("value_type", Arrays.asList("CP95","MAX")) + .in("phasic_type", Arrays.asList("T")) + .ge("time", beginTime) + .le("time", endTime) + ); + List data; + PublicDTO publicDTO; + List lineData = new ArrayList<>(); +// String sql = "SELECT line_id,i_neg,value_type FROM day_i where phasic_type = 'T' and (value_type = 'CP95' or value_type = 'MAX') "+ processDate(dataDate,Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)) +" group by line_id order by time desc limit 2 tz('Asia/Shanghai')"; +// InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); +// +// QueryResult sqlResult = influxDbUtils.query(sql); +// List list = resultMapper.toPOJO(sqlResult, DayIPO.class); + for (RStatDataIDPO dayI : list) { + for (Overlimit overlimit : overLimitList) { + if (Objects.equals(dayI.getLineId(),overlimit.getId()) && overlimit.getINeg() != 0){ + BigDecimal dayiNeg = BigDecimal.valueOf(dayI.getINeg()); + BigDecimal overNeg = BigDecimal.valueOf(overlimit.getINeg()); + double outNeg = dayiNeg.divide(overNeg,9, RoundingMode.HALF_UP).doubleValue(); + double iNeg = Math.abs(outNeg); + data = Stream.of(iNeg).collect(Collectors.toList()); + double result = data.stream().max(Comparator.comparing(Double::doubleValue)).get(); + publicDTO = new PublicDTO(); + publicDTO.setId(dayI.getLineId()); + publicDTO.setData(result); + lineData.add(publicDTO); + } + } + } + Comparator comparator = Comparator.comparing(PublicDTO::getData); + Map> outMap = lineData.stream().collect(Collectors.groupingBy(PublicDTO::getId,Collectors.reducing(BinaryOperator.maxBy(comparator)))); + return process(outMap); + } + + /** + * 间谐波电压含有率:各监测点最新的A、B、C三相数据。 + */ + private List getInterharmonicVoltage(List overLimitList,String beginTime,String endTime){ + List list = inharmVDService.list(new QueryWrapper() + .in("value_type", Arrays.asList("CP95")) + .in("phasic_type", Arrays.asList("A","B","C")) + .ge("time", beginTime) + .le("time", endTime) + ); + List data; + PublicDTO publicDTO; + List lineData = new ArrayList<>(); +// String sql = "SELECT * FROM day_inharm_v where (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') and value_type = 'CP95' "+ processDate(dataDate,Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)) +" group by line_id order by time desc limit 3 tz('Asia/Shanghai')"; +// InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); +// +// QueryResult sqlResult = influxDbUtils.query(sql); +// List list = resultMapper.toPOJO(sqlResult, DayInharmVPO.class); + for (RStatDataInharmVDPO dayInharmV : list) { + for (Overlimit overlimit : overLimitList) { + if (Objects.equals(dayInharmV.getLineId(),overlimit.getId())){ + double v1 = Math.abs(dayInharmV.getV1()/overlimit.getInuharm1()); + double v2 = Math.abs(dayInharmV.getV2()/overlimit.getInuharm2()); + double v3 = Math.abs(dayInharmV.getV3()/overlimit.getInuharm3()); + double v4 = Math.abs(dayInharmV.getV4()/overlimit.getInuharm4()); + double v5 = Math.abs(dayInharmV.getV5()/overlimit.getInuharm5()); + double v6 = Math.abs(dayInharmV.getV6()/overlimit.getInuharm6()); + double v7 = Math.abs(dayInharmV.getV7()/overlimit.getInuharm7()); + double v8 = Math.abs(dayInharmV.getV8()/overlimit.getInuharm8()); + double v9 = Math.abs(dayInharmV.getV9()/overlimit.getInuharm9()); + double v10 = Math.abs(dayInharmV.getV10()/overlimit.getInuharm10()); + double v11 = Math.abs(dayInharmV.getV11()/overlimit.getInuharm11()); + double v12 = Math.abs(dayInharmV.getV12()/overlimit.getInuharm12()); + double v13 = Math.abs(dayInharmV.getV13()/overlimit.getInuharm13()); + double v14 = Math.abs(dayInharmV.getV14()/overlimit.getInuharm14()); + double v15 = Math.abs(dayInharmV.getV15()/overlimit.getInuharm15()); + double v16 = Math.abs(dayInharmV.getV16()/overlimit.getInuharm16()); + data = Stream.of(v1,v2,v3,v4,v5,v6,v7,v8,v9,v10,v11,v12,v13,v14,v15,v16).collect(Collectors.toList()); + double result = data.stream().max(Comparator.comparing(Double::doubleValue)).get(); + publicDTO = new PublicDTO(); + publicDTO.setId(dayInharmV.getLineId()); + publicDTO.setData(result); + lineData.add(publicDTO); + } + } + } + Comparator comparator = Comparator.comparing(PublicDTO::getData); + Map> outMap = lineData.stream().collect(Collectors.groupingBy(PublicDTO::getId,Collectors.reducing(BinaryOperator.maxBy(comparator)))); + return process(outMap); + } + + /** + * 长时电压闪变:各监测点最新的A、B、C三相数据。 + */ + private List getVoltageFlicker(List overLimitList,String beginTime,String endTime){ + List list = pltDService.list(new QueryWrapper() + .in("value_type", Arrays.asList("CP95")) + .in("phasic_type", Arrays.asList("A","B","C")) + .ge("time", beginTime) + .le("time", endTime) + ); + List data; + PublicDTO publicDTO; + List lineData = new ArrayList<>(); + +// String sql = "SELECT * FROM day_plt where (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') and value_type = 'CP95' "+ processDate(dataDate,Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)) +" group by line_id order by time desc limit 3 tz('Asia/Shanghai')"; +// InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); +// +// QueryResult sqlResult = influxDbUtils.query(sql); +// List list = resultMapper.toPOJO(sqlResult, DayPltPO.class); + for (RStatDataPltDPO dayPlt : list) { + for (Overlimit overlimit : overLimitList) { + if (Objects.equals(dayPlt.getLineId(),overlimit.getId())){ + double plt = Math.abs(dayPlt.getPlt()/overlimit.getFlicker()); + data = Stream.of(plt).collect(Collectors.toList()); + double result = data.stream().max(Comparator.comparing(Double::doubleValue)).get(); + publicDTO = new PublicDTO(); + publicDTO.setId(dayPlt.getLineId()); + publicDTO.setData(result); + lineData.add(publicDTO); + } + } + } + Comparator comparator = Comparator.comparing(PublicDTO::getData); + Map> outMap = lineData.stream().collect(Collectors.groupingBy(PublicDTO::getId,Collectors.reducing(BinaryOperator.maxBy(comparator)))); + return process(outMap); + } + + +} diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataHarmRateVFeignClient.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataHarmRateVFeignClient.java index b2a1270..5c30534 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataHarmRateVFeignClient.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataHarmRateVFeignClient.java @@ -30,4 +30,7 @@ public interface DataHarmRateVFeignClient { @PostMapping("/getTopData") HttpResult getTopData(); + @PostMapping("/getHarmRateVData") + HttpResult> getHarmRateVData(@RequestBody LineCountEvaluateParam lineParam); + } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataIntegrityFeignClient.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataIntegrityFeignClient.java new file mode 100644 index 0000000..07e8014 --- /dev/null +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataIntegrityFeignClient.java @@ -0,0 +1,26 @@ +package com.njcn.dataProcess.api; + +import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.dataProcess.api.fallback.DataOnlineRateFeignClientFallbackFactory; +import com.njcn.dataProcess.pojo.dto.DataIntegrityDto; +import com.njcn.dataProcess.pojo.dto.DataOnlineRateDto; +import com.njcn.dataProcess.pojo.po.RStatIntegrityD; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +import java.util.List; + +/** + * @Description: + * @Author: wr + * @Date: 2025/3/7 9:30 + */ +@FeignClient(value = ServerInfo.PLATFORM_DATA_PROCESSING_BOOT, path = "/dataIntegrity", fallbackFactory = DataIntegrityFeignClient.class, contextId = "dataIntegrity") +public interface DataIntegrityFeignClient { + + @PostMapping("/batchInsertion") + HttpResult batchInsertion(@RequestBody List dataList); + +} diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java index ae1f9aa..6bf7dc1 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java @@ -4,6 +4,7 @@ import com.njcn.common.pojo.constant.ServerInfo; import com.njcn.common.pojo.response.HttpResult; import com.njcn.dataProcess.api.fallback.DataVFeignClientFallbackFactory; import com.njcn.dataProcess.dto.DataVDTO; +import com.njcn.dataProcess.dto.MeasurementCountDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataVCvtDto; @@ -53,4 +54,10 @@ public interface DataVFeignClient { @PostMapping("/batchInsertionCvtDTO") HttpResult batchInsertionCvtDTO(@RequestBody List cvtDTOList); + @PostMapping("/getMeasurementCount") + HttpResult> getMeasurementCount(@RequestParam("lineIndex")List lineIndex, @RequestParam("startTime")String startTime, @RequestParam("endTime")String endTime); + + //获取原始数据 + @PostMapping("/getDataV") + HttpResult> getDataV(@RequestBody LineCountEvaluateParam lineParam); } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataHarmRateVFeignClientFallbackFactory.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataHarmRateVFeignClientFallbackFactory.java index 71a68f4..18d817a 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataHarmRateVFeignClientFallbackFactory.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataHarmRateVFeignClientFallbackFactory.java @@ -61,6 +61,12 @@ public class DataHarmRateVFeignClientFallbackFactory implements FallbackFactory< throw new BusinessException(finalExceptionEnum); } + @Override + public HttpResult> getHarmRateVData(LineCountEvaluateParam lineParam) { + log.error("{}异常,降级处理,异常为:{}","获取谐波含有率数据",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; } } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataIntegrityFeignClientFallbackFactory.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataIntegrityFeignClientFallbackFactory.java new file mode 100644 index 0000000..4fa12bd --- /dev/null +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataIntegrityFeignClientFallbackFactory.java @@ -0,0 +1,50 @@ +package com.njcn.dataProcess.api.fallback; + +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.dataProcess.api.DataIntegrityFeignClient; +import com.njcn.dataProcess.api.DataOnlineRateFeignClient; +import com.njcn.dataProcess.pojo.dto.DataIntegrityDto; +import com.njcn.dataProcess.pojo.dto.DataOnlineRateDto; +import com.njcn.dataProcess.pojo.po.RStatIntegrityD; +import com.njcn.dataProcess.util.DataProcessingEnumUtil; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @Description: + * @Author: wr + * @Date: 2025/3/7 9:30 + */ +@Slf4j +@Component +public class DataIntegrityFeignClientFallbackFactory implements FallbackFactory { + + + /** + * 输出远程请求接口异常日志 + * @param cause RPC请求异常 + */ + @Override + public DataIntegrityFeignClient create(Throwable cause) { + //判断抛出异常是否为解码器抛出的业务异常 + Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; + if(cause.getCause() instanceof BusinessException){ + BusinessException businessException = (BusinessException) cause.getCause(); + exceptionEnum = DataProcessingEnumUtil.getExceptionEnum(businessException.getResult()); + } + Enum finalExceptionEnum = exceptionEnum; + return new DataIntegrityFeignClient() { + @Override + public HttpResult batchInsertion(List dataList) { + log.error("{}异常,降级处理,异常为:{}","批量插入数据",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + + }; + } +} diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java index 8e65e64..230e344 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java @@ -5,6 +5,7 @@ import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.response.HttpResult; import com.njcn.dataProcess.api.DataVFeignClient; import com.njcn.dataProcess.dto.DataVDTO; +import com.njcn.dataProcess.dto.MeasurementCountDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataVCvtDto; @@ -13,6 +14,8 @@ import com.njcn.dataProcess.util.DataProcessingEnumUtil; import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; import java.time.LocalDateTime; import java.util.List; @@ -89,6 +92,18 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory> getMeasurementCount(List lineIndex,String startTime,String endTime){ + log.error("{}异常,降级处理,异常为:{}","cvt数据插入DataV",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + + @Override + public HttpResult> getDataV(@RequestBody LineCountEvaluateParam lineParam) { + log.error("{}异常,降级处理,异常为:{}","cvt数据插入DataV",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } }; } } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/MeasurementCountDTO.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/MeasurementCountDTO.java new file mode 100644 index 0000000..e19c31b --- /dev/null +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/MeasurementCountDTO.java @@ -0,0 +1,29 @@ +package com.njcn.dataProcess.dto; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.njcn.influx.utils.InstantDateSerializer; +import lombok.Data; +import org.influxdb.annotation.Column; +import org.influxdb.annotation.Measurement; + +import java.time.Instant; + +/** + * @Author: cdf + * @CreateTime: 2025-03-12 + * @Description: + */ +@Data +@Measurement(name = "data_v") +public class MeasurementCountDTO { + + @Column(name = "time") + @JsonSerialize(using = InstantDateSerializer.class) + private Instant time; + + @Column(name = "line_id") + private String lineId; + + @Column(name = "freq") + private String freq; +} diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/PollutionDTO.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/PollutionDTO.java new file mode 100644 index 0000000..23ff2a4 --- /dev/null +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/PollutionDTO.java @@ -0,0 +1,64 @@ +package com.njcn.dataProcess.dto; + +import lombok.Data; + +/** + * @Author: cdf + * @CreateTime: 2025-03-12 + * @Description: + */ +@Data +public class PollutionDTO { + + /** + * 监测点id + */ + private String lineId; + + /** + * 频率偏差污染值 + */ + private Double freqDev = 0.0; + + /** + * 电压偏差污染值 + */ + private Double vDev = 0.0; + + /** + * 三相电压不平衡度污染值 + */ + private Double vUnbalance = 0.0; + + /** + * 谐波电压污染值 + */ + private Double vAll = 0.0; + + /** + * 长时电压闪变污染值 + */ + private Double plt = 0.0; + + /** + * 谐波电流污染值 + */ + private Double iAll = 0.0; + + /** + * 负序电流污染值 + */ + private Double iNeg = 0.0; + + /** + * 间谐波电压含有率污染值 + */ + private Double vInharm = 0.0; + + /** + * 谐波电压含有率污染值 + */ + private Double vHarmonic = 0.0; + + +} diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/param/LineCountEvaluateParam.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/param/LineCountEvaluateParam.java index 7dc935e..5a8eec6 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/param/LineCountEvaluateParam.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/param/LineCountEvaluateParam.java @@ -30,6 +30,11 @@ public class LineCountEvaluateParam extends BaseParam implements Serializable { */ private List phasicType; + /** + * 值类型 + */ + private List valueType; + /** * 异常数据时间集合 * Map> key:监测点id value:异常时间集合 diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/po/RStatIntegrityD.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/po/RStatIntegrityD.java index 255b9b5..8ff5329 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/po/RStatIntegrityD.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/po/RStatIntegrityD.java @@ -29,7 +29,7 @@ public class RStatIntegrityD { @MppMultiId @TableField(value = "time_id") - private String timeId; + private LocalDate timeId; @MppMultiId @TableField(value = "line_index") diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java index 14a052c..fd1a96f 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java @@ -92,4 +92,13 @@ public class DataHarmRateVController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "", methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/getHarmRateVData") + @ApiOperation("获取谐波含有率数据") + public HttpResult> getHarmRateVData(@RequestBody LineCountEvaluateParam lineParam) { + String methodDescribe = getMethodDescribe("getHarmRateVData"); + List data = dataHarmRateVInsert.getHarmRateVData(lineParam); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe); + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataIController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataIController.java index 277160e..26d96e7 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataIController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataIController.java @@ -83,4 +83,13 @@ public class DataIController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "", methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/getDataI") + @ApiOperation("关系型数据库插入数据") + public HttpResult getDataI(@RequestBody LineCountEvaluateParam lineParam) { + String methodDescribe = getMethodDescribe("getDataI"); + // dataIInsert.getDataI(dataIDtoList); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "", methodDescribe); + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataIntegrityController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataIntegrityController.java new file mode 100644 index 0000000..76b3c07 --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataIntegrityController.java @@ -0,0 +1,48 @@ +package com.njcn.dataProcess.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.constant.OperateType; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.dataProcess.annotation.InsertBean; +import com.njcn.dataProcess.pojo.dto.DataIntegrityDto; +import com.njcn.dataProcess.pojo.dto.DataLimitRateDto; +import com.njcn.dataProcess.pojo.po.RStatIntegrityD; +import com.njcn.dataProcess.service.IDataIntegrity; +import com.njcn.web.controller.BaseController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * @Author: cdf + * @CreateTime: 2025-03-12 + * @Description: 数据完整性 + */ + +@RestController +@RequestMapping("/dataIntegrity") +@Api(tags = "数据完整性") +@RequiredArgsConstructor +public class DataIntegrityController extends BaseController { + + @InsertBean + private IDataIntegrity iDataIntegrity; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD) + @PostMapping("/batchInsertion") + @ApiOperation("批量插入") + public HttpResult batchInsertion(@RequestBody List poList) { + String methodDescribe = getMethodDescribe("batchInsertion"); + iDataIntegrity.batchInsertion(poList); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } +} diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java index c24444f..2f51a5e 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java @@ -10,6 +10,7 @@ import com.njcn.dataProcess.annotation.InsertBean; import com.njcn.dataProcess.annotation.QueryBean; import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVFiveItemDTO; +import com.njcn.dataProcess.dto.MeasurementCountDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataVCvtDto; @@ -133,4 +134,15 @@ public class DataVController extends BaseController { } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/getMeasurementCount") + @ApiOperation("获取算法基础数据") + public HttpResult> getMeasurementCount(List lineIndex, String startTime, String endTime) { + String methodDescribe = getMethodDescribe("getMeasurementCount"); + List data = dataVQuery.getMeasurementCount(lineIndex,startTime,endTime); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe); + } + + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/imapper/DataVMapper.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/imapper/DataVMapper.java index c2b51f2..fe64d51 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/imapper/DataVMapper.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/imapper/DataVMapper.java @@ -1,10 +1,12 @@ package com.njcn.dataProcess.dao.imapper; import com.njcn.dataProcess.dto.LineDataVFiveItemDTO; +import com.njcn.dataProcess.dto.MeasurementCountDTO; import com.njcn.dataProcess.po.influx.DataV; import com.njcn.influx.base.InfluxDbBaseMapper; import com.njcn.influx.query.InfluxQueryWrapper; + import java.util.List; /** @@ -17,4 +19,10 @@ public interface DataVMapper extends InfluxDbBaseMapper { List queryDataValue(InfluxQueryWrapper dataVQueryWrapper); + + List getMeasurementCount(InfluxQueryWrapper influxQueryWrapper); + + + + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java index 0531b18..71193d6 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java @@ -37,4 +37,7 @@ IDataHarmRateV extends IMppService { void addList(List list); DataHarmDto getTopData(); + + List getHarmRateVData(LineCountEvaluateParam lineParam); + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataIntegrity.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataIntegrity.java index 80eeb0b..7f0b474 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataIntegrity.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataIntegrity.java @@ -18,6 +18,6 @@ public interface IDataIntegrity extends IMppService{ * 批量插入数据 * @param onlineRateList */ - void batchInsertion(List onlineRateList); + void batchInsertion(List onlineRateList); } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java index e8f5ef9..20bad6c 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java @@ -3,6 +3,7 @@ package com.njcn.dataProcess.service; import com.github.jeffreyning.mybatisplus.service.IMppService; import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVFiveItemDTO; +import com.njcn.dataProcess.dto.MeasurementCountDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataVCvtDto; @@ -59,4 +60,10 @@ public interface IDataV extends IMppService { void addInfluxDbList(List dataVList); void batchInsertionCvtDTO(List cvtDTOList); + + + List getMeasurementCount(List lineIndex, String startTime, String endTime); + + + List getDataV(LineCountEvaluateParam lineParam); } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java index 8233682..aab8e3d 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java @@ -26,10 +26,7 @@ import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; /** @@ -193,6 +190,11 @@ public class InfluxdbDataHarmRateVImpl extends MppServiceImpl getHarmRateVData(LineCountEvaluateParam lineParam) { + return Collections.emptyList(); + } + /** * 按监测点集合、时间条件获取分钟数据 * timeMap参数来判断是否进行数据处理 timeMap为空则不进行数据处理 diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataIntegrityImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataIntegrityImpl.java index b069a70..8e4a476 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataIntegrityImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataIntegrityImpl.java @@ -23,7 +23,7 @@ public class InfluxdbDataIntegrityImpl extends MppServiceImpl onlineRateList) { + public void batchInsertion(List onlineRateList) { } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java index 0e305ac..7e0061b 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java @@ -12,6 +12,7 @@ import com.njcn.dataProcess.dao.relation.mapper.RStatDataVRelationMapper; import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVFiveItemDTO; import com.njcn.dataProcess.dto.LineDataVFiveItemDTO; +import com.njcn.dataProcess.dto.MeasurementCountDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.po.influx.DataV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; @@ -319,6 +320,23 @@ public class InfluxdbDataVImpl extends MppServiceImpl getMeasurementCount(List lineIndex, String startTime, String endTime) { + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class,MeasurementCountDTO.class); + influxQueryWrapper.regular(DataV::getLineId, lineIndex) + .eq(DataV::getValueType, InfluxDbSqlConstant.MAX) + .eq(DataV::getPhasicType, InfluxDBTableConstant.PHASE_TYPE_A) + .count(DataV::getFreq) + .groupBy(DataV::getLineId) + .between(DataV::getTime, startTime, endTime); + return dataVMapper.getMeasurementCount(influxQueryWrapper); + } + + @Override + public List getDataV(LineCountEvaluateParam lineParam) { + return Collections.emptyList(); + } + /** * 按监测点集合、时间条件获取dataV分钟数据 * timeMap参数来判断是否进行数据出来 timeMap为空则不进行数据处理 diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RStatIntegrityDImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RStatIntegrityDImpl.java new file mode 100644 index 0000000..659f234 --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RStatIntegrityDImpl.java @@ -0,0 +1,9 @@ +package com.njcn.dataProcess.service.impl.relation; + +/** + * @Author: cdf + * @CreateTime: 2025-03-12 + * @Description: 数据完整性 + */ +public class RStatIntegrityDImpl { +} diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java index 6ed17a4..2c04cb1 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java @@ -1,5 +1,8 @@ package com.njcn.dataProcess.service.impl.relation; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.date.DatePattern; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.dataProcess.dao.relation.mapper.DataHarmrateVRelationMapper; import com.njcn.dataProcess.dao.relation.mapper.DataIRelationMapper; @@ -10,7 +13,9 @@ import com.njcn.dataProcess.po.relation.DataHarmrateV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataHarmDto; import com.njcn.dataProcess.pojo.dto.DataHarmRateVDto; +import com.njcn.dataProcess.pojo.dto.DataVDto; import com.njcn.dataProcess.pojo.po.RStatDataHarmRateVD; +import com.njcn.dataProcess.pojo.po.RStatDataVD; import com.njcn.dataProcess.service.IDataHarmRateV; import com.njcn.dataProcess.util.BeanFeildUtils; import com.njcn.dataProcess.util.TimeUtils; @@ -20,6 +25,7 @@ import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -94,4 +100,22 @@ public class RelationDataHarmRateVImpl extends MppServiceImpl getHarmRateVData(LineCountEvaluateParam lineParam) { + List result = new ArrayList<>(); + List rStatDataHarmRateVDList = dataHarmRateV.list(new LambdaQueryWrapper() + .in(RStatDataHarmRateVD::getLineId,lineParam.getLineId()) + .in(RStatDataHarmRateVD::getValueType, lineParam.getValueType()) + .in(RStatDataHarmRateVD::getPhasicType, lineParam.getPhasicType()) + .ge(RStatDataHarmRateVD::getTime, lineParam.getStartTime()) + .le(RStatDataHarmRateVD::getTime, lineParam.getEndTime()) + ); + for(RStatDataHarmRateVD rStatDataHarmRateVD : rStatDataHarmRateVDList){ + DataHarmDto dto = BeanUtil.copyProperties(rStatDataHarmRateVD,DataHarmDto.class); + dto.setTime(rStatDataHarmRateVD.getTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATE_PATTERN))); + result.add(dto); + } + return result; + } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataIntegrityImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataIntegrityImpl.java index ca7cfc6..c8c3c26 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataIntegrityImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataIntegrityImpl.java @@ -1,6 +1,7 @@ package com.njcn.dataProcess.service.impl.relation; +import cn.hutool.core.bean.BeanUtil; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.dataProcess.dao.relation.mapper.RStatIntegrityDMapper; import com.njcn.dataProcess.pojo.dto.DataIntegrityDto; @@ -23,7 +24,7 @@ public class RelationDataIntegrityImpl extends MppServiceImpl onlineRateList) { - + public void batchInsertion(List dataIntegrityDtoList) { + this.saveOrUpdateBatchByMultiId(dataIntegrityDtoList); } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataVImpl.java index 53fc7a1..7b36cde 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataVImpl.java @@ -1,11 +1,18 @@ package com.njcn.dataProcess.service.impl.relation; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; +import com.njcn.dataProcess.constant.InfluxDBTableConstant; import com.njcn.dataProcess.dao.relation.mapper.DataVRelationMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataVRelationMapper; import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVFiveItemDTO; +import com.njcn.dataProcess.dto.MeasurementCountDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.po.relation.DataV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; @@ -15,17 +22,19 @@ import com.njcn.dataProcess.pojo.po.RStatDataVD; import com.njcn.dataProcess.service.IDataV; import com.njcn.dataProcess.util.BeanFeildUtils; import com.njcn.dataProcess.util.TimeUtils; +import com.njcn.influx.constant.InfluxDbSqlConstant; +import com.njcn.influx.query.InfluxQueryWrapper; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.ListUtils; +import org.ehcache.core.util.CollectionUtil; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.time.LocalDate; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.time.format.DateTimeFormatter; +import java.util.*; import java.util.stream.Collectors; /** @@ -125,5 +134,27 @@ public class RelationDataVImpl extends MppServiceImpl getMeasurementCount(List lineIndex, String startTime, String endTime) { + return Collections.emptyList(); + } + + @Override + public List getDataV(LineCountEvaluateParam lineParam) { + List result = new ArrayList<>(); + List rStatDataVDList = iDataV.list(new LambdaQueryWrapper() + .in(RStatDataVD::getValueType, lineParam.getValueType()) + .in(RStatDataVD::getPhasicType, lineParam.getPhasicType()) + .ge(RStatDataVD::getTime, lineParam.getStartTime()) + .le(RStatDataVD::getTime, lineParam.getEndTime()) + ); + for(RStatDataVD rStatDataVD : rStatDataVDList){ + DataVDto dto = BeanUtil.copyProperties(rStatDataVD,DataVDto.class); + dto.setTime(rStatDataVD.getTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATE_PATTERN))); + result.add(dto); + } + return result; + } + }