From 243088415d33091367d810d3b1183d81842cf20b Mon Sep 17 00:00:00 2001 From: wr <1754607820@qq.com> Date: Wed, 20 Sep 2023 13:37:57 +0800 Subject: [PATCH] =?UTF-8?q?1.=E8=B0=90=E6=B3=A2=E6=B1=A1=E5=8C=BA=E5=9B=BE?= =?UTF-8?q?=E7=AE=97=E6=B3=95=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mysql/Impl/line/PollutionServiceImpl.java | 235 +++--------------- 1 file changed, 40 insertions(+), 195 deletions(-) diff --git a/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/Impl/line/PollutionServiceImpl.java b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/Impl/line/PollutionServiceImpl.java index 3b8f4e7c6..6ee485e2a 100644 --- a/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/Impl/line/PollutionServiceImpl.java +++ b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/Impl/line/PollutionServiceImpl.java @@ -14,13 +14,11 @@ import com.njcn.common.utils.LogUtil; import com.njcn.device.biz.commApi.CommTerminalGeneralClient; import com.njcn.device.biz.pojo.dto.DeptGetChildrenDTO; import com.njcn.device.biz.pojo.param.DeptGetLineParam; -import com.njcn.device.pq.api.DeptLineFeignClient; import com.njcn.device.pq.api.LineFeignClient; import com.njcn.device.biz.pojo.po.Overlimit; import com.njcn.harmonic.pojo.dto.PublicDTO; import com.njcn.harmonic.pojo.po.*; import com.njcn.harmonic.pojo.po.day.*; -import com.njcn.influxdb.utils.InfluxDbUtils; import com.njcn.prepare.harmonic.mapper.mysql.line.*; import com.njcn.prepare.harmonic.pojo.dto.PollutionDTO; import com.njcn.prepare.harmonic.pojo.param.LineParam; @@ -38,7 +36,6 @@ import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.math.RoundingMode; -import java.time.LocalDate; import java.time.LocalDateTime; import java.util.*; import java.util.function.BinaryOperator; @@ -58,12 +55,9 @@ import java.util.stream.Stream; @AllArgsConstructor public class PollutionServiceImpl implements PollutionService { - private final InfluxDbUtils influxDbUtils; private final LineFeignClient lineFeignClient; - private final DeptLineFeignClient deptLineFeignClient; - private final DicDataFeignClient dicDataFeignClient; private final RStatPollutionOrgDMapper rStatPollutionOrgDMapper; @@ -97,18 +91,13 @@ public class PollutionServiceImpl implements PollutionService { @Async("asyncExecutor") public void processPollutionData(LineParam lineParam) { - log.info("参数起始时间:"+lineParam.getBeginTime()); - log.info("参数结束时间:"+lineParam.getEndTime()); TimeInterval timer = new TimeInterval(); List pollutionList; LocalDateTime local = LocalDateTimeUtil.now(); if (StrUtil.isNotBlank(lineParam.getDataDate())){ - local = LocalDateTimeUtil.parse(lineParam.getBeginTime()); + local = DateUtil.parseLocalDateTime(lineParam.getBeginTime()); } - - - List dictData = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.POLLUTION_STATIS.getCode()).getData(); //时间类型为日的情况下执行influxdb计算 if (Integer.valueOf(BizParamConstant.STAT_BIZ_DAY).equals(lineParam.getType())){ @@ -123,14 +112,14 @@ public class PollutionServiceImpl implements PollutionService { } List lineList = overLimitList.stream().map(Overlimit::getId).collect(Collectors.toList()); //指标数据获取 - List harmonicVoltageList = getHarmonicVoltage(overLimitList,lineParam.getDataDate()); - List harmonicCurrentList = getHarmonicCurrent(overLimitList,lineParam.getDataDate()); - List frequencyDeviationList = getFrequencyDeviation(overLimitList,lineParam.getDataDate()); - List voltageDeviationList = getVoltageDeviation(overLimitList,lineParam.getDataDate()); - List threePhaseVoltageList = getThreePhaseVoltageUnbalance(overLimitList,lineParam.getDataDate()); - List negativeSequenceList = getNegativeSequenceCurrent(overLimitList,lineParam.getDataDate()); - List interHarmonicVoltageList = getInterharmonicVoltage(overLimitList,lineParam.getDataDate()); - List voltageFlickerList = getVoltageFlicker(overLimitList,lineParam.getDataDate()); + List harmonicVoltageList = getHarmonicVoltage(overLimitList,lineParam.getBeginTime(),lineParam.getEndTime()); + List harmonicCurrentList = getHarmonicCurrent(overLimitList,lineParam.getBeginTime(),lineParam.getEndTime()); + List frequencyDeviationList = getFrequencyDeviation(overLimitList,lineParam.getBeginTime(),lineParam.getEndTime()); + List voltageDeviationList = getVoltageDeviation(overLimitList,lineParam.getBeginTime(),lineParam.getEndTime()); + List threePhaseVoltageList = getThreePhaseVoltageUnbalance(overLimitList,lineParam.getBeginTime(),lineParam.getEndTime()); + List negativeSequenceList = getNegativeSequenceCurrent(overLimitList,lineParam.getBeginTime(),lineParam.getEndTime()); + List interHarmonicVoltageList = getInterharmonicVoltage(overLimitList,lineParam.getBeginTime(),lineParam.getEndTime()); + List voltageFlickerList = getVoltageFlicker(overLimitList,lineParam.getBeginTime(),lineParam.getEndTime()); LogUtil.njcnDebug(log, "监测点污染指标数据查询耗时:{}", timer.intervalRestart()); List lineIdList = new ArrayList<>(); @@ -147,11 +136,10 @@ public class PollutionServiceImpl implements PollutionService { } - LocalDateTime localEnd = LocalDateTimeUtil.parse(lineParam.getEndTime()); + LocalDateTime localEnd = DateUtil.parseLocalDateTime(lineParam.getEndTime()); LambdaQueryWrapper lambdaQuery = new LambdaQueryWrapper<>(); - log.info("污区数据统计间隔"+local+"----->"+localEnd); List pollutionDayList =new ArrayList<>(); if(Integer.valueOf(BizParamConstant.STAT_BIZ_DAY).equals(lineParam.getType())){ lambdaQuery.ge(RMpPollutionDPO::getDataDate, local).le(RMpPollutionDPO::getDataDate,localEnd); @@ -399,10 +387,10 @@ public class PollutionServiceImpl implements PollutionService { /** * 谐波电压:取监测点最新的A、B、C三相数据,再取电压总谐波畸变率、各次谐波电压含有率(2~25次)中的最大值,作为结果 */ - private List getHarmonicVoltage(List overLimitList,String dataDate){ + private List getHarmonicVoltage(List overLimitList,String beginTime,String endTime){ List list = new ArrayList<>(); - Map> threePhase = getThreePhaseData(overLimitList,dataDate); - Map> distortionRate = getDistortionRateData(overLimitList,dataDate); + Map> threePhase = getThreePhaseData(overLimitList,beginTime,endTime); + Map> distortionRate = getDistortionRateData(overLimitList,beginTime,endTime); for (String key : threePhase.keySet()) { list.add(threePhase.get(key).get()); } @@ -414,13 +402,12 @@ public class PollutionServiceImpl implements PollutionService { return process(outMap); } - private Map> getThreePhaseData(List overLimitList,String dataDate){ - Map stringStringMap = processMysqlDate(dataDate, Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)); + private Map> getThreePhaseData(List overLimitList,String beginTime,String endTime){ List threePhaseList = dataVDService.list(new LambdaQueryWrapper() .eq(RStatDataVDPO::getValueType, "CP95") .in(RStatDataVDPO::getPhaseType, Arrays.asList("A", "B", "C")) - .ge(RStatDataVDPO::getTime, stringStringMap.get("startTime")) - .le(RStatDataVDPO::getTime, stringStringMap.get("endTime")) + .ge(RStatDataVDPO::getTime, beginTime) + .le(RStatDataVDPO::getTime, endTime) ); List data; PublicDTO publicDTO; @@ -447,13 +434,12 @@ public class PollutionServiceImpl implements PollutionService { return lineData.stream().collect(Collectors.groupingBy(PublicDTO::getId,Collectors.reducing(BinaryOperator.maxBy(comparator)))); } - private Map> getDistortionRateData(List overLimitList,String dataDate){ - Map stringStringMap = processMysqlDate(dataDate, Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)); + private Map> getDistortionRateData(List overLimitList,String beginTime,String endTime){ List distortionRateList = harmRateVDService.list(new LambdaQueryWrapper() .eq(RStatDataHarmrateVDPO::getValueType, "CP95") .in(RStatDataHarmrateVDPO::getPhaseType, Arrays.asList("A", "B", "C")) - .ge(RStatDataHarmrateVDPO::getTime, stringStringMap.get("startTime")) - .le(RStatDataHarmrateVDPO::getTime, stringStringMap.get("endTime")) + .ge(RStatDataHarmrateVDPO::getTime, beginTime) + .le(RStatDataHarmrateVDPO::getTime, endTime) ); List data; PublicDTO publicDTO; @@ -506,13 +492,12 @@ public class PollutionServiceImpl implements PollutionService { /** * 谐波电流:各次谐波电流(2~25次),取各监测点最新的A、B、C三相数据。 */ - private List getHarmonicCurrent(List overLimitList,String dataDate){ - Map stringStringMap = processMysqlDate(dataDate, Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)); + private List getHarmonicCurrent(List overLimitList,String beginTime,String endTime){ List list = dataIDService.list(new LambdaQueryWrapper() .eq(RStatDataIDPO::getValueType, "CP95") .in(RStatDataIDPO::getPhaseType, Arrays.asList("A", "B", "C")) - .ge(RStatDataIDPO::getTime, stringStringMap.get("startTime")) - .le(RStatDataIDPO::getTime, stringStringMap.get("endTime")) + .ge(RStatDataIDPO::getTime, beginTime) + .le(RStatDataIDPO::getTime, endTime) ); List data; PublicDTO publicDTO; @@ -561,14 +546,13 @@ public class PollutionServiceImpl implements PollutionService { /** * 频率偏差:各监测点最新的T相数据,取频率上下偏差的绝对值中的最大值。 */ - private List getFrequencyDeviation(List overLimitList,String dataDate){ - Map stringStringMap = processMysqlDate(dataDate, Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)); + private List getFrequencyDeviation(List overLimitList,String beginTime,String endTime){ List threePhaseList = dataVDService.list(new QueryWrapper() .select("line_id","abs(freq_dev) as freq_dev ") .in("value_type", Arrays.asList("MIN","MAX")) .in("phasic_type", Arrays.asList("T")) - .ge("time", stringStringMap.get("startTime")) - .le("time", stringStringMap.get("endTime")) + .ge("time", beginTime) + .le("time",endTime) ); List data; PublicDTO publicDTO; @@ -599,14 +583,13 @@ public class PollutionServiceImpl implements PollutionService { /** * 电压偏差:各监测点最新的A、B、C三相数据,取电压上下偏差的绝对值中的最大值。 */ - private List getVoltageDeviation(List overLimitList,String dataDate){ - Map stringStringMap = processMysqlDate(dataDate, Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)); + private List getVoltageDeviation(List overLimitList,String beginTime,String endTime){ List list = dataVDService.list(new QueryWrapper() .select("line_id","vu_dev","vl_dev","value_type") .in("value_type", Arrays.asList("MIN","MAX")) .in("phasic_type", Arrays.asList("A","B","C")) - .ge("time", stringStringMap.get("startTime")) - .le("time", stringStringMap.get("endTime")) + .ge("time", beginTime) + .le("time", endTime) ); List data; PublicDTO publicDTO; @@ -638,14 +621,13 @@ public class PollutionServiceImpl implements PollutionService { /** * 三相电压不平衡度:各监测点最新的T相数据。 */ - private List getThreePhaseVoltageUnbalance(List overLimitList,String dataDate){ - Map stringStringMap = processMysqlDate(dataDate, Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)); + private List getThreePhaseVoltageUnbalance(List overLimitList,String beginTime,String endTime){ List list = dataVDService.list(new QueryWrapper() .select("line_id","v_unbalance","value_type") .in("value_type", Arrays.asList("CP95","MAX")) .in("phasic_type", Arrays.asList("T")) - .ge("time", stringStringMap.get("startTime")) - .le("time", stringStringMap.get("endTime")) + .ge("time", beginTime) + .le("time", endTime) ); List data; PublicDTO publicDTO; @@ -676,14 +658,13 @@ public class PollutionServiceImpl implements PollutionService { /** * 负序电流:各监测点最新的T相数据。 */ - private List getNegativeSequenceCurrent(List overLimitList,String dataDate){ - Map stringStringMap = processMysqlDate(dataDate, Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)); + private List getNegativeSequenceCurrent(List overLimitList,String beginTime,String endTime){ List list = dataIDService.list(new QueryWrapper() .select("line_id","i_neg","value_type") .in("value_type", Arrays.asList("CP95","MAX")) .in("phasic_type", Arrays.asList("T")) - .ge("time", stringStringMap.get("startTime")) - .le("time", stringStringMap.get("endTime")) + .ge("time", beginTime) + .le("time", endTime) ); List data; PublicDTO publicDTO; @@ -717,13 +698,12 @@ public class PollutionServiceImpl implements PollutionService { /** * 间谐波电压含有率:各监测点最新的A、B、C三相数据。 */ - private List getInterharmonicVoltage(List overLimitList,String dataDate){ - Map stringStringMap = processMysqlDate(dataDate, Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)); + private List getInterharmonicVoltage(List overLimitList,String beginTime,String endTime){ List list = inharmVDService.list(new QueryWrapper() .in("value_type", Arrays.asList("CP95")) .in("phasic_type", Arrays.asList("A","B","C")) - .ge("time", stringStringMap.get("startTime")) - .le("time", stringStringMap.get("endTime")) + .ge("time", beginTime) + .le("time", endTime) ); List data; PublicDTO publicDTO; @@ -769,13 +749,12 @@ public class PollutionServiceImpl implements PollutionService { /** * 长时电压闪变:各监测点最新的A、B、C三相数据。 */ - private List getVoltageFlicker(List overLimitList,String dataDate){ - Map stringStringMap = processMysqlDate(dataDate, Integer.valueOf(BizParamConstant.STAT_BIZ_DAY)); + private List getVoltageFlicker(List overLimitList,String beginTime,String endTime){ List list = pltDService.list(new QueryWrapper() .in("value_type", Arrays.asList("CP95")) .in("phasic_type", Arrays.asList("A","B","C")) - .ge("time", stringStringMap.get("startTime")) - .le("time", stringStringMap.get("endTime")) + .ge("time", beginTime) + .le("time", endTime) ); List data; PublicDTO publicDTO; @@ -865,70 +844,6 @@ public class PollutionServiceImpl implements PollutionService { return outList; } - /** - * MYSQL 时间返回 - * @param date - * @param type - * @return - */ - private Map processMysqlDate(String date,Integer type){ - Map mapDate=new HashMap<>(); - if (StrUtil.isNotBlank(date)){ - if (Integer.valueOf(BizParamConstant.STAT_BIZ_DAY).equals(type)){ - mapDate.put("startTime",date+" 00:00:00"); - mapDate.put("endTime",date+" 23:59:59"); - }else if (Integer.valueOf(BizParamConstant.STAT_BIZ_MONTH).equals(type)){ - Date dateOut = DateUtil.parse(date); - Date dateOutb = DateUtil.beginOfMonth(dateOut); - Date dateOute = DateUtil.endOfMonth(dateOut); - mapDate.put("startTime",dateOutb+" 00:00:00"); - mapDate.put("endTime",dateOute+" 23:59:59"); - }else if (Integer.valueOf(BizParamConstant.STAT_BIZ_QUARTER).equals(type)){ - Date dateOut = DateUtil.parse(date); - Date dateOutb = DateUtil.beginOfQuarter(dateOut); - Date dateOute = DateUtil.endOfQuarter(dateOut); - mapDate.put("startTime",dateOutb+" 00:00:00"); - mapDate.put("endTime",dateOute+" 23:59:59"); - }else if (Integer.valueOf(BizParamConstant.STAT_BIZ_YEAR).equals(type)){ - Date dateOut = DateUtil.parse(date); - Date dateOutb = DateUtil.beginOfYear(dateOut); - Date dateOute = DateUtil.endOfYear(dateOut); - mapDate.put("startTime",dateOutb+" 00:00:00"); - mapDate.put("endTime",dateOute+" 23:59:59"); - } - } - return mapDate; - } - - - /** - * influxDb时间条件处理 - */ - private String processDate(String date,Integer type){ - String processSql = ""; - if (StrUtil.isNotBlank(date)){ - if (Integer.valueOf(BizParamConstant.STAT_BIZ_DAY).equals(type)){ - processSql = " and time >= '"+date+" 00:00:00' and time <= '"+date+" 23:59:59' "; - }else if (Integer.valueOf(BizParamConstant.STAT_BIZ_MONTH).equals(type)){ - Date dateOut = DateUtil.parse(date); - Date dateOutb = DateUtil.beginOfMonth(dateOut); - Date dateOute = DateUtil.endOfMonth(dateOut); - processSql =" and time >= '"+DateUtil.formatDate(dateOutb)+" 00:00:00' and time <= '"+DateUtil.formatDate(dateOute)+" 23:59:59' "; - }else if (Integer.valueOf(BizParamConstant.STAT_BIZ_QUARTER).equals(type)){ - Date dateOut = DateUtil.parse(date); - Date dateOutb = DateUtil.beginOfQuarter(dateOut); - Date dateOute = DateUtil.endOfQuarter(dateOut); - processSql =" and time >= '"+DateUtil.formatDate(dateOutb)+" 00:00:00' and time <= '"+DateUtil.formatDate(dateOute)+" 23:59:59' "; - }else if (Integer.valueOf(BizParamConstant.STAT_BIZ_YEAR).equals(type)){ - Date dateOut = DateUtil.parse(date); - Date dateOutb = DateUtil.beginOfYear(dateOut); - Date dateOute = DateUtil.endOfYear(dateOut); - processSql =" and time >= '"+DateUtil.formatDate(dateOutb)+" 00:00:00' and time <= '"+DateUtil.formatDate(dateOute)+" 23:59:59' "; - } - } - return processSql; - } - /** * 获取限值表中的所有监测点信息 */ @@ -936,74 +851,4 @@ public class PollutionServiceImpl implements PollutionService { return lineFeignClient.getAllLineOverLimit("harmonic-boot","").getData(); } - /*private List getDataPolluction(LineParam lineParam){ - List lineList; - if (CollUtil.isEmpty(lineParam.getLineIds())){ - List overLimitList = getAllLinesLimitData(); - lineList = overLimitList.stream().map(Overlimit::getId).collect(Collectors.toList()); - }else { - lineList = new ArrayList<>(lineParam.getLineIds()); - } - List pollutionDTOList = new ArrayList<>(); - InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); - for (String lineId : lineList){ - String sql="SELECT * FROM harmonic_pollution where line_id = '" + lineId +"' "+processDate(lineParam.getDataDate(),lineParam.getType()); - QueryResult dataPolluctionResult = influxDbUtils.query(sql); - List threePhaseList = resultMapper.toPOJO(dataPolluctionResult, DataPolluctionPO.class); - for (DataPolluctionPO dataPolluction : threePhaseList){ - PollutionDTO pollutionDTO = new PollutionDTO(); - pollutionDTO.setLineId(dataPolluction.getLineId()); - pollutionDTO.setFreqDev(dataPolluction.getFreqDev()); - pollutionDTO.setVDev(dataPolluction.getVDev()); - pollutionDTO.setVUnbalance(dataPolluction.getVUnbalance()); - pollutionDTO.setVAll(dataPolluction.getVAll()); - pollutionDTO.setPlt(dataPolluction.getPlt()); - pollutionDTO.setIAll(dataPolluction.getIAll()); - pollutionDTO.setINeg(dataPolluction.getINeg()); - pollutionDTO.setVInharm(dataPolluction.getVInharm()); - pollutionDTOList.add(pollutionDTO); - } - } - return pollutionDTOList; - }*/ - - /*MySql入表 r_mp_pollution_d*/ - /*private void insertLinePollution(List pollutionList, LocalDateTime local){ - for (PollutionDTO pollution : pollutionList){ - RMpPollutionD rMpPollution = new RMpPollutionD(); - rMpPollution.setLineId(pollution.getLineId()); - rMpPollution.setFreqDev(pollution.getFreqDev().floatValue()); - rMpPollution.setvDev(pollution.getVDev().floatValue()); - rMpPollution.setvUnbalance(pollution.getVUnbalance().floatValue()); - rMpPollution.setvAll(pollution.getVAll().floatValue()); - rMpPollution.setPlt(pollution.getPlt().floatValue()); - rMpPollution.setiAll(pollution.getIAll().floatValue()); - rMpPollution.setiNeg(pollution.getINeg().floatValue()); - rMpPollution.setvInharm(pollution.getVInharm().floatValue()); - rMpPollution.setDataDate(local); - rMpPollutionDMapper.insertPollution(rMpPollution); - } - }*/ - - /*private void insertPolluction(List list, long time){ - List records = new ArrayList(); - list.forEach(item->{ - Map tags = new HashMap<>(); - Map fields = new HashMap<>(); - tags.put("line_id",item.getLineId()); - fields.put("freq_dev",item.getFreqDev()); - fields.put("v_dev",item.getVDev()); - fields.put("v_unbalance",item.getVUnbalance()); - fields.put("i_neg",item.getINeg()); - fields.put("v_all",item.getVAll()); - fields.put("i_all",item.getIAll()); - fields.put("v_inharm",item.getVInharm()); - fields.put("plt",item.getPlt()); - Point point = influxDbUtils.pointBuilder("harmonic_pollution", time, TimeUnit.MILLISECONDS,tags, fields); - BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).tag("line_id", item.getLineId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); - batchPoints.point(point); - records.add(batchPoints.lineProtocol()); - }); - influxDbUtils.batchInsert(influxDbUtils.getDbName(),"", InfluxDB.ConsistencyLevel.ALL, records); - }*/ }