diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataLimitRateAsync.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataLimitRateAsync.java index 3614677..ddcffd3 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataLimitRateAsync.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataLimitRateAsync.java @@ -4,26 +4,29 @@ import com.njcn.device.biz.pojo.po.Overlimit; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; public interface IDataLimitRateAsync { /** * limit_rate多线程算法 + * * @param dataDate * @param list * @param phase * @param overLimitMap * @param size * @param i - * @param type 系统类型 + * @param type 系统类型 + * @return * @Author: wr * @Date: 2025/12/17 12:16 */ - void lineDataRate(String dataDate, - List list, - List phase, - Map overLimitMap, - int size, - int i, - int type); + CompletableFuture lineDataRate(String dataDate, + List list, + List phase, + Map overLimitMap, + int size, + int i, + int type); } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java index 1059c2b..1ca4af5 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -108,11 +109,26 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { List> pendingIds = ListUtils.partition(lineIds, 1); ArrayList phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C); MemorySizeUtil.getNowMemory(); + + List> futures = new ArrayList<>(); for (int i = 0; i < pendingIds.size(); i++) { logger.info(calculatedParam.getDataDate()+" 总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区"); List list = pendingIds.get(i); - dataLimitRateAsync.lineDataRate(calculatedParam.getDataDate(), list, phase, overLimitMap, pendingIds.size(), (i + 1), lineParam.getType()); + // 获取Future + CompletableFuture future = dataLimitRateAsync.lineDataRate( + calculatedParam.getDataDate(), + list, + phase, + overLimitMap, + pendingIds.size(), + (i + 1), + lineParam.getType() + ); + + futures.add(future); } + // 等待所有任务完成 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); System.gc(); } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java index 18be446..437c822 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java @@ -25,6 +25,7 @@ import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.lang.reflect.Method; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** @@ -56,13 +57,13 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync { @Override @Async("asyncExecutor") - public void lineDataRate(String dataDate, - List list, - List phase, - Map overLimitMap, - int size, - int i, - int type) { + public CompletableFuture lineDataRate(String dataDate, + List list, + List phase, + Map overLimitMap, + int size, + int i, + int type) { List result = new ArrayList<>(); LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(dataDate)); @@ -237,6 +238,8 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync { MemorySizeUtil.getNowMemory(); } System.gc(); + + return CompletableFuture.completedFuture(null); } /**