定时任务优化

This commit is contained in:
xy
2025-08-20 17:54:25 +08:00
parent 1bb9ad1cf7
commit 72351c612b
4 changed files with 278 additions and 74 deletions

View File

@@ -73,6 +73,8 @@ public enum AccessResponseEnum {
PROCESS_ERROR("A0311","调试流程异常,请先进行功能调试、出厂调试!"), PROCESS_ERROR("A0311","调试流程异常,请先进行功能调试、出厂调试!"),
FILE_CHECK_ERROR("A0312","文件校验码不一致!"), FILE_CHECK_ERROR("A0312","文件校验码不一致!"),
CLD_MODEL_EXIST("A0313","云前置模板已存在,请先删除再录入!"),
; ;
private final String code; private final String code;

View File

@@ -38,77 +38,198 @@ public class AutoAccessTimer implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
if (scheduler.isShutdown() || scheduler.isTerminated()) {
scheduler = Executors.newScheduledThreadPool(1);
}
Runnable task = () -> { Runnable task = () -> {
try {
executeScheduledTask();
}
// 捕获所有Throwable包括Error
catch (Throwable t) {
log.error("定时任务发生严重异常,尝试恢复", t);
// 可以添加重启逻辑或告警
}
};
ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS);
// 添加监控,如果任务被取消则重新调度
monitorScheduledTask(future);
}
//10分钟检查一下调度任务
private void monitorScheduledTask(ScheduledFuture<?> future) {
Thread monitorThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
//每10分钟检查一次
Thread.sleep(600000);
if (future.isCancelled() || future.isDone()) {
log.warn("定时任务被取消或完成,重新调度...");
// 重新启动任务
run(null);
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("监控线程被中断");
break;
} catch (Exception e) {
log.error("监控任务异常", e);
}
}
}, "Schedule-Monitor-Thread");
monitorThread.setDaemon(true);
monitorThread.start();
}
private void executeScheduledTask() {
log.info("轮询定时任务执行中!"); log.info("轮询定时任务执行中!");
List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getOfflineDev(); List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getOfflineDev();
if (CollUtil.isNotEmpty(list)) { if (CollUtil.isNotEmpty(list)) {
ExecutorService executor = Executors.newFixedThreadPool(10); ExecutorService executor = Executors.newFixedThreadPool(10);
// 将任务平均分配给10个子列表 try {
List<List<CsEquipmentDeliveryPO>> subLists = CollUtil.split(list, 10); List<List<CsEquipmentDeliveryPO>> subLists = CollUtil.split(list, 10);
// List<List<CsEquipmentDeliveryPO>> subLists = new ArrayList<>();
// int partitionSize = list.size() / 10;
// for (int i = 0; i < 10; i++) {
// int start = i * partitionSize;
// int end = (i == 9) ? list.size() : start + partitionSize;
// subLists.add(list.subList(start, end));
// }
// 创建一个ExecutorService来处理这些任务
List<Future<Void>> futures = new ArrayList<>(); List<Future<Void>> futures = new ArrayList<>();
// 提交任务给线程池执行
// for (int i = 0; i < 10; i++) {
// int index = i;
// futures.add(executor.submit(() -> {
// accessDev(subLists.get(index));
// return null;
// }));
// }
for (List<CsEquipmentDeliveryPO> subList : subLists) { for (List<CsEquipmentDeliveryPO> subList : subLists) {
futures.add(executor.submit(() -> { futures.add(executor.submit(() -> {
accessDev(subList); try {
accessDevSafely(subList); // 使用安全版本
} catch (Exception e) {
log.error("处理设备子列表异常", e);
}
return null; return null;
})); }));
} }
// 等待所有任务完成
for (Future<Void> future : futures) { for (Future<Void> future : futures) {
try { try {
future.get(); future.get(5, TimeUnit.MINUTES);
} catch (InterruptedException e) { } catch (TimeoutException e) {
Thread.currentThread().interrupt(); log.error("任务执行超时", e);
log.error("任务被中断", e); } catch (Exception e) {
} catch (ExecutionException e) { log.error("任务执行异常", e);
log.error("任务执行异常", e.getCause());
} }
} }
// 关闭ExecutorService } finally {
executor.shutdown(); executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
} }
};
//第一次执行的时间为120s然后在前一个任务执行完毕后等待120s再执行下一个任务
scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS);
} }
public void accessDev(List<CsEquipmentDeliveryPO> list) { //安全的accessDev版本
private void accessDevSafely(List<CsEquipmentDeliveryPO> list) {
if (CollUtil.isNotEmpty(list)) { if (CollUtil.isNotEmpty(list)) {
for (CsEquipmentDeliveryPO item : list) {
try { try {
list.forEach(item -> { processSingleDevice(item);
} catch (Exception e) {
log.error("处理设备 {} 失败: {}", item.getNdid(), e.getMessage());
}
}
}
}
private void processSingleDevice(CsEquipmentDeliveryPO item) {
System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid()); System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid());
//判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入
String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode();
if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) { if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) {
//csDeviceService.wlDevRegister(item.getNdid()); log.info("设备 {} 需要手动注册、接入", item.getNdid());
log.info("请先手动注册、接入");
} else { } else {
String version = csTopicService.getVersion(item.getNdid()); String version = csTopicService.getVersion(item.getNdid());
if (Objects.isNull(version)) { if (Objects.isNull(version)) {
version = "V1"; version = "V1";
} }
csDeviceService.autoAccess(item.getNdid(), version, 1); // 使用try-catch确保单个设备失败不影响其他设备
} try {
boolean success = csDeviceService.autoAccess2(item.getNdid(), version, 1);
if (success) {
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1); redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1);
}); } else {
log.warn("设备 {} 接入失败", item.getNdid());
}
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage()); log.error("设备 {} 接入异常: {}", item.getNdid(), e.getMessage());
} }
} }
} }
// @Override
// public void run(ApplicationArguments args) {
// if (scheduler.isShutdown() || scheduler.isTerminated()) {
// scheduler = Executors.newScheduledThreadPool(1);
// }
// Runnable task = () -> {
// log.info("轮询定时任务执行中!");
// List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getOfflineDev();
// if (CollUtil.isNotEmpty(list)) {
// ExecutorService executor = Executors.newFixedThreadPool(10);
// // 将任务平均分配给10个子列表
// List<List<CsEquipmentDeliveryPO>> subLists = CollUtil.split(list, 10);
// // 创建一个ExecutorService来处理这些任务
// List<Future<Void>> futures = new ArrayList<>();
// for (List<CsEquipmentDeliveryPO> subList : subLists) {
// futures.add(executor.submit(() -> {
// try {
// accessDev(subList);
// } catch (Exception e) {
// log.error("处理设备子列表异常,但继续处理其他任务", e);
// }
// return null;
// }));
// }
// // 等待所有任务完成
// for (Future<Void> future : futures) {
// try {
// future.get();
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// log.error("任务被中断", e);
// } catch (ExecutionException e) {
// log.error("任务执行异常", e.getCause());
// } catch (Exception e) {
// log.error("系统异常", e.getCause());
// }
// }
// // 关闭ExecutorService
// executor.shutdown();
// }
// };
// //第一次执行的时间为120s然后在前一个任务执行完毕后等待120s再执行下一个任务
//// scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS);
// scheduler.scheduleWithFixedDelay(task, 10, 10, TimeUnit.SECONDS);
// }
//
// public void accessDev(List<CsEquipmentDeliveryPO> list) {
// if (CollUtil.isNotEmpty(list)) {
// try {
// list.forEach(item -> {
// System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid());
// //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入
// String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode();
// if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) {
// //csDeviceService.wlDevRegister(item.getNdid());
// log.info("请先手动注册、接入");
// } else {
// String version = csTopicService.getVersion(item.getNdid());
// if (Objects.isNull(version)) {
// version = "V1";
// }
// csDeviceService.autoAccess(item.getNdid(), version, 1);
// }
// redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1);
// });
// } catch (Exception e) {
// log.error(e.getMessage());
// }
// }
// }
} }

View File

@@ -3,6 +3,7 @@ package com.njcn.access.service.impl;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.AccessResponseEnum; import com.njcn.access.enums.AccessResponseEnum;
@@ -96,6 +97,14 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode(devType).getData(); DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode(devType).getData();
if (Objects.isNull(dictTreeVO)){ if (Objects.isNull(dictTreeVO)){
throw new BusinessException(AccessResponseEnum.DEV_TYPE_NOT_FIND); throw new BusinessException(AccessResponseEnum.DEV_TYPE_NOT_FIND);
} else if (Objects.equals(devType,"CLD")) {
//查询是否已存在云前置模板
LambdaQueryWrapper<CsDevModelPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(CsDevModelPO::getName,"CLD").eq(CsDevModelPO::getStatus,1);
List<CsDevModelPO> list = csDevModelMapper.selectList(lambdaQueryWrapper);
if (CollectionUtil.isNotEmpty(list)) {
throw new BusinessException(AccessResponseEnum.CLD_MODEL_EXIST);
}
} }
logDto.setOperate("新增设备模板,模板名称:" + templateDto.getDevType()); logDto.setOperate("新增设备模板,模板名称:" + templateDto.getDevType());
//模板文件存入文件服务器 //模板文件存入文件服务器
@@ -373,9 +382,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
} }
eleEpdPqdParam.setClassId(classId); eleEpdPqdParam.setClassId(classId);
if (!Objects.isNull(apf.getHarmStart())){ if (!Objects.isNull(apf.getHarmStart())){
if (Objects.equals(apf.getHarmStart(),0.5) && Objects.equals(apf.getHarmEnd(),49.5)){ if (Objects.equals(apf.getHarmStart(),0.5)){
eleEpdPqdParam.setHarmStart((int)(apf.getHarmStart()+0.5)); eleEpdPqdParam.setHarmStart((int)(apf.getHarmStart()+0.5));
eleEpdPqdParam.setHarmEnd((int)(apf.getHarmEnd()+49.5)); eleEpdPqdParam.setHarmEnd((int)(apf.getHarmEnd()+0.5));
} else { } else {
eleEpdPqdParam.setHarmStart((int)(apf.getHarmStart()*1.0)); eleEpdPqdParam.setHarmStart((int)(apf.getHarmStart()*1.0));
eleEpdPqdParam.setHarmEnd((int)(apf.getHarmEnd()*1.0)); eleEpdPqdParam.setHarmEnd((int)(apf.getHarmEnd()*1.0));
@@ -612,9 +621,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
} }
eleEpdPqdParam.setUnit(epd.getUnit()); eleEpdPqdParam.setUnit(epd.getUnit());
if (!Objects.isNull(epd.getHarmStart())){ if (!Objects.isNull(epd.getHarmStart())){
if (Objects.equals(epd.getHarmStart(),0.5) && Objects.equals(epd.getHarmEnd(),49.5)){ if (Objects.equals(epd.getHarmStart(),0.5)){
eleEpdPqdParam.setHarmStart((int)(epd.getHarmStart()+0.5)); eleEpdPqdParam.setHarmStart((int)(epd.getHarmStart()+0.5));
eleEpdPqdParam.setHarmEnd((int)(epd.getHarmEnd()+49.5)); eleEpdPqdParam.setHarmEnd((int)(epd.getHarmEnd()+0.5));
} else { } else {
eleEpdPqdParam.setHarmStart((int)(epd.getHarmStart()*1.0)); eleEpdPqdParam.setHarmStart((int)(epd.getHarmStart()*1.0));
eleEpdPqdParam.setHarmEnd((int)(epd.getHarmEnd()*1.0)); eleEpdPqdParam.setHarmEnd((int)(epd.getHarmEnd()*1.0));
@@ -648,14 +657,15 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
} }
eleEpdPqdParam.setUnit(pqd.getUnit()); eleEpdPqdParam.setUnit(pqd.getUnit());
if (!Objects.isNull(pqd.getHarmStart())){ if (!Objects.isNull(pqd.getHarmStart())){
if (Objects.equals(pqd.getHarmStart(),0.5) && Objects.equals(pqd.getHarmEnd(),49.5)){ if (Objects.equals(pqd.getHarmStart(),0.5)){
eleEpdPqdParam.setHarmStart((int)(pqd.getHarmStart()+0.5)); eleEpdPqdParam.setHarmStart((int)(pqd.getHarmStart()+0.5));
eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmEnd()+49.5)); eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmEnd()+0.5));
} else { } else {
eleEpdPqdParam.setHarmStart((int)(pqd.getHarmStart()*1.0)); eleEpdPqdParam.setHarmStart((int)(pqd.getHarmStart()*1.0));
eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmEnd()*1.0)); eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmEnd()*1.0));
} }
} }
eleEpdPqdParam.setStatMethod(pqd.getStatMethod());
eleEpdPqdParam.setDataType(id); eleEpdPqdParam.setDataType(id);
eleEpdPqdParam.setClassId(classId); eleEpdPqdParam.setClassId(classId);
result.add(eleEpdPqdParam); result.add(eleEpdPqdParam);

View File

@@ -604,6 +604,8 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
*/ */
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public boolean autoAccess(String nDid,String version,Integer mid) { public boolean autoAccess(String nDid,String version,Integer mid) {
boolean result = false;
try {
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
boolean mqttClient = mqttUtil.judgeClientOnline(clientName); boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (!mqttClient) { if (!mqttClient) {
@@ -615,9 +617,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csLogsFeignClient.addUserLog(logDto); csLogsFeignClient.addUserLog(logDto);
throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE); throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE);
} }
boolean result = false;
Map<Integer,String> modelMap = new HashMap<>(); Map<Integer,String> modelMap = new HashMap<>();
try {
//删除缓存数据 //删除缓存数据
redisUtil.delete(AppRedisKey.MODEL + nDid); redisUtil.delete(AppRedisKey.MODEL + nDid);
redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
@@ -669,6 +669,77 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
return result; return result;
} }
@Transactional(rollbackFor = Exception.class)
public boolean autoAccess2(String nDid, String version, Integer mid) {
boolean result = false;
try {
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (!mqttClient) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(nDid + "接入失败,装置客户端不在线");
csLogsFeignClient.addUserLog(logDto);
// 改为返回false而不是抛出异常
log.warn("设备 {} 客户端不在线", nDid);
return false;
}
Map<Integer, String> modelMap = new HashMap<>();
redisUtil.delete(AppRedisKey.MODEL + nDid);
redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_3.getCode())), 1, false);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("线程休眠被中断: {}", e.getMessage());
return false;
}
List<CsModelDto> modelId = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid), CsModelDto.class);
if (CollUtil.isEmpty(modelId)) {
log.warn("设备 {} 未获取到模板信息", nDid);
return false;
}
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
for (CsModelDto item : modelId) {
CsDevModelRelationPO po = new CsDevModelRelationPO();
po.setDevId(vo.getId());
po.setModelId(item.getModelId());
po.setDid(item.getDid());
po.setUpdateTime(LocalDateTime.now());
csDevModelRelationService.addRelation(po);
modelMap.put(item.getType(), item.getModelId());
}
List<CsLinePO> lineList;
Object object = redisUtil.getObjectByKey("accessLineInfo:" + nDid);
if (Objects.isNull(object)) {
lineList = csLineFeignClient.findByNdid(nDid).getData();
for (CsLinePO item : lineList) {
if (item.getClDid() == 0) {
updateLineIds(modelMap.get(0), item.getClDid(), nDid);
} else {
updateLineIds(modelMap.get(1), item.getClDid(), nDid);
}
}
}
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())), 1, false);
redisUtil.saveByKeyWithExpire("startFile:" + nDid, null, 60L);
result = true;
} catch (Exception e) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(nDid + "装置接入失败");
csLogsFeignClient.addUserLog(logDto);
log.error("设备 {} 接入失败: {}", nDid, e.getMessage());
}
return result;
}
/** /**
* 组装报文 * 组装报文
*/ */