From 7b65b796622684a2628b69c63948e657392cf058 Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Fri, 7 Mar 2025 09:26:40 +0800 Subject: [PATCH] =?UTF-8?q?CVT=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/njcn/algorithm/pojo/bo/HourParam.java | 62 +++++++++++++++++++ .../com/njcn/algorithm/ExecutionCenter.java | 45 ++++++++++++++ .../executor/MeasurementExecutor.java | 16 +---- .../executor/MeasurementHourExecutor.java | 44 +++++++++++++ ...ice.java => IDataHarmRateVCvtService.java} | 4 +- .../serviceimpl/line/DataVCvtServiceImpl.java | 21 +++++-- .../api/DataHarmRateVFeignClient.java | 2 + ...taHarmRateVFeignClientFallbackFactory.java | 6 ++ .../controller/DataHarmRateVController.java | 9 +++ .../dataProcess/service/IDataHarmRateV.java | 2 + .../influxdb/InfluxdbDataHarmRateVImpl.java | 10 +++ .../relation/RelationDataHarmRateVImpl.java | 5 ++ 12 files changed, 204 insertions(+), 22 deletions(-) create mode 100644 algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/bo/HourParam.java create mode 100644 algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementHourExecutor.java rename algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/{IDataVCvtService.java => IDataHarmRateVCvtService.java} (66%) diff --git a/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/bo/HourParam.java b/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/bo/HourParam.java new file mode 100644 index 0000000..b64248f --- /dev/null +++ b/algorithm/algorithm-api/src/main/java/com/njcn/algorithm/pojo/bo/HourParam.java @@ -0,0 +1,62 @@ +package com.njcn.algorithm.pojo.bo; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; +import java.util.Set; + +/** + * + * ** 按小时纬度执行算法参数 ** + * @author hongawen + * @version 1.0.0 + * @date 2023年11月01日 16:17 + */ +@Data +public class HourParam implements Serializable { + + /*** + * 是否全链路执行算法 + * 非全链路执行时,tag集合必须非空 + */ + @ApiModelProperty(name = "fullChain",value = "是否全链执行") + private boolean fullChain; + + /** + * 目前仅监测点日统计存在补招功能 by yxb + * 是否补招标识,默认不补招 + */ + @ApiModelProperty(name = "repair",value = "是否补招") + private boolean repair; + + + @ApiModelProperty(name = "beginTime",value = "补招起始日期_yyyy-MM-dd hh::mm::ss") + private String beginTime; + + + @ApiModelProperty(name = "endTime",value = "补招截止日期_yyyy-MM-dd hh::mm::ss") + private String endTime; + + + @ApiModelProperty(name = "dataDate",value = "时间日期_yyyy-MM-dd hh::mm::ss") + private String dataDate; + + + /*** + * 需要执行的组件 + * 当不需要全链路执行时,通过tag名称动态指定执行某个算法组件 + */ + @ApiModelProperty(name = "tagNames",value = "待执行链节点的tag集合") + private Set tagNames; + + + /** + * 待计算的对象索引集合,监测点、设备、母线、变电站、单位等等 + */ + @ApiModelProperty(name = "idList",value = "索引集合") + private List idList; + + +} diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java index 5f89acb..4811726 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java @@ -4,11 +4,13 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.core.text.StrPool; import cn.hutool.core.util.StrUtil; import com.njcn.algorithm.pojo.bo.BaseParam; import com.njcn.algorithm.pojo.bo.CalculatedParam; +import com.njcn.algorithm.pojo.bo.HourParam; import com.njcn.algorithm.pojo.enums.PrepareResponseEnum; import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.enums.common.LogEnum; @@ -147,6 +149,49 @@ public class ExecutionCenter extends BaseController { } } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("监测点算法执行链(按小时执行的算法)") + @PostMapping("/measurementPointExecutorByHour") + @Async("asyncExecutor") + public void measurementPointExecutorByHour(@RequestBody HourParam baseParam) { + String methodDescribe = getMethodDescribe("measurementPointExecutorByHour"); + //手动判断参数是否合法, + if (!baseParam.isFullChain() && CollectionUtil.isEmpty(baseParam.getTagNames())) { + throw new BusinessException(PrepareResponseEnum.NO_EXECUTOR_NODE); + } + + if (baseParam.isRepair() && StrUtil.isAllEmpty(baseParam.getBeginTime(), baseParam.getEndTime())) { + throw new BusinessException(PrepareResponseEnum.NO_REPAIR_DATE); + } + CalculatedParam calculatedParam = new CalculatedParam(); + BeanUtil.copyProperties(baseParam, calculatedParam); + // 测点索引 + if (CollectionUtils.isEmpty(calculatedParam.getIdList())) { + calculatedParam.setIdList(commTerminalGeneralClient.getRunMonitorIds().getData()); + } + LiteflowResponse liteflowResponse; + if (baseParam.isRepair()) { + //补招时,起始日期、截止日期必填 + DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATETIME_FORMAT); + DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATETIME_FORMAT); + long betweenHour = DateUtil.between(startDate, endDate, DateUnit.HOUR); + //递增日期执行算法链 + for (int i = 0; i < betweenHour; i++) { + if (i != 0) { + startDate = DateUtil.offsetHour(startDate, 1); + } + calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATETIME_FORMATTER)); + liteflowResponse = flowExecutor.execute2Resp("measurement_point_hour", calculatedParam); + dealResponse(calculatedParam, liteflowResponse, methodDescribe); + } + } else { + //非补招 + calculatedParam.setDataDate(DateUtil.format(DateUtil.offsetHour( DateUtil.parse(baseParam.getDataDate(), DatePattern.NORM_DATETIME_FORMAT),-1), DatePattern.NORM_DATETIME_FORMATTER)); + liteflowResponse = flowExecutor.execute2Resp("measurement_point_hour", calculatedParam); + dealResponse(calculatedParam, liteflowResponse, methodDescribe); + } + } + // @OperateInfo(info = LogEnum.BUSINESS_COMMON) // @ApiOperation("装置算法执行链") // @PostMapping("/deviceExecutor") diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java index 4b2fb53..ac59d5c 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java @@ -1,7 +1,6 @@ package com.njcn.algorithm.executor; import com.njcn.algorithm.service.line.IDataCleanService; -import com.njcn.algorithm.service.line.IDataVCvtService; import com.njcn.algorithm.service.line.IDayDataService; import com.yomahub.liteflow.annotation.LiteflowComponent; import com.yomahub.liteflow.annotation.LiteflowMethod; @@ -29,8 +28,7 @@ public class MeasurementExecutor extends BaseExecutor { @Resource private IDataCleanService dataCleanService; - @Resource - private IDataVCvtService dataVCvtService; + /** * 数据清洗 电压表 @@ -290,17 +288,5 @@ public class MeasurementExecutor extends BaseExecutor { } - /** - * 监测点cvt转换算法() - * @author hzj - */ - @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataVCvt", nodeType = NodeTypeEnum.COMMON) - public boolean dataVCvtAccess(NodeComponent bindCmp) { - return isAccess(bindCmp); - } - @LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "dataVCvt", nodeType = NodeTypeEnum.COMMON) - public void dataVCvtProcess(NodeComponent bindCmp) { - dataVCvtService.dataVCvtHandler(bindCmp.getRequestData()); - } } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementHourExecutor.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementHourExecutor.java new file mode 100644 index 0000000..a6f7ed8 --- /dev/null +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementHourExecutor.java @@ -0,0 +1,44 @@ +package com.njcn.algorithm.executor; + +import com.njcn.algorithm.service.line.IDataHarmRateVCvtService; +import com.yomahub.liteflow.annotation.LiteflowComponent; +import com.yomahub.liteflow.annotation.LiteflowMethod; +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.enums.LiteFlowMethodEnum; +import com.yomahub.liteflow.enums.NodeTypeEnum; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Resource; + + +/** + * @author xy + * @version 1.0.0 + * @date 2025年1月16日 + */ +@Slf4j +@LiteflowComponent +@RequiredArgsConstructor +public class MeasurementHourExecutor extends BaseExecutor { + + + + @Resource + private IDataHarmRateVCvtService dataHarmRateVCvtService; + + + /** + * 监测点cvt转换算法() + * @author hzj + */ + @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataHarmRateVCvt", nodeType = NodeTypeEnum.COMMON) + public boolean dataVCvtAccess(NodeComponent bindCmp) { + return isAccess(bindCmp); + } + @LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "dataHarmRateVCvt", nodeType = NodeTypeEnum.COMMON) + public void dataVCvtProcess(NodeComponent bindCmp) { + dataHarmRateVCvtService.dataHarmRateVCvtHandler(bindCmp.getRequestData()); + } + +} diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataVCvtService.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataHarmRateVCvtService.java similarity index 66% rename from algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataVCvtService.java rename to algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataHarmRateVCvtService.java index b172ccf..7e02d6e 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataVCvtService.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataHarmRateVCvtService.java @@ -9,6 +9,6 @@ import com.njcn.algorithm.pojo.bo.CalculatedParam; * @author clam * @version V1.0.0 */ -public interface IDataVCvtService { - void dataVCvtHandler(CalculatedParam requestData); +public interface IDataHarmRateVCvtService { + void dataHarmRateVCvtHandler(CalculatedParam requestData); } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataVCvtServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataVCvtServiceImpl.java index edaa73b..f024862 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataVCvtServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataVCvtServiceImpl.java @@ -1,11 +1,15 @@ package com.njcn.algorithm.serviceimpl.line; +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUtil; import com.njcn.algorithm.pojo.bo.CalculatedParam; -import com.njcn.algorithm.service.line.IDataVCvtService; +import com.njcn.algorithm.service.line.IDataHarmRateVCvtService; import com.njcn.dataProcess.api.DataHarmRateVCvtFeignClient; import com.njcn.dataProcess.api.DataHarmRateVFeignClient; import com.njcn.dataProcess.constant.InfluxDBTableConstant; import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.pojo.dto.DataHarmDto; import com.njcn.dataProcess.pojo.dto.DataHarmRateVCvtDto; import com.njcn.dataProcess.pojo.dto.DataHarmRateVDto; import com.njcn.dataProcess.util.TimeUtils; @@ -34,7 +38,7 @@ import java.util.stream.Collectors; @Slf4j @Component @RequiredArgsConstructor -public class DataVCvtServiceImpl implements IDataVCvtService { +public class DataVCvtServiceImpl implements IDataHarmRateVCvtService { @Resource private CvtRelationFeignClient cvtRelationFeignClient; @Resource @@ -43,15 +47,22 @@ public class DataVCvtServiceImpl implements IDataVCvtService { private DataHarmRateVFeignClient dataHarmRateVFeignClient; @Override - public void dataVCvtHandler(CalculatedParam calculatedParam) { + public void dataHarmRateVCvtHandler(CalculatedParam calculatedParam) { LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); List idList = calculatedParam.getIdList(); LineCountEvaluateParam lineCountEvaluateParam = new LineCountEvaluateParam(); lineCountEvaluateParam.setLineId(calculatedParam.getIdList()); - lineCountEvaluateParam.setEndTime(calculatedParam.getDataDate()+ InfluxDBTableConstant.END_TIME); - lineCountEvaluateParam.setStartTime(calculatedParam.getDataDate()+ InfluxDBTableConstant.START_TIME); + DateTime beginTime1= DateUtil.beginOfHour(DateUtil.parse(calculatedParam.getDataDate(), DatePattern.NORM_DATETIME_FORMAT)); + //获取最新一条数据时间 + DataHarmDto topData = dataHarmRateVFeignClient.getTopData().getData(); + String minTime = topData.getMinTime(); + DateTime beginTime2= DateUtil.beginOfHour(DateUtil.parse(minTime, DatePattern.NORM_DATETIME_FORMAT)); + DateTime beginTime = beginTime1.isBefore(beginTime2) ? beginTime1 : beginTime2; + DateTime endTime = DateUtil.endOfHour(beginTime1); + lineCountEvaluateParam.setEndTime(DateUtil.format(endTime,DatePattern.NORM_DATETIME_FORMAT)); + lineCountEvaluateParam.setStartTime(DateUtil.format(beginTime,DatePattern.NORM_DATETIME_FORMAT)); List data = dataHarmRateVCvtFeignClient.getRawData(lineCountEvaluateParam).getData(); diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataHarmRateVFeignClient.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataHarmRateVFeignClient.java index b8e784e..b2a1270 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataHarmRateVFeignClient.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataHarmRateVFeignClient.java @@ -27,5 +27,7 @@ public interface DataHarmRateVFeignClient { @PostMapping("/addList") HttpResult addList(@RequestBody List list); + @PostMapping("/getTopData") + HttpResult getTopData(); } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataHarmRateVFeignClientFallbackFactory.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataHarmRateVFeignClientFallbackFactory.java index cd228d8..71a68f4 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataHarmRateVFeignClientFallbackFactory.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataHarmRateVFeignClientFallbackFactory.java @@ -55,6 +55,12 @@ public class DataHarmRateVFeignClientFallbackFactory implements FallbackFactory< throw new BusinessException(finalExceptionEnum); } + @Override + public HttpResult getTopData() { + log.error("{}异常,降级处理,异常为:{}","获取最新原始数据",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java index 6d7e400..14a052c 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java @@ -55,6 +55,15 @@ public class DataHarmRateVController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.QUERY) + @PostMapping("/getTopData") + @ApiOperation("获取数据最新时间点") + public HttpResult getTopData() { + String methodDescribe = getMethodDescribe("getTopData"); + DataHarmDto data = dataHarmRateVQuery.getTopData(); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe); + } + @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD) @PostMapping("/batchInsertion") @ApiOperation("批量插入") diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java index b3dcd04..0531b18 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java @@ -35,4 +35,6 @@ IDataHarmRateV extends IMppService { * 批量插入数据 */ void addList(List list); + + DataHarmDto getTopData(); } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java index 8a046f3..8233682 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java @@ -183,6 +183,16 @@ public class InfluxdbDataHarmRateVImpl extends MppServiceImpl dataHarmrateVList = dataHarmRateVMapper.selectByQueryWrapper(influxQueryWrapper); + dto.setMinTime(DATE_TIME_FORMATTER.format(dataHarmrateVList.get(0).getTime())); + return dto; + } + /** * 按监测点集合、时间条件获取分钟数据 * timeMap参数来判断是否进行数据处理 timeMap为空则不进行数据处理 diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java index 0e15d6d..6ed17a4 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java @@ -89,4 +89,9 @@ public class RelationDataHarmRateVImpl extends MppServiceImpl