diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/AlgorithmBootApplication.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/AlgorithmBootApplication.java index 82af0d6..ce60565 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/AlgorithmBootApplication.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/AlgorithmBootApplication.java @@ -6,6 +6,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.DependsOn; +import org.springframework.scheduling.annotation.EnableAsync; /** @@ -16,6 +17,7 @@ import org.springframework.context.annotation.DependsOn; @SpringBootApplication(scanBasePackages = "com.njcn") @MapperScan("com.njcn.**.mapper") @DependsOn("proxyMapperRegister") +@EnableAsync public class AlgorithmBootApplication { public static void main(String[] args) { diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java index 639fd5f..6ca7a6d 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/ExecutionCenter.java @@ -12,7 +12,7 @@ import com.njcn.algorithm.pojo.bo.BaseParam; import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.pojo.bo.HourParam; import com.njcn.algorithm.pojo.enums.PrepareResponseEnum; -import com.njcn.algorithm.utils.MemorySizeUtil; +import com.njcn.algorithm.service.line.FlowService; import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.exception.BusinessException; @@ -71,6 +71,8 @@ public class ExecutionCenter extends BaseController { private DeptLineFeignClient deptLineFeignClient; @Resource private CsLineFeignClient csLineFeignClient; + @Resource + private FlowService flowService; /*** * 1、校验非全链执行时,tagNames节点标签集合必须为非空,否则提示---无可执行节点 @@ -101,7 +103,7 @@ public class ExecutionCenter extends BaseController { * @date 2023/11/7 14:44 */ private void dealResponse(CalculatedParam calculatedParam, LiteflowResponse liteflowResponse, String methodDescribe) { - MemorySizeUtil.getNowMemory(); +// MemorySizeUtil.getNowMemory(); if (liteflowResponse.isSuccess()) { // 获取执行步骤列表 long allTime = 0; @@ -109,9 +111,9 @@ public class ExecutionCenter extends BaseController { for (String key : executeSteps.keySet()) { List cmpSteps = executeSteps.get(key); long timeSum = cmpSteps.stream().mapToLong(CmpStep::getTimeSpent).sum(); - allTime+=timeSum; + allTime += timeSum; } - logger.info("日期{},{}执行{}成功,执行总时长{}分钟", calculatedParam.getDataDate(), methodDescribe, calculatedParam.isFullChain() ? "全链" : "指定节点:".concat(String.join(StrPool.COMMA, calculatedParam.getTagNames())),allTime/1000/60); + logger.info("日期{},{}执行{}成功,执行总时长{}分钟", calculatedParam.getDataDate(), methodDescribe, calculatedParam.isFullChain() ? "全链" : "指定节点:".concat(String.join(StrPool.COMMA, calculatedParam.getTagNames())), allTime / 1000 / 60); } else { Map> executeSteps = liteflowResponse.getExecuteSteps(); CmpStep failStep = null; @@ -156,8 +158,8 @@ public class ExecutionCenter extends BaseController { startDate = DateUtil.offsetDay(startDate, 1); } calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN)); - liteflowResponse = flowExecutor.execute2Resp("measurement_point", calculatedParam); - dealResponse(calculatedParam, liteflowResponse, methodDescribe); + CalculatedParam repairParam = BeanUtil.copyProperties(calculatedParam, CalculatedParam.class); + flowService.execute2Resp(methodDescribe, repairParam); } } else { //非补招 @@ -233,7 +235,7 @@ public class ExecutionCenter extends BaseController { liteflowResponse = flowExecutor.execute2Resp("special_analysis", calculatedParam); dealResponse(calculatedParam, liteflowResponse, methodDescribe); } - + } @OperateInfo(info = LogEnum.BUSINESS_COMMON) @@ -277,7 +279,7 @@ public class ExecutionCenter extends BaseController { liteflowResponse = flowExecutor.execute2Resp("measurement_point_hour", calculatedParam); dealResponse(calculatedParam, liteflowResponse, methodDescribe); } - + } @OperateInfo(info = LogEnum.BUSINESS_COMMON) @@ -318,7 +320,7 @@ public class ExecutionCenter extends BaseController { liteflowResponse = flowExecutor.execute2Resp("device", calculatedParam); dealResponse(calculatedParam, liteflowResponse, methodDescribe); } - + } @OperateInfo(info = LogEnum.BUSINESS_COMMON) @@ -357,7 +359,7 @@ public class ExecutionCenter extends BaseController { liteflowResponse = flowExecutor.execute2Resp("org_point", calculatedParam); dealResponse(calculatedParam, liteflowResponse, methodDescribe); } - + } // @@ -445,7 +447,7 @@ public class ExecutionCenter extends BaseController { liteflowResponse = flowExecutor.execute2Resp("sub_station", calculatedParam); dealResponse(calculatedParam, liteflowResponse, methodDescribe); } - + } // // /** diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/FlowService.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/FlowService.java new file mode 100644 index 0000000..0626ed1 --- /dev/null +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/FlowService.java @@ -0,0 +1,12 @@ +package com.njcn.algorithm.service.line; + +import com.njcn.algorithm.pojo.bo.CalculatedParam; + +public interface FlowService { + + /** + * 多线程任务 + * @param calculatedParam + */ + void execute2Resp(String methodDescribe,CalculatedParam calculatedParam); +} diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/FlowServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/FlowServiceImpl.java new file mode 100644 index 0000000..ee8758f --- /dev/null +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/FlowServiceImpl.java @@ -0,0 +1,80 @@ +package com.njcn.algorithm.serviceimpl.line; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.text.StrPool; +import com.njcn.algorithm.ExecutionCenter; +import com.njcn.algorithm.pojo.bo.CalculatedParam; +import com.njcn.algorithm.service.line.FlowService; +import com.njcn.algorithm.utils.MemorySizeUtil; +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.flow.entity.CmpStep; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * @author wr + * @description + * @date 2025/12/16 17:43 + */ +@Slf4j +@Service +public class FlowServiceImpl implements FlowService { + + private static final Logger logger = LoggerFactory.getLogger(FlowServiceImpl.class); + + @Resource + private FlowExecutor flowExecutor; + + @Override + @Async("asyncExecutor") + public void execute2Resp(String methodDescribe,CalculatedParam calculatedParam) { + LiteflowResponse liteflowResponse = flowExecutor.execute2Resp("measurement_point", calculatedParam); + dealResponse(calculatedParam, liteflowResponse, methodDescribe); + } + + /*** + * + * @author hongawen + * @date 2023/11/7 14:44 + */ + private void dealResponse(CalculatedParam calculatedParam, LiteflowResponse liteflowResponse, String methodDescribe) { + MemorySizeUtil.getNowMemory(); + if (liteflowResponse.isSuccess()) { + // 获取执行步骤列表 + long allTime = 0; + Map> executeSteps = liteflowResponse.getExecuteSteps(); + for (String key : executeSteps.keySet()) { + List cmpSteps = executeSteps.get(key); + long timeSum = cmpSteps.stream().mapToLong(CmpStep::getTimeSpent).sum(); + allTime+=timeSum; + } + logger.info("日期{},{}执行{}成功,执行总时长{}分钟", calculatedParam.getDataDate(), methodDescribe, calculatedParam.isFullChain() ? "全链" : "指定节点:".concat(String.join(StrPool.COMMA, calculatedParam.getTagNames())),allTime/1000/60); + } else { + Map> executeSteps = liteflowResponse.getExecuteSteps(); + CmpStep failStep = null; + for (String key : executeSteps.keySet()) { + List cmpSteps = executeSteps.get(key); + cmpSteps = cmpSteps.stream().filter(cmpStep -> !cmpStep.isSuccess()).collect(Collectors.toList()); + if (CollectionUtil.isNotEmpty(cmpSteps)) { + failStep = cmpSteps.get(0); + } + } + logger.error("日期{},{}执行{}失败,在执行{}失败,失败原因:{}" + , calculatedParam.getDataDate() + , methodDescribe + , calculatedParam.isFullChain() ? "全链" : "指定节点:".concat(String.join(StrPool.COMMA, calculatedParam.getTagNames())) + , failStep.getNodeId().concat(Objects.isNull(failStep.getTag()) ? "" : StrPool.DASHED.concat(failStep.getTag())) + , Objects.isNull(failStep.getException()) ? null : failStep.getException().getMessage()); + } + } +} diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java index 5714f99..1c1a515 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java @@ -88,24 +88,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { @Override public void limitRateHandler(CalculatedParam calculatedParam) { MemorySizeUtil.getNowMemory(); - - Runtime runtime = Runtime.getRuntime(); - // 获取 JVM 最大可用内存(以字节为单位) - long maxMemory = runtime.maxMemory(); - // 获取 JVM 当前已分配的内存(以字节为单位) - long totalMemory = runtime.totalMemory(); - // 获取 JVM 当前空闲内存(以字节为单位) - long freeMemory = runtime.freeMemory(); - // 计算已使用的内存 - long usedMemory = totalMemory - freeMemory; - - System.out.println("最大可用内存: " + maxMemory / (1024 * 1024) + " MB"); - System.out.println("当前已分配的内存: " + totalMemory / (1024 * 1024) + " MB"); - System.out.println("当前空闲内存: " + freeMemory / (1024 * 1024) + " MB"); - System.out.println("已使用的内存: " + usedMemory / (1024 * 1024) + " MB"); - System.out.println("第一次分析结束-----------------------------------------"); - - logger.info("limitRate表转r_stat_limit_rate_d算法开始,执行日期为{}=====》", calculatedParam.getDataDate()); + System.err.println("limitRate表转r_stat_limit_rate_d算法开始,执行日期为{}=====》"+calculatedParam.getDataDate()); List result = new ArrayList<>(); //远程接口获取分钟数据 LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); @@ -116,13 +99,13 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { //获取所有监测点的限值 List overLimitList = commTerminalGeneralClient.getOverLimitDataByIds(lineIds).getData(); Map overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity())); - //添加异常数据时间点 - getAbnormalData(lineParam); //以100个监测点分片处理 List> pendingIds = ListUtils.partition(lineIds, NUM); ArrayList phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C); MemorySizeUtil.getNowMemory(); - pendingIds.forEach(list -> { + for (int i = 0; i < pendingIds.size(); i++) { + logger.info("总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区"); + List list = pendingIds.get(i); lineParam.setLineId(list); //获取电压数据 List dataVAllTime = dataVFeignClient.getRawData(lineParam).getData(); @@ -245,8 +228,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { dataPlt.get(item))); } } - }); - MemorySizeUtil.getNowMemory(); + } if (CollUtil.isNotEmpty(result)) { //存储数据 List dataLimitRate = result.stream().map(DataLimitDetailDto::getDataLimitRate).filter(ObjectUtil::isNotNull).collect(Collectors.toList());