From 36c5de878afc6217254dac53a87ca5d166abaa10 Mon Sep 17 00:00:00 2001 From: huangzj <826100833@qq.com> Date: Thu, 14 Sep 2023 13:57:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8C=87=E6=A0=87=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../param/FrequencyStatisticalQueryParam.java | 41 ++++++ .../controller/StableDataController.java | 12 ++ .../handler/MqttMessageHandler.java | 134 ++++++++++-------- .../csharmonic/service/StableDataService.java | 3 + .../service/impl/StableDataServiceImpl.java | 89 +++++++++++- 5 files changed, 215 insertions(+), 64 deletions(-) create mode 100644 cs-harmonic/cs-harmonic-api/src/main/java/com/njcn/csharmonic/param/FrequencyStatisticalQueryParam.java diff --git a/cs-harmonic/cs-harmonic-api/src/main/java/com/njcn/csharmonic/param/FrequencyStatisticalQueryParam.java b/cs-harmonic/cs-harmonic-api/src/main/java/com/njcn/csharmonic/param/FrequencyStatisticalQueryParam.java new file mode 100644 index 0000000..99ccf27 --- /dev/null +++ b/cs-harmonic/cs-harmonic-api/src/main/java/com/njcn/csharmonic/param/FrequencyStatisticalQueryParam.java @@ -0,0 +1,41 @@ +package com.njcn.csharmonic.param; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import java.util.List; + +/** + * Description: + * Date: 2023/6/14 15:09【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class FrequencyStatisticalQueryParam { + @ApiModelProperty(value = "监测点集") + private List lineList; + @ApiModelProperty(value = "设备id") + private String DevId; + @ApiModelProperty(value = "指标id") + @NotBlank(message="指标id不能为空") + private String statisticalId; + @ApiModelProperty(value = "取值类型(Max,Min,cp95,avg)") + private String valueType; + @ApiModelProperty(value = "频次开始") + private Integer frequencyStart; + @ApiModelProperty(value = "频次结束") + private Integer frequencyEnd; + + @ApiModelProperty(value = "开始时间") + private String startTime; + + @ApiModelProperty(value = "结束时间") + private String endTime; + + + + +} diff --git a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/controller/StableDataController.java b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/controller/StableDataController.java index c6beb70..386b663 100644 --- a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/controller/StableDataController.java +++ b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/controller/StableDataController.java @@ -6,6 +6,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; import com.njcn.csharmonic.param.CommonStatisticalQueryParam; +import com.njcn.csharmonic.param.FrequencyStatisticalQueryParam; import com.njcn.csharmonic.param.ThdDataQueryParm; import com.njcn.csharmonic.pojo.vo.ThdDataVO; import com.njcn.csharmonic.service.StableDataService; @@ -112,4 +113,15 @@ public class StableDataController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/QuerySqlData") + @ApiOperation("查询(2-50)的指标") + @ApiImplicitParam(name = "frequencyStatisticalQueryParam", value = "查询参数", required = true) + public HttpResult> QuerySqlData(@RequestBody FrequencyStatisticalQueryParam frequencyStatisticalQueryParam) { + String methodDescribe = getMethodDescribe("queryFisrtCommonStatistical"); + List thdDataVOList = stableDataService.QuerySqlData(frequencyStatisticalQueryParam); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, thdDataVOList, methodDescribe); + } + + } \ No newline at end of file diff --git a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java index 218307e..ac34800 100644 --- a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java +++ b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java @@ -7,6 +7,7 @@ import com.github.tocrhz.mqtt.annotation.NamedValue; import com.github.tocrhz.mqtt.annotation.Payload; import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.csharmonic.param.CommonStatisticalQueryParam; +import com.njcn.csharmonic.param.FrequencyStatisticalQueryParam; import com.njcn.csharmonic.pojo.vo.CsRtDataVO; import com.njcn.csharmonic.pojo.vo.ThdDataVO; import com.njcn.csharmonic.service.ILineTargetService; @@ -71,57 +72,65 @@ public class MqttMessageHandler { Gson gson = new Gson(); publisher.send("/zl/TemperData/"+devId,gson.toJson(statisticalDataDTOS),1,false); } - - /** - * 实时数据应答 + /* + * 谐波数据 */ + +// @MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1) +// public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) { +// +// } + + + /** + * 实时数据应答 + */ @MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1) public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) { String topoDataJson =redisUtil.getStringByKey (devId); if(StringUtils.isEmpty(topoDataJson)){ List result = new ArrayList<>(); List tempList = new ArrayList<>(); - ExecutorService executorService = new ThreadPoolExecutor(10, 100, - 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100, true), - Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); - List>> resultList = new ArrayList< Future>>(); +// ExecutorService executorService = new ThreadPoolExecutor(10, Integer.MAX_VALUE, +// 1, TimeUnit.MINUTES, new LinkedBlockingQueue(), +// Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); +// List>> resultList = new ArrayList< Future>>(); //1.查询拓扑图配置的指标:拓扑图扑图配置:7677f94c749dedaff30f911949cbd724 List data = csStatisticalSetFeignClient.queryStatisticalSelect("7677f94c749dedaff30f911949cbd724").getData(); data.forEach(temp->{ if(Objects.nonNull(temp.getHarmStart())&&Objects.nonNull(temp.getHarmEnd())){ - for (int i = temp.getHarmStart(); i < temp.getHarmEnd()+1; i++) { - CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam(); - commonStatisticalQueryParam.setDevId(devId); - commonStatisticalQueryParam.setStatisticalId(temp.getId()); - commonStatisticalQueryParam.setValueType("avg"); - commonStatisticalQueryParam.setFrequency(i+""); - Future> listFuture= executorService.submit(new TaskWithResult(commonStatisticalQueryParam)); - resultList.add(listFuture); - } + FrequencyStatisticalQueryParam frequencyStatisticalQueryParam = new FrequencyStatisticalQueryParam(); + frequencyStatisticalQueryParam.setDevId(devId); + frequencyStatisticalQueryParam.setStatisticalId(temp.getId()); + frequencyStatisticalQueryParam.setValueType("avg"); + frequencyStatisticalQueryParam.setFrequencyStart(temp.getHarmStart()); + frequencyStatisticalQueryParam.setFrequencyEnd(temp.getHarmEnd()); + List thdDataVOList = stableDataService.QuerySqlData(frequencyStatisticalQueryParam); + tempList.addAll(thdDataVOList); }else { CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam(); commonStatisticalQueryParam.setDevId(devId); commonStatisticalQueryParam.setStatisticalId(temp.getId()); commonStatisticalQueryParam.setValueType("avg"); - Future> listFuture= executorService.submit(new TaskWithResult(commonStatisticalQueryParam)); - resultList.add(listFuture); + List listFuture= stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam); + tempList.addAll(listFuture); } }); - executorService.shutdown(); - - resultList.forEach(temp->{ - try { - tempList.addAll(temp.get()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - }); +// executorService.shutdown(); +// +// resultList.forEach(temp->{ +// try { +// tempList.addAll(temp.get()); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } catch (ExecutionException e) { +// throw new RuntimeException(e); +// } +// }); //过滤M相 List m = tempList.stream().filter(temp -> Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList()); m.stream().forEach(temp->{ @@ -147,19 +156,19 @@ public class MqttMessageHandler { } }); - List thdV = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Pq_ThdI(%)")).collect(Collectors.toList()); - Map> collect1 = thdV.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); - collect1.forEach((k,v)->{ - if(!CollectionUtil.isEmpty(v)){ - double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); - ThdDataVO thdDataVO = new ThdDataVO(); - BeanUtils.copyProperties(v.get(0),thdDataVO); - thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); - thdDataVO.setPhase("avg"); - result.add(thdDataVO); - - } - }); +// List thdV = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Pq_ThdI(%)")).collect(Collectors.toList()); +// Map> collect1 = thdV.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); +// collect1.forEach((k,v)->{ +// if(!CollectionUtil.isEmpty(v)){ +// double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); +// ThdDataVO thdDataVO = new ThdDataVO(); +// BeanUtils.copyProperties(v.get(0),thdDataVO); +// thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); +// thdDataVO.setPhase("avg"); +// result.add(thdDataVO); +// +// } +// }); List apfThdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Load(%)")).collect(Collectors.toList()); Map> collect3 = apfThdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); collect3.forEach((k,v)->{ @@ -225,25 +234,24 @@ public class MqttMessageHandler { return ""; } - class TaskWithResult implements Callable> { - private CommonStatisticalQueryParam commonStatisticalQueryParam; - - public TaskWithResult(CommonStatisticalQueryParam commonStatisticalQueryParam) { - this.commonStatisticalQueryParam = commonStatisticalQueryParam; - } - - /** - * 任务的具体过程,一旦任务传给ExecutorService的submit方法,则该方法自动在一个线程上执行。 - * - * @return - * @throws Exception - */ - @Override - public List call() throws Exception { - List thdDataVOList = new ArrayList<>(); - thdDataVOList = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam); - - return thdDataVOList; - } - } +// class TaskWithResult implements Callable> { +// private CommonStatisticalQueryParam commonStatisticalQueryParam; +// +// public TaskWithResult(CommonStatisticalQueryParam commonStatisticalQueryParam) { +// this.commonStatisticalQueryParam = commonStatisticalQueryParam; +// } +// +// /** +// * 任务的具体过程,一旦任务传给ExecutorService的submit方法,则该方法自动在一个线程上执行。 +// * +// * @return +// * @throws Exception +// */ +// @Override +// public List call() throws Exception { +// List thdDataVOList = new ArrayList<>(); +// thdDataVOList = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam); +// return thdDataVOList; +// } +// } } diff --git a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/service/StableDataService.java b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/service/StableDataService.java index 2449208..d9aaf72 100644 --- a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/service/StableDataService.java +++ b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/service/StableDataService.java @@ -2,6 +2,7 @@ package com.njcn.csharmonic.service; import com.njcn.csharmonic.param.CommonStatisticalQueryParam; +import com.njcn.csharmonic.param.FrequencyStatisticalQueryParam; import com.njcn.csharmonic.param.ThdDataQueryParm; import com.njcn.csharmonic.pojo.vo.ThdDataVO; @@ -34,4 +35,6 @@ public interface StableDataService { List queryCommonStatisticalByTime(CommonStatisticalQueryParam commonStatisticalQueryParam); List queryLineCommonStatistical(CommonStatisticalQueryParam commonStatisticalQueryParam); + + public List QuerySqlData(FrequencyStatisticalQueryParam frequencyStatisticalQueryParam); } diff --git a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/service/impl/StableDataServiceImpl.java b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/service/impl/StableDataServiceImpl.java index d6e16b1..cb18228 100644 --- a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/service/impl/StableDataServiceImpl.java +++ b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/service/impl/StableDataServiceImpl.java @@ -8,6 +8,7 @@ import com.njcn.csdevice.enums.AlgorithmResponseEnum; import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.csdevice.utils.ReflectUtils; import com.njcn.csharmonic.param.CommonStatisticalQueryParam; +import com.njcn.csharmonic.param.FrequencyStatisticalQueryParam; import com.njcn.csharmonic.param.ThdDataQueryParm; import com.njcn.csharmonic.pojo.vo.ThdDataVO; import com.njcn.csharmonic.service.StableDataService; @@ -17,9 +18,14 @@ import com.njcn.influx.pojo.po.PowerQualityData; import com.njcn.influx.service.CommonService; import com.njcn.influx.service.HaronicRatioService; import com.njcn.influx.service.PowerQualityService; +import com.njcn.influxdb.param.InfluxDBPublicParam; +import com.njcn.influxdb.utils.InfluxDbUtils; +import com.njcn.system.api.DicDataFeignClient; import com.njcn.system.api.EpdFeignClient; +import com.njcn.system.enums.DicDataEnum; import com.njcn.system.pojo.po.EleEpdPqd; import lombok.RequiredArgsConstructor; +import org.influxdb.dto.QueryResult; import org.springframework.beans.BeanUtils; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -52,6 +58,9 @@ public class StableDataServiceImpl implements StableDataService { private final EpdFeignClient epdFeignClient; private final CommonService commonService; private final DecimalFormat df = new DecimalFormat("#0.0000"); + private final DicDataFeignClient dicDataFeignClient; + private final InfluxDbUtils influxDbUtils; + @Override public List queryThdData(String devId, String statisticalName) { @@ -105,6 +114,8 @@ public class StableDataServiceImpl implements StableDataService { return thdDataVOList; } + + @Override public List queryFisrtThdContent(String devId, String statisticalName) { List thdDataVOList = new ArrayList<>(); @@ -170,8 +181,10 @@ public class StableDataServiceImpl implements StableDataService { csLinePOList = csLineFeignClient.queryLineByDevId(commonStatisticalQueryParam.getDevId()).getData(); } + String areaId = dicDataFeignClient.getDicDataByCode(DicDataEnum.OUTPUT_SIDE.getCode()).getData().getId(); + Optional.ofNullable(csLinePOList).orElseThrow(()-> new BusinessException(AlgorithmResponseEnum.LINE_DATA_ERROR)); - collect = csLinePOList.stream().map(CsLinePO::getLineId).collect(Collectors.toList()); + collect = csLinePOList.stream().filter(temp->Objects.equals(areaId,temp.getPosition())).map(CsLinePO::getLineId).collect(Collectors.toList()); @@ -208,6 +221,7 @@ public class StableDataServiceImpl implements StableDataService { return collect1; } + @Override public List queryCommonStatisticalByTime(CommonStatisticalQueryParam commonStatisticalQueryParam) { Optional.ofNullable(commonStatisticalQueryParam.getDevId()).orElseThrow(()-> new BusinessException(AlgorithmResponseEnum.DEVICE_LOSE)); @@ -307,4 +321,77 @@ public class StableDataServiceImpl implements StableDataService { } } + @Override + public List QuerySqlData(FrequencyStatisticalQueryParam frequencyStatisticalQueryParam) { + List collect = new ArrayList<>(); + List csLinePOList = new ArrayList<>(); + List thdDataVOList = new ArrayList<>(); + if(Objects.isNull(frequencyStatisticalQueryParam.getDevId())){ + if(!CollectionUtil.isEmpty(frequencyStatisticalQueryParam.getLineList())){ + collect =frequencyStatisticalQueryParam.getLineList(); + csLinePOList = csLineFeignClient.queryLineById(collect).getData(); + }else { + throw new BusinessException(AlgorithmResponseEnum.LINE_DATA_ERROR); + } + }else { + csLinePOList = csLineFeignClient.queryLineByDevId(frequencyStatisticalQueryParam.getDevId()).getData(); + + } + String areaId = dicDataFeignClient.getDicDataByCode(DicDataEnum.OUTPUT_SIDE.getCode()).getData().getId(); + + Optional.ofNullable(csLinePOList).orElseThrow(()-> new BusinessException(AlgorithmResponseEnum.LINE_DATA_ERROR)); + List collect1 = csLinePOList.stream().filter(temp -> Objects.equals(areaId, temp.getPosition())).collect(Collectors.toList()); + /*治理侧监测点*/ + CsLinePO csLinePO = collect1.get(0); + + EleEpdPqd data = epdFeignClient.selectById(frequencyStatisticalQueryParam.getStatisticalId()).getData(); + Optional.ofNullable(data).orElseThrow(()-> new BusinessException(AlgorithmResponseEnum.ELEEPDPQD_DATA_ERROR)); + + StringBuilder stringBuilder1 = new StringBuilder(); + StringBuilder stringBuilder2 = new StringBuilder(); + + for (int i = frequencyStatisticalQueryParam.getFrequencyStart(); i <=frequencyStatisticalQueryParam.getFrequencyEnd() ; i++) { + if (i==frequencyStatisticalQueryParam.getFrequencyEnd()){ + stringBuilder1.append("last("+data.getName()).append("_"+i).append(") AS "+data.getName()).append("_"+i); + } else { + stringBuilder1.append("last("+data.getName()).append("_"+i).append(") AS "+data.getName()).append("_"+i).append(","); + } + } + stringBuilder2.append ("line_id='").append (csLinePO.getLineId()).append("' and value_type = 'avg' group by phasic_type ").append(InfluxDBPublicParam.TIME_ZONE); + String sql1 = "select "+stringBuilder1+" from "+"apf_data"+" where "+stringBuilder2; + + QueryResult sqlData = influxDbUtils.query(sql1); + List queryResults = sqlData.getResults(); + if (CollectionUtil.isNotEmpty(queryResults)) { + for (QueryResult.Result result : queryResults) { + if (CollectionUtil.isNotEmpty(result.getSeries())) { + List seriesList = result.getSeries(); + if (CollectionUtil.isNotEmpty(seriesList)) { + //如果查询语句中存在group by,会存在多个series + for (QueryResult.Series series : seriesList) { + String key = data.getName(); + for (int i = frequencyStatisticalQueryParam.getFrequencyStart(); i <=frequencyStatisticalQueryParam.getFrequencyEnd() ; i++) { + int i1 = i - frequencyStatisticalQueryParam.getFrequencyStart()+1; + Double value = Double.parseDouble(series.getValues().get(0).get(i1).toString()); + ThdDataVO thdDataVO = new ThdDataVO(); + thdDataVO.setLineId(csLinePO.getLineId()); + thdDataVO.setLineName(csLinePO.getName()); + thdDataVO.setPhase(series.getTags().get("phasic_type")); + thdDataVO.setStatMethod("avg"); + thdDataVO.setStatisticalData(Double.valueOf(df.format(value))); + thdDataVO.setStatisticalIndex(data.getId()); + thdDataVO.setStatisticalName(data.getName()+ i +"("+data.getUnit()+")"); + thdDataVO.setAnotherName(data.getShowName()); + thdDataVO.setUnit(data.getUnit()); + thdDataVOList.add(thdDataVO); + + } + } + } + } + } + } + return thdDataVOList; + } + }