From d3e46f5ba4928fd87b08771d719dd801147a1f0f Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Sat, 28 Feb 2026 16:07:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dbug=E3=80=82=E7=AE=97?= =?UTF-8?q?=E6=B3=95=E5=86=85=E9=83=A8=E4=BD=BF=E7=94=A8=E6=96=B0=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=AF=BC=E8=87=B4LiteFlow=E5=B1=82=E9=9D=A2=E7=9A=84p?= =?UTF-8?q?rocess()=E6=96=B9=E6=B3=95=E7=9E=AC=E9=97=B4=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=EF=BC=8C=E4=BD=86=E5=AE=9E=E9=99=85=E8=AE=A1=E7=AE=97=E8=BF=98?= =?UTF-8?q?=E5=9C=A8=E8=BF=9B=E8=A1=8C=EF=BC=8C=E4=BC=9A=E5=BD=B1=E5=93=8D?= =?UTF-8?q?=E6=9C=89=E4=B8=8A=E4=B8=8B=E5=B1=82=E5=85=B3=E7=B3=BB=E7=9A=84?= =?UTF-8?q?=E7=AE=97=E6=B3=95=E8=AE=A1=E7=AE=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/line/IDataLimitRateAsync.java | 19 +++++++++++-------- .../line/IDataCrossingServiceImpl.java | 18 +++++++++++++++++- .../line/IDataLimitRateAsyncImpl.java | 17 ++++++++++------- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataLimitRateAsync.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataLimitRateAsync.java index 3614677..ddcffd3 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataLimitRateAsync.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataLimitRateAsync.java @@ -4,26 +4,29 @@ import com.njcn.device.biz.pojo.po.Overlimit; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; public interface IDataLimitRateAsync { /** * limit_rate多线程算法 + * * @param dataDate * @param list * @param phase * @param overLimitMap * @param size * @param i - * @param type 系统类型 + * @param type 系统类型 + * @return * @Author: wr * @Date: 2025/12/17 12:16 */ - void lineDataRate(String dataDate, - List list, - List phase, - Map overLimitMap, - int size, - int i, - int type); + CompletableFuture lineDataRate(String dataDate, + List list, + List phase, + Map overLimitMap, + int size, + int i, + int type); } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java index 1059c2b..1ca4af5 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataCrossingServiceImpl.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -108,11 +109,26 @@ public class IDataCrossingServiceImpl implements IDataCrossingService { List> pendingIds = ListUtils.partition(lineIds, 1); ArrayList phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C); MemorySizeUtil.getNowMemory(); + + List> futures = new ArrayList<>(); for (int i = 0; i < pendingIds.size(); i++) { logger.info(calculatedParam.getDataDate()+" 总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区"); List list = pendingIds.get(i); - dataLimitRateAsync.lineDataRate(calculatedParam.getDataDate(), list, phase, overLimitMap, pendingIds.size(), (i + 1), lineParam.getType()); + // 获取Future + CompletableFuture future = dataLimitRateAsync.lineDataRate( + calculatedParam.getDataDate(), + list, + phase, + overLimitMap, + pendingIds.size(), + (i + 1), + lineParam.getType() + ); + + futures.add(future); } + // 等待所有任务完成 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); System.gc(); } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java index 18be446..437c822 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataLimitRateAsyncImpl.java @@ -25,6 +25,7 @@ import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.lang.reflect.Method; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** @@ -56,13 +57,13 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync { @Override @Async("asyncExecutor") - public void lineDataRate(String dataDate, - List list, - List phase, - Map overLimitMap, - int size, - int i, - int type) { + public CompletableFuture lineDataRate(String dataDate, + List list, + List phase, + Map overLimitMap, + int size, + int i, + int type) { List result = new ArrayList<>(); LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); lineParam.setStartTime(TimeUtils.getBeginOfDay(dataDate)); @@ -237,6 +238,8 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync { MemorySizeUtil.getNowMemory(); } System.gc(); + + return CompletableFuture.completedFuture(null); } /**