优化处理,避免任务重叠问题

异常处理
This commit is contained in:
2025-03-12 16:02:57 +08:00
parent 6d3a1dd735
commit 11713e9b68

View File

@@ -44,50 +44,53 @@ public class AutoAccessTimer implements ApplicationRunner {
if (CollUtil.isNotEmpty(list)) { if (CollUtil.isNotEmpty(list)) {
ExecutorService executor = Executors.newFixedThreadPool(10); ExecutorService executor = Executors.newFixedThreadPool(10);
// 将任务平均分配给10个子列表 // 将任务平均分配给10个子列表
List<List<CsEquipmentDeliveryPO>> subLists = new ArrayList<>(); List<List<CsEquipmentDeliveryPO>> subLists = CollUtil.split(list, 10);
int partitionSize = list.size() / 10; // List<List<CsEquipmentDeliveryPO>> subLists = new ArrayList<>();
for (int i = 0; i < 10; i++) { // int partitionSize = list.size() / 10;
int start = i * partitionSize; // for (int i = 0; i < 10; i++) {
int end = (i == 9) ? list.size() : start + partitionSize; // int start = i * partitionSize;
subLists.add(list.subList(start, end)); // int end = (i == 9) ? list.size() : start + partitionSize;
} // subLists.add(list.subList(start, end));
// }
// 创建一个ExecutorService来处理这些任务 // 创建一个ExecutorService来处理这些任务
List<Future<Void>> futures = new ArrayList<>(); List<Future<Void>> futures = new ArrayList<>();
// 提交任务给线程池执行 // 提交任务给线程池执行
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
int index = i; int index = i;
futures.add(executor.submit(new Callable<Void>() { futures.add(executor.submit(() -> {
@Override
public Void call() {
accessDev(subLists.get(index)); accessDev(subLists.get(index));
return null; return null;
}
})); }));
} }
// 等待所有任务完成 // 等待所有任务完成
for (Future<Void> future : futures) { for (Future<Void> future : futures) {
try { try {
future.get(); future.get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); Thread.currentThread().interrupt();
log.error("任务被中断", e);
} catch (ExecutionException e) {
log.error("任务执行异常", e.getCause());
} }
} }
// 关闭ExecutorService // 关闭ExecutorService
executor.shutdown(); executor.shutdown();
} }
}; };
//第一次执行的时间为120s然后每隔120s执行一次 //第一次执行的时间为120s然后在前一个任务执行完毕后,等待120s执行下一个任务
scheduler.scheduleAtFixedRate(task,AUTO_TIME,AUTO_TIME,TimeUnit.SECONDS); scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS);
} }
public void accessDev(List<CsEquipmentDeliveryPO> list) { public void accessDev(List<CsEquipmentDeliveryPO> list) {
if (CollUtil.isNotEmpty(list)) { if (CollUtil.isNotEmpty(list)) {
try { try {
list.forEach(item->{ list.forEach(item -> {
System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid()); System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid());
//判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入 //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入
String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode();
if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(),1)) { if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) {
//csDeviceService.wlDevRegister(item.getNdid()); //csDeviceService.wlDevRegister(item.getNdid());
log.info("请先手动注册、接入"); log.info("请先手动注册、接入");
} else { } else {
@@ -95,9 +98,9 @@ public class AutoAccessTimer implements ApplicationRunner {
if (Objects.isNull(version)) { if (Objects.isNull(version)) {
version = "V1"; version = "V1";
} }
csDeviceService.autoAccess(item.getNdid(),version,1); csDeviceService.autoAccess(item.getNdid(), version, 1);
} }
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1); redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1);
}); });
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage()); log.error(e.getMessage());