diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java index 2b02f65..2cd0526 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/PollutionServiceImpl.java @@ -1,12 +1,14 @@ package com.njcn.algorithm.serviceimpl.line; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.collection.ListUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.service.line.IPollutionService; +import com.njcn.algorithm.utils.MemorySizeUtil; import com.njcn.dataProcess.api.*; import com.njcn.dataProcess.enums.DataCleanEnum; import com.njcn.dataProcess.param.LineCountEvaluateParam; @@ -25,6 +27,8 @@ import com.njcn.system.pojo.po.DictData; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections4.ListUtils; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; @@ -45,6 +49,9 @@ import java.util.stream.Stream; @RequiredArgsConstructor public class PollutionServiceImpl implements IPollutionService { + @Value("${line.num}") + private Integer NUM = 100; + @Resource private PqDataVerifyFeignClient pqDataVerifyFeignClient; @Resource @@ -66,7 +73,7 @@ public class PollutionServiceImpl implements IPollutionService { @Override public void handleDay(CalculatedParam calculatedParam) { System.out.println("当前执行污区监测点算法++++++++++++++++++++++++++++++++++"); - List pollutionList; + List dictDataList = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.POLLUTION_STATIS.getCode()).getData(); Map dictData = dictDataList.stream().collect(Collectors.toMap(DictData::getCode, Function.identity())); LocalDate local = LocalDateTimeUtil.parseDate(calculatedParam.getDataDate()); @@ -77,28 +84,33 @@ public class PollutionServiceImpl implements IPollutionService { Map limitMap = overlimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity())); LineCountEvaluateParam lineCountEvaluateParam = new LineCountEvaluateParam(); - lineCountEvaluateParam.setLineId(idList); lineCountEvaluateParam.setStartTime(beginDay); lineCountEvaluateParam.setEndTime(endDay); getAbnormalData(lineCountEvaluateParam); //指标数据获取 - Map harmonicVoltageList = getHarmonicVoltage(lineCountEvaluateParam, limitMap); - Map harmonicCurrentList = getHarmonicCurrent(lineCountEvaluateParam, limitMap); - Map frequencyDeviationList = getFrequencyDeviation(lineCountEvaluateParam, limitMap); - Map voltageDeviationList = getVoltageDeviation(lineCountEvaluateParam, limitMap); - Map threePhaseVoltageList = getThreePhaseVoltageUnbalance(lineCountEvaluateParam, limitMap); - Map negativeSequenceList = getNegativeSequenceCurrent(lineCountEvaluateParam, limitMap); - Map interHarmonicVoltageList = getInterharmonicVoltage(lineCountEvaluateParam, limitMap); - Map voltageFlickerList = getVoltageFlicker(lineCountEvaluateParam, limitMap); + List> pendingIds = ListUtils.partition(idList,NUM); + pendingIds.forEach(list->{ + lineCountEvaluateParam.setLineId(list); + Map harmonicVoltageList = getHarmonicVoltage(lineCountEvaluateParam, limitMap); + Map harmonicCurrentList = getHarmonicCurrent(lineCountEvaluateParam, limitMap); + Map frequencyDeviationList = getFrequencyDeviation(lineCountEvaluateParam, limitMap); + Map voltageDeviationList = getVoltageDeviation(lineCountEvaluateParam, limitMap); + Map threePhaseVoltageList = getThreePhaseVoltageUnbalance(lineCountEvaluateParam, limitMap); + Map negativeSequenceList = getNegativeSequenceCurrent(lineCountEvaluateParam, limitMap); + Map interHarmonicVoltageList = getInterharmonicVoltage(lineCountEvaluateParam, limitMap); + Map voltageFlickerList = getVoltageFlicker(lineCountEvaluateParam, limitMap); - pollutionList = processPollutionList(local, idList, dictData, harmonicVoltageList, harmonicCurrentList, frequencyDeviationList, voltageDeviationList, - threePhaseVoltageList, negativeSequenceList, interHarmonicVoltageList, voltageFlickerList); + List pollutionList = processPollutionList(local, idList, dictData, harmonicVoltageList, harmonicCurrentList, frequencyDeviationList, voltageDeviationList, + threePhaseVoltageList, negativeSequenceList, interHarmonicVoltageList, voltageFlickerList); + + //TODO 插入数据库 + if (CollUtil.isNotEmpty(pollutionList)) { + dataPollutionFeignClient.batchInsertion(pollutionList); + } + + }); - //TODO 插入数据库 - if (CollUtil.isNotEmpty(pollutionList)) { - dataPollutionFeignClient.batchInsertion(pollutionList); - } } @Override @@ -408,6 +420,7 @@ public class PollutionServiceImpl implements IPollutionService { */ private Map getHarmonicVoltage(LineCountEvaluateParam lineCountEvaluateParam, Map limitMap) { List list = new ArrayList<>(); + lineCountEvaluateParam.setColumnName(""); lineCountEvaluateParam.setValueType(Stream.of("CP95").collect(Collectors.toList())); lineCountEvaluateParam.setPhasicType(Stream.of("A", "B", "C").collect(Collectors.toList())); Map threePhase = getThreePhaseData(lineCountEvaluateParam, limitMap); @@ -508,6 +521,7 @@ public class PollutionServiceImpl implements IPollutionService { */ private Map getHarmonicCurrent(LineCountEvaluateParam lineCountEvaluateParam, Map limitMap) { Map map = new HashMap(); + lineCountEvaluateParam.setColumnName(""); lineCountEvaluateParam.setValueType(Stream.of("CP95").collect(Collectors.toList())); lineCountEvaluateParam.setPhasicType(Stream.of("A", "B", "C").collect(Collectors.toList())); List list = dataIFeignClient.getDataI(lineCountEvaluateParam).getData(); diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java index da01430..bb6a816 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java @@ -107,7 +107,7 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory> getDataV(@RequestBody LineCountEvaluateParam lineParam) { - log.error("{}异常,降级处理,异常为:{}","cvt数据插入DataV",cause.toString()); + log.error("{}异常,降级处理,异常为:{}","查询数据DataV",cause.toString()); throw new BusinessException(finalExceptionEnum); } }; diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java index ebd17fa..096bf39 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java @@ -157,9 +157,9 @@ public class DataVController extends BaseController { @OperateInfo(info = LogEnum.BUSINESS_COMMON) @PostMapping("/getDataV") @ApiOperation("获取原始数据") - public HttpResult> getDataV(LineCountEvaluateParam lineParam) { - String methodDescribe = getMethodDescribe("getMeasurementCount"); - List dataV = dataVQuery.getDataV(lineParam); + public HttpResult> getDataV(@RequestBody LineCountEvaluateParam lineParam) { + String methodDescribe = getMethodDescribe("getDataV"); + List dataV = dataVInsert.getDataV(lineParam); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, dataV, methodDescribe); }