1 Commits

3 changed files with 38 additions and 16 deletions

View File

@@ -4,11 +4,13 @@ import com.njcn.device.biz.pojo.po.Overlimit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public interface IDataLimitRateAsync {
/**
* limit_rate多线程算法
*
* @param dataDate
* @param list
* @param phase
@@ -16,10 +18,11 @@ public interface IDataLimitRateAsync {
* @param size
* @param i
* @param type 系统类型
* @return
* @Author: wr
* @Date: 2025/12/17 12:16
*/
void lineDataRate(String dataDate,
CompletableFuture<Void> lineDataRate(String dataDate,
List<String> list,
List<String> phase,
Map<String, Overlimit> overLimitMap,

View File

@@ -46,6 +46,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -108,11 +109,26 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
List<List<String>> pendingIds = ListUtils.partition(lineIds, 1);
ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C);
MemorySizeUtil.getNowMemory();
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < pendingIds.size(); i++) {
logger.info(calculatedParam.getDataDate()+" 总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区");
List<String> list = pendingIds.get(i);
dataLimitRateAsync.lineDataRate(calculatedParam.getDataDate(), list, phase, overLimitMap, pendingIds.size(), (i + 1), lineParam.getType());
// 获取Future
CompletableFuture<Void> future = dataLimitRateAsync.lineDataRate(
calculatedParam.getDataDate(),
list,
phase,
overLimitMap,
pendingIds.size(),
(i + 1),
lineParam.getType()
);
futures.add(future);
}
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
System.gc();
}

View File

@@ -25,6 +25,7 @@ import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
@@ -56,7 +57,7 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
@Override
@Async("asyncExecutor")
public void lineDataRate(String dataDate,
public CompletableFuture<Void> lineDataRate(String dataDate,
List<String> list,
List<String> phase,
Map<String, Overlimit> overLimitMap,
@@ -237,6 +238,8 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
MemorySizeUtil.getNowMemory();
}
System.gc();
return CompletableFuture.completedFuture(null);
}
/**