From 8ce1f83531aad21c59128070982b5bd1bed8becd Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Tue, 14 Oct 2025 15:28:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=95=B8=E5=8F=98=E7=8E=87?= =?UTF-8?q?=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/MeasurementExecutor.java | 23 +++++++ data-processing/data-processing-api/pom.xml | 6 ++ .../dataProcess/api/RmpVThdFeignClient.java | 31 ++++++++++ .../RMpVThdFeignClientFallbackFactory.java | 58 +++++++++++++++++ .../njcn/dataProcess/pojo/dto/RMpVThd.java | 62 +++++++++++++++++++ .../controller/RMpVThdController.java | 60 ++++++++++++++++++ .../dao/relation/mapper/RMpVThdMapper.java | 28 +++++++++ .../dao/relation/mapper/RMpVThdMapper.xml | 31 ++++++++++ .../dataProcess/service/IRMpVThdService.java | 18 ++++++ .../relation/RelationRMpVThdServiceImpl.java | 53 ++++++++++++++++ .../message/messagedto/DevComFlagDTO.java | 2 +- .../message/consumer/TopicLogsConsumer.java | 2 +- 12 files changed, 372 insertions(+), 2 deletions(-) create mode 100644 data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/RmpVThdFeignClient.java create mode 100644 data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/RMpVThdFeignClientFallbackFactory.java create mode 100644 data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/RMpVThd.java create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/RMpVThdController.java create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/relation/mapper/RMpVThdMapper.java create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/relation/mapper/RMpVThdMapper.xml create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IRMpVThdService.java create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationRMpVThdServiceImpl.java diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java index 7c4c159..6937f3b 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java @@ -2,6 +2,8 @@ package com.njcn.algorithm.executor; import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.service.line.*; +import com.njcn.dataProcess.api.RmpVThdFeignClient; +import com.njcn.dataProcess.pojo.dto.RMpVThd; import com.yomahub.liteflow.annotation.LiteflowComponent; import com.yomahub.liteflow.annotation.LiteflowMethod; import com.yomahub.liteflow.core.NodeComponent; @@ -11,6 +13,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import javax.annotation.Resource; +import java.util.List; /** @@ -39,6 +42,8 @@ public class MeasurementExecutor extends BaseExecutor { private IPollutionService pollutionService; @Resource private IPollutionCalc pollutionCalc; + @Resource + private RmpVThdFeignClient rmpVThdFeignClient; /** * 数据质量清洗 */ @@ -481,6 +486,24 @@ public class MeasurementExecutor extends BaseExecutor { dataIntegrityService.dataIntegrity(bindCmp.getRequestData()); } + /** + * 算法名: 3.4.1.6.1-----监测点谐波畸变率_日表(r_mp_v_thd) + * + * @author xuyang + * @date 2023年11月13日 19:34 + */ + @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "rMpVThd", nodeType = NodeTypeEnum.COMMON) + public boolean rMpVThdAccess(NodeComponent bindCmp) { + return isAccess(bindCmp); + } + @LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "rMpVThd", nodeType = NodeTypeEnum.COMMON) + public void rMpVThdProcess(NodeComponent bindCmp) { + List data = rmpVThdFeignClient.queryThd(bindCmp.getRequestData()).getData(); + rmpVThdFeignClient.batchInsertionThd(data); + + } + + } diff --git a/data-processing/data-processing-api/pom.xml b/data-processing/data-processing-api/pom.xml index e22d639..ebaf864 100644 --- a/data-processing/data-processing-api/pom.xml +++ b/data-processing/data-processing-api/pom.xml @@ -59,6 +59,12 @@ 1.0.0 compile + + com.njcn.platform + algorithm-api + 1.0.0 + compile + diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/RmpVThdFeignClient.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/RmpVThdFeignClient.java new file mode 100644 index 0000000..7a442dc --- /dev/null +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/RmpVThdFeignClient.java @@ -0,0 +1,31 @@ +package com.njcn.dataProcess.api; + +import com.njcn.algorithm.pojo.bo.CalculatedParam; +import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.dataProcess.api.fallback.RMpVThdFeignClientFallbackFactory; +import com.njcn.dataProcess.api.fallback.SpThroughFeignClientFallbackFactory; +import com.njcn.dataProcess.pojo.dto.RActivePowerRangeDto; +import com.njcn.dataProcess.pojo.dto.RMpVThd; +import com.njcn.dataProcess.pojo.dto.SpThroughDto; +import io.swagger.annotations.ApiOperation; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +import java.util.List; + +/** + * @author denghuajun + * @version 1.0.0 + * @date 2022年01月05日 15:11 + */ +@FeignClient(value = ServerInfo.PLATFORM_DATA_PROCESSING_BOOT, path = "/rmpvthd", fallbackFactory = RMpVThdFeignClientFallbackFactory.class, contextId = "rmpvthd") +public interface RmpVThdFeignClient { + + @PostMapping("/batchInsertionThd") + HttpResult batchInsertionThd(@RequestBody List result); + + @PostMapping("/batchInsertionPower") + HttpResult> queryThd(@RequestBody CalculatedParam calculatedParam); +} diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/RMpVThdFeignClientFallbackFactory.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/RMpVThdFeignClientFallbackFactory.java new file mode 100644 index 0000000..8f47a54 --- /dev/null +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/RMpVThdFeignClientFallbackFactory.java @@ -0,0 +1,58 @@ +package com.njcn.dataProcess.api.fallback; + +import com.njcn.algorithm.pojo.bo.CalculatedParam; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.dataProcess.api.RmpVThdFeignClient; +import com.njcn.dataProcess.api.SpThroughFeignClient; +import com.njcn.dataProcess.pojo.dto.RActivePowerRangeDto; +import com.njcn.dataProcess.pojo.dto.RMpVThd; +import com.njcn.dataProcess.pojo.dto.SpThroughDto; +import com.njcn.dataProcess.util.DataProcessingEnumUtil; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author denghuajun + * @version 1.0.0 + * @date 2022年01月05日 15:08 + */ +@Slf4j +@Component +public class RMpVThdFeignClientFallbackFactory implements FallbackFactory { + + + /** + * 输出远程请求接口异常日志 + * @param cause RPC请求异常 + */ + @Override + public RmpVThdFeignClient create(Throwable cause) { + //判断抛出异常是否为解码器抛出的业务异常 + Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; + if(cause.getCause() instanceof BusinessException){ + BusinessException businessException = (BusinessException) cause.getCause(); + exceptionEnum = DataProcessingEnumUtil.getExceptionEnum(businessException.getResult()); + } + Enum finalExceptionEnum = exceptionEnum; + return new RmpVThdFeignClient() { + + + @Override + public HttpResult batchInsertionThd(List result) { + log.error("{}异常,降级处理,异常为:{}","畸变率插入",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + + @Override + public HttpResult> queryThd(CalculatedParam calculatedParam) { + log.error("{}异常,降级处理,异常为:{}","畸变率查询",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; + } +} diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/RMpVThd.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/RMpVThd.java new file mode 100644 index 0000000..a3d12d0 --- /dev/null +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/RMpVThd.java @@ -0,0 +1,62 @@ +package com.njcn.dataProcess.pojo.dto; + +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import com.github.jeffreyning.mybatisplus.anno.MppMultiId; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.time.LocalDate; + +/** + * + * Description: + * 接口文档访问地址:http://serverIP:port/swagger-ui.html + * Date: 2022/10/10 19:59【需求编号】 + * + * @author clam + * @version V1.0.0 + */ + +/** + * 谐波畸变率排名 + */ +@ApiModel(value="com-njcn-harmonic-pojo-po-RMpVThd") +@Data +@AllArgsConstructor +@NoArgsConstructor +@TableName(value = "r_mp_v_thd") +public class RMpVThd implements Serializable { + /** + * 监测点ID + */ + @TableField(value = "measurement_point_id") + @MppMultiId + private String measurementPointId; + + /** + * 排名类型,字典表(1年 2季度 3月份 4周 5日) + */ + @TableField(value = "data_type") + @MppMultiId + private String dataType; + + /** + * 时间 + */ + @TableField(value = "data_date") + @MppMultiId + private LocalDate dataDate; + + /** + * 谐波畸变率 + */ + @TableField(value = "v_thd") + private Double vThd; + + private static final long serialVersionUID = 1L; +} diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/RMpVThdController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/RMpVThdController.java new file mode 100644 index 0000000..800df90 --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/RMpVThdController.java @@ -0,0 +1,60 @@ +package com.njcn.dataProcess.controller; + +import com.njcn.algorithm.pojo.bo.CalculatedParam; +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.constant.OperateType; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.dataProcess.annotation.InsertBean; +import com.njcn.dataProcess.pojo.dto.RMpVThd; +import com.njcn.dataProcess.service.IRMpVThdService; +import com.njcn.web.controller.BaseController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Controller; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * @author hongawen + * @version 1.0 + * @data 2024/11/6 19:48 + */ +@Validated +@Slf4j +@Controller +@RestController +@RequestMapping("/rmpvthd") +@Api(tags = "畸变率") +public class RMpVThdController extends BaseController { + + @InsertBean + private IRMpVThdService irMpVThdService; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD) + @PostMapping("/batchInsertionThd") + @ApiOperation("畸变率批量插入") + public HttpResult batchInsertionThd(@RequestBody List result) { + String methodDescribe = getMethodDescribe("batchInsertionPower"); + irMpVThdService.batchInsertionThd(result); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD) + @PostMapping("/batchInsertionPower") + @ApiOperation("畸变率查询") + public HttpResult> queryThd(@RequestBody CalculatedParam calculatedParam) { + String methodDescribe = getMethodDescribe("batchInsertionPower"); + List rMpVThdList = irMpVThdService.queryHarmonicVThd(calculatedParam); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, rMpVThdList, methodDescribe); + } + +} diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/relation/mapper/RMpVThdMapper.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/relation/mapper/RMpVThdMapper.java new file mode 100644 index 0000000..ba831ae --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/relation/mapper/RMpVThdMapper.java @@ -0,0 +1,28 @@ +package com.njcn.dataProcess.dao.relation.mapper; + + +import com.github.jeffreyning.mybatisplus.base.MppBaseMapper; +import com.njcn.dataProcess.pojo.dto.RMpVThd; +import org.apache.ibatis.annotations.Param; + +import java.util.List; +import java.util.Map; + +/** + *

+ * 谐波畸变率排名 Mapper 接口 + *

+ * + * @author xiaoyao + * @since 2022-11-07 + */ +public interface RMpVThdMapper extends MppBaseMapper { + + int insertRate(@Param("item") Map item); + + /** + * 从r_stat_data_v_d中获取畸变率的最大值 + * phasic_type = A、B、C && value_type = CP95 + */ + List getVThdData(@Param("time") String time, @Param("lineList")List lineList); +} diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/relation/mapper/RMpVThdMapper.xml b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/relation/mapper/RMpVThdMapper.xml new file mode 100644 index 0000000..ebeb958 --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/dao/relation/mapper/RMpVThdMapper.xml @@ -0,0 +1,31 @@ + + + + + + INSERT INTO r_mp_v_thd ( measurement_point_id, data_type, data_date, v_thd) VALUES + ( #{item.lineId}, #{item.dataType}, #{item.dataDate}, #{item.vThd} ) + ON DUPLICATE KEY UPDATE v_thd = #{item.vThd} + + + + diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IRMpVThdService.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IRMpVThdService.java new file mode 100644 index 0000000..c05871e --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IRMpVThdService.java @@ -0,0 +1,18 @@ +package com.njcn.dataProcess.service; + +import com.njcn.algorithm.pojo.bo.CalculatedParam; +import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.pojo.dto.RMpVThd; + +import java.util.List; + +public interface IRMpVThdService { + + /** + * 计算谐波畸变率表 + * @param calculatedParam + */ + List queryHarmonicVThd(CalculatedParam calculatedParam); + + void batchInsertionThd(List result); +} diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationRMpVThdServiceImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationRMpVThdServiceImpl.java new file mode 100644 index 0000000..9f61431 --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationRMpVThdServiceImpl.java @@ -0,0 +1,53 @@ +package com.njcn.dataProcess.service.impl.relation; + + +import cn.hutool.core.collection.CollectionUtil; + +import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; +import com.njcn.algorithm.pojo.bo.CalculatedParam; +import com.njcn.dataProcess.dao.relation.mapper.RMpVThdMapper; +import com.njcn.dataProcess.pojo.dto.RMpVThd; +import com.njcn.dataProcess.service.IRMpVThdService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.ListUtils; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/4/24 17:58 + */ + +@Service("RelationRMpVThdServiceImpl") +@Slf4j +public class RelationRMpVThdServiceImpl extends MppServiceImpl implements IRMpVThdService { + + @Override + @Transactional(rollbackFor = {Exception.class}) + public List queryHarmonicVThd(CalculatedParam calculatedParam) { + log.info(LocalDateTime.now()+"===>监测点谐波畸变率开始执行"); + List lineIds = calculatedParam.getIdList(); + List rMpVThdList = new ArrayList<>(); + //以尺寸100分片,查询数据 + List> pendingIds = ListUtils.partition(lineIds,5); + for (List pendingId : pendingIds) { + List result = this.baseMapper.getVThdData(calculatedParam.getDataDate(),pendingId); + if (CollectionUtil.isNotEmpty(result)){ + rMpVThdList.addAll(result); + } + } + return rMpVThdList; + } + + @Override + public void batchInsertionThd(List result) { + this.saveOrUpdateBatchByMultiId(result); + } +} diff --git a/message/message-api/src/main/java/com/njcn/message/messagedto/DevComFlagDTO.java b/message/message-api/src/main/java/com/njcn/message/messagedto/DevComFlagDTO.java index c30ba4a..2bae5e2 100644 --- a/message/message-api/src/main/java/com/njcn/message/messagedto/DevComFlagDTO.java +++ b/message/message-api/src/main/java/com/njcn/message/messagedto/DevComFlagDTO.java @@ -15,7 +15,7 @@ import java.time.LocalDateTime; * @version V1.0.0 */ @Data -public class DevComFlagDTO extends BaseMessage implements Serializable { +public class DevComFlagDTO extends BaseMessage implements Serializable { private String id; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java index 439a8a9..54194f0 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java @@ -129,7 +129,7 @@ public class TopicLogsConsumer extends EnhanceConsumerMessageHandler