增加任务多线程
This commit is contained in:
@@ -6,6 +6,7 @@ import org.springframework.boot.SpringApplication;
|
|||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||||
import org.springframework.context.annotation.DependsOn;
|
import org.springframework.context.annotation.DependsOn;
|
||||||
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -16,6 +17,7 @@ import org.springframework.context.annotation.DependsOn;
|
|||||||
@SpringBootApplication(scanBasePackages = "com.njcn")
|
@SpringBootApplication(scanBasePackages = "com.njcn")
|
||||||
@MapperScan("com.njcn.**.mapper")
|
@MapperScan("com.njcn.**.mapper")
|
||||||
@DependsOn("proxyMapperRegister")
|
@DependsOn("proxyMapperRegister")
|
||||||
|
@EnableAsync
|
||||||
public class AlgorithmBootApplication {
|
public class AlgorithmBootApplication {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ import com.njcn.algorithm.pojo.bo.BaseParam;
|
|||||||
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||||
import com.njcn.algorithm.pojo.bo.HourParam;
|
import com.njcn.algorithm.pojo.bo.HourParam;
|
||||||
import com.njcn.algorithm.pojo.enums.PrepareResponseEnum;
|
import com.njcn.algorithm.pojo.enums.PrepareResponseEnum;
|
||||||
import com.njcn.algorithm.utils.MemorySizeUtil;
|
import com.njcn.algorithm.service.line.FlowService;
|
||||||
import com.njcn.common.pojo.annotation.OperateInfo;
|
import com.njcn.common.pojo.annotation.OperateInfo;
|
||||||
import com.njcn.common.pojo.enums.common.LogEnum;
|
import com.njcn.common.pojo.enums.common.LogEnum;
|
||||||
import com.njcn.common.pojo.exception.BusinessException;
|
import com.njcn.common.pojo.exception.BusinessException;
|
||||||
@@ -71,6 +71,8 @@ public class ExecutionCenter extends BaseController {
|
|||||||
private DeptLineFeignClient deptLineFeignClient;
|
private DeptLineFeignClient deptLineFeignClient;
|
||||||
@Resource
|
@Resource
|
||||||
private CsLineFeignClient csLineFeignClient;
|
private CsLineFeignClient csLineFeignClient;
|
||||||
|
@Resource
|
||||||
|
private FlowService flowService;
|
||||||
|
|
||||||
/***
|
/***
|
||||||
* 1、校验非全链执行时,tagNames节点标签集合必须为非空,否则提示---无可执行节点
|
* 1、校验非全链执行时,tagNames节点标签集合必须为非空,否则提示---无可执行节点
|
||||||
@@ -101,7 +103,7 @@ public class ExecutionCenter extends BaseController {
|
|||||||
* @date 2023/11/7 14:44
|
* @date 2023/11/7 14:44
|
||||||
*/
|
*/
|
||||||
private void dealResponse(CalculatedParam calculatedParam, LiteflowResponse liteflowResponse, String methodDescribe) {
|
private void dealResponse(CalculatedParam calculatedParam, LiteflowResponse liteflowResponse, String methodDescribe) {
|
||||||
MemorySizeUtil.getNowMemory();
|
// MemorySizeUtil.getNowMemory();
|
||||||
if (liteflowResponse.isSuccess()) {
|
if (liteflowResponse.isSuccess()) {
|
||||||
// 获取执行步骤列表
|
// 获取执行步骤列表
|
||||||
long allTime = 0;
|
long allTime = 0;
|
||||||
@@ -109,9 +111,9 @@ public class ExecutionCenter extends BaseController {
|
|||||||
for (String key : executeSteps.keySet()) {
|
for (String key : executeSteps.keySet()) {
|
||||||
List<CmpStep> cmpSteps = executeSteps.get(key);
|
List<CmpStep> cmpSteps = executeSteps.get(key);
|
||||||
long timeSum = cmpSteps.stream().mapToLong(CmpStep::getTimeSpent).sum();
|
long timeSum = cmpSteps.stream().mapToLong(CmpStep::getTimeSpent).sum();
|
||||||
allTime+=timeSum;
|
allTime += timeSum;
|
||||||
}
|
}
|
||||||
logger.info("日期{},{}执行{}成功,执行总时长{}分钟", calculatedParam.getDataDate(), methodDescribe, calculatedParam.isFullChain() ? "全链" : "指定节点:".concat(String.join(StrPool.COMMA, calculatedParam.getTagNames())),allTime/1000/60);
|
logger.info("日期{},{}执行{}成功,执行总时长{}分钟", calculatedParam.getDataDate(), methodDescribe, calculatedParam.isFullChain() ? "全链" : "指定节点:".concat(String.join(StrPool.COMMA, calculatedParam.getTagNames())), allTime / 1000 / 60);
|
||||||
} else {
|
} else {
|
||||||
Map<String, List<CmpStep>> executeSteps = liteflowResponse.getExecuteSteps();
|
Map<String, List<CmpStep>> executeSteps = liteflowResponse.getExecuteSteps();
|
||||||
CmpStep failStep = null;
|
CmpStep failStep = null;
|
||||||
@@ -156,8 +158,8 @@ public class ExecutionCenter extends BaseController {
|
|||||||
startDate = DateUtil.offsetDay(startDate, 1);
|
startDate = DateUtil.offsetDay(startDate, 1);
|
||||||
}
|
}
|
||||||
calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
|
calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
|
||||||
liteflowResponse = flowExecutor.execute2Resp("measurement_point", calculatedParam);
|
CalculatedParam repairParam = BeanUtil.copyProperties(calculatedParam, CalculatedParam.class);
|
||||||
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
flowService.execute2Resp(methodDescribe, repairParam);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
//非补招
|
//非补招
|
||||||
@@ -233,7 +235,7 @@ public class ExecutionCenter extends BaseController {
|
|||||||
liteflowResponse = flowExecutor.execute2Resp("special_analysis", calculatedParam);
|
liteflowResponse = flowExecutor.execute2Resp("special_analysis", calculatedParam);
|
||||||
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
@@ -277,7 +279,7 @@ public class ExecutionCenter extends BaseController {
|
|||||||
liteflowResponse = flowExecutor.execute2Resp("measurement_point_hour", calculatedParam);
|
liteflowResponse = flowExecutor.execute2Resp("measurement_point_hour", calculatedParam);
|
||||||
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
@@ -318,7 +320,7 @@ public class ExecutionCenter extends BaseController {
|
|||||||
liteflowResponse = flowExecutor.execute2Resp("device", calculatedParam);
|
liteflowResponse = flowExecutor.execute2Resp("device", calculatedParam);
|
||||||
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
@@ -357,7 +359,7 @@ public class ExecutionCenter extends BaseController {
|
|||||||
liteflowResponse = flowExecutor.execute2Resp("org_point", calculatedParam);
|
liteflowResponse = flowExecutor.execute2Resp("org_point", calculatedParam);
|
||||||
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
@@ -445,7 +447,7 @@ public class ExecutionCenter extends BaseController {
|
|||||||
liteflowResponse = flowExecutor.execute2Resp("sub_station", calculatedParam);
|
liteflowResponse = flowExecutor.execute2Resp("sub_station", calculatedParam);
|
||||||
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
//
|
//
|
||||||
// /**
|
// /**
|
||||||
|
|||||||
@@ -0,0 +1,12 @@
|
|||||||
|
package com.njcn.algorithm.service.line;
|
||||||
|
|
||||||
|
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||||
|
|
||||||
|
public interface FlowService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 多线程任务
|
||||||
|
* @param calculatedParam
|
||||||
|
*/
|
||||||
|
void execute2Resp(String methodDescribe,CalculatedParam calculatedParam);
|
||||||
|
}
|
||||||
@@ -0,0 +1,80 @@
|
|||||||
|
package com.njcn.algorithm.serviceimpl.line;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
|
import cn.hutool.core.text.StrPool;
|
||||||
|
import com.njcn.algorithm.ExecutionCenter;
|
||||||
|
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||||
|
import com.njcn.algorithm.service.line.FlowService;
|
||||||
|
import com.njcn.algorithm.utils.MemorySizeUtil;
|
||||||
|
import com.yomahub.liteflow.core.FlowExecutor;
|
||||||
|
import com.yomahub.liteflow.flow.LiteflowResponse;
|
||||||
|
import com.yomahub.liteflow.flow.entity.CmpStep;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author wr
|
||||||
|
* @description
|
||||||
|
* @date 2025/12/16 17:43
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
public class FlowServiceImpl implements FlowService {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(FlowServiceImpl.class);
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private FlowExecutor flowExecutor;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Async("asyncExecutor")
|
||||||
|
public void execute2Resp(String methodDescribe,CalculatedParam calculatedParam) {
|
||||||
|
LiteflowResponse liteflowResponse = flowExecutor.execute2Resp("measurement_point", calculatedParam);
|
||||||
|
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
|
||||||
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
*
|
||||||
|
* @author hongawen
|
||||||
|
* @date 2023/11/7 14:44
|
||||||
|
*/
|
||||||
|
private void dealResponse(CalculatedParam calculatedParam, LiteflowResponse liteflowResponse, String methodDescribe) {
|
||||||
|
MemorySizeUtil.getNowMemory();
|
||||||
|
if (liteflowResponse.isSuccess()) {
|
||||||
|
// 获取执行步骤列表
|
||||||
|
long allTime = 0;
|
||||||
|
Map<String, List<CmpStep>> executeSteps = liteflowResponse.getExecuteSteps();
|
||||||
|
for (String key : executeSteps.keySet()) {
|
||||||
|
List<CmpStep> cmpSteps = executeSteps.get(key);
|
||||||
|
long timeSum = cmpSteps.stream().mapToLong(CmpStep::getTimeSpent).sum();
|
||||||
|
allTime+=timeSum;
|
||||||
|
}
|
||||||
|
logger.info("日期{},{}执行{}成功,执行总时长{}分钟", calculatedParam.getDataDate(), methodDescribe, calculatedParam.isFullChain() ? "全链" : "指定节点:".concat(String.join(StrPool.COMMA, calculatedParam.getTagNames())),allTime/1000/60);
|
||||||
|
} else {
|
||||||
|
Map<String, List<CmpStep>> executeSteps = liteflowResponse.getExecuteSteps();
|
||||||
|
CmpStep failStep = null;
|
||||||
|
for (String key : executeSteps.keySet()) {
|
||||||
|
List<CmpStep> cmpSteps = executeSteps.get(key);
|
||||||
|
cmpSteps = cmpSteps.stream().filter(cmpStep -> !cmpStep.isSuccess()).collect(Collectors.toList());
|
||||||
|
if (CollectionUtil.isNotEmpty(cmpSteps)) {
|
||||||
|
failStep = cmpSteps.get(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.error("日期{},{}执行{}失败,在执行{}失败,失败原因:{}"
|
||||||
|
, calculatedParam.getDataDate()
|
||||||
|
, methodDescribe
|
||||||
|
, calculatedParam.isFullChain() ? "全链" : "指定节点:".concat(String.join(StrPool.COMMA, calculatedParam.getTagNames()))
|
||||||
|
, failStep.getNodeId().concat(Objects.isNull(failStep.getTag()) ? "" : StrPool.DASHED.concat(failStep.getTag()))
|
||||||
|
, Objects.isNull(failStep.getException()) ? null : failStep.getException().getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -88,24 +88,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
@Override
|
@Override
|
||||||
public void limitRateHandler(CalculatedParam calculatedParam) {
|
public void limitRateHandler(CalculatedParam calculatedParam) {
|
||||||
MemorySizeUtil.getNowMemory();
|
MemorySizeUtil.getNowMemory();
|
||||||
|
System.err.println("limitRate表转r_stat_limit_rate_d算法开始,执行日期为{}=====》"+calculatedParam.getDataDate());
|
||||||
Runtime runtime = Runtime.getRuntime();
|
|
||||||
// 获取 JVM 最大可用内存(以字节为单位)
|
|
||||||
long maxMemory = runtime.maxMemory();
|
|
||||||
// 获取 JVM 当前已分配的内存(以字节为单位)
|
|
||||||
long totalMemory = runtime.totalMemory();
|
|
||||||
// 获取 JVM 当前空闲内存(以字节为单位)
|
|
||||||
long freeMemory = runtime.freeMemory();
|
|
||||||
// 计算已使用的内存
|
|
||||||
long usedMemory = totalMemory - freeMemory;
|
|
||||||
|
|
||||||
System.out.println("最大可用内存: " + maxMemory / (1024 * 1024) + " MB");
|
|
||||||
System.out.println("当前已分配的内存: " + totalMemory / (1024 * 1024) + " MB");
|
|
||||||
System.out.println("当前空闲内存: " + freeMemory / (1024 * 1024) + " MB");
|
|
||||||
System.out.println("已使用的内存: " + usedMemory / (1024 * 1024) + " MB");
|
|
||||||
System.out.println("第一次分析结束-----------------------------------------");
|
|
||||||
|
|
||||||
logger.info("limitRate表转r_stat_limit_rate_d算法开始,执行日期为{}=====》", calculatedParam.getDataDate());
|
|
||||||
List<DataLimitDetailDto> result = new ArrayList<>();
|
List<DataLimitDetailDto> result = new ArrayList<>();
|
||||||
//远程接口获取分钟数据
|
//远程接口获取分钟数据
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
@@ -116,13 +99,13 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
//获取所有监测点的限值
|
//获取所有监测点的限值
|
||||||
List<Overlimit> overLimitList = commTerminalGeneralClient.getOverLimitDataByIds(lineIds).getData();
|
List<Overlimit> overLimitList = commTerminalGeneralClient.getOverLimitDataByIds(lineIds).getData();
|
||||||
Map<String, Overlimit> overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity()));
|
Map<String, Overlimit> overLimitMap = overLimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity()));
|
||||||
//添加异常数据时间点
|
|
||||||
getAbnormalData(lineParam);
|
|
||||||
//以100个监测点分片处理
|
//以100个监测点分片处理
|
||||||
List<List<String>> pendingIds = ListUtils.partition(lineIds, NUM);
|
List<List<String>> pendingIds = ListUtils.partition(lineIds, NUM);
|
||||||
ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C);
|
ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C);
|
||||||
MemorySizeUtil.getNowMemory();
|
MemorySizeUtil.getNowMemory();
|
||||||
pendingIds.forEach(list -> {
|
for (int i = 0; i < pendingIds.size(); i++) {
|
||||||
|
logger.info("总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区");
|
||||||
|
List<String> list = pendingIds.get(i);
|
||||||
lineParam.setLineId(list);
|
lineParam.setLineId(list);
|
||||||
//获取电压数据
|
//获取电压数据
|
||||||
List<DataVDto> dataVAllTime = dataVFeignClient.getRawData(lineParam).getData();
|
List<DataVDto> dataVAllTime = dataVFeignClient.getRawData(lineParam).getData();
|
||||||
@@ -245,8 +228,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
dataPlt.get(item)));
|
dataPlt.get(item)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
MemorySizeUtil.getNowMemory();
|
|
||||||
if (CollUtil.isNotEmpty(result)) {
|
if (CollUtil.isNotEmpty(result)) {
|
||||||
//存储数据
|
//存储数据
|
||||||
List<DataLimitRateDto> dataLimitRate = result.stream().map(DataLimitDetailDto::getDataLimitRate).filter(ObjectUtil::isNotNull).collect(Collectors.toList());
|
List<DataLimitRateDto> dataLimitRate = result.stream().map(DataLimitDetailDto::getDataLimitRate).filter(ObjectUtil::isNotNull).collect(Collectors.toList());
|
||||||
|
|||||||
Reference in New Issue
Block a user