From 72351c612b86d9e43bbf8d5188c4f799cf67805b Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Wed, 20 Aug 2025 17:54:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/access/enums/AccessResponseEnum.java | 2 + .../njcn/access/runner/AutoAccessTimer.java | 233 +++++++++++++----- .../service/impl/CsDevModelServiceImpl.java | 22 +- .../service/impl/CsDeviceServiceImpl.java | 95 ++++++- 4 files changed, 278 insertions(+), 74 deletions(-) diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index bbbd639..b134855 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -73,6 +73,8 @@ public enum AccessResponseEnum { PROCESS_ERROR("A0311","调试流程异常,请先进行功能调试、出厂调试!"), FILE_CHECK_ERROR("A0312","文件校验码不一致!"), + + CLD_MODEL_EXIST("A0313","云前置模板已存在,请先删除再录入!"), ; private final String code; diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java index f004acd..dda1777 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java @@ -38,77 +38,198 @@ public class AutoAccessTimer implements ApplicationRunner { @Override public void run(ApplicationArguments args) { + if (scheduler.isShutdown() || scheduler.isTerminated()) { + scheduler = Executors.newScheduledThreadPool(1); + } Runnable task = () -> { - log.info("轮询定时任务执行中!"); - List list = csEquipmentDeliveryService.getOfflineDev(); - if (CollUtil.isNotEmpty(list)) { - ExecutorService executor = Executors.newFixedThreadPool(10); - // 将任务平均分配给10个子列表 + try { + executeScheduledTask(); + } + // 捕获所有Throwable,包括Error + catch (Throwable t) { + log.error("定时任务发生严重异常,尝试恢复", t); + // 可以添加重启逻辑或告警 + } + }; + ScheduledFuture future = scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS); + // 添加监控,如果任务被取消则重新调度 + monitorScheduledTask(future); + } + + //10分钟检查一下调度任务 + private void monitorScheduledTask(ScheduledFuture future) { + Thread monitorThread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + //每10分钟检查一次 + Thread.sleep(600000); + if (future.isCancelled() || future.isDone()) { + log.warn("定时任务被取消或完成,重新调度..."); + // 重新启动任务 + run(null); + break; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("监控线程被中断"); + break; + } catch (Exception e) { + log.error("监控任务异常", e); + } + } + }, "Schedule-Monitor-Thread"); + + monitorThread.setDaemon(true); + monitorThread.start(); + } + + private void executeScheduledTask() { + log.info("轮询定时任务执行中!"); + List list = csEquipmentDeliveryService.getOfflineDev(); + if (CollUtil.isNotEmpty(list)) { + ExecutorService executor = Executors.newFixedThreadPool(10); + try { List> subLists = CollUtil.split(list, 10); -// List> subLists = new ArrayList<>(); -// int partitionSize = list.size() / 10; -// for (int i = 0; i < 10; i++) { -// int start = i * partitionSize; -// int end = (i == 9) ? list.size() : start + partitionSize; -// subLists.add(list.subList(start, end)); -// } - // 创建一个ExecutorService来处理这些任务 List> futures = new ArrayList<>(); - // 提交任务给线程池执行 -// for (int i = 0; i < 10; i++) { -// int index = i; -// futures.add(executor.submit(() -> { -// accessDev(subLists.get(index)); -// return null; -// })); -// } for (List subList : subLists) { futures.add(executor.submit(() -> { - accessDev(subList); + try { + accessDevSafely(subList); // 使用安全版本 + } catch (Exception e) { + log.error("处理设备子列表异常", e); + } return null; })); } - // 等待所有任务完成 for (Future future : futures) { try { - future.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("任务被中断", e); - } catch (ExecutionException e) { - log.error("任务执行异常", e.getCause()); + future.get(5, TimeUnit.MINUTES); + } catch (TimeoutException e) { + log.error("任务执行超时", e); + } catch (Exception e) { + log.error("任务执行异常", e); } } - // 关闭ExecutorService + } finally { executor.shutdown(); - } - }; - //第一次执行的时间为120s,然后在前一个任务执行完毕后,等待120s再执行下一个任务 - scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS); - } - - public void accessDev(List list) { - if (CollUtil.isNotEmpty(list)) { - try { - list.forEach(item -> { - System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid()); - //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入 - String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); - if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) { - //csDeviceService.wlDevRegister(item.getNdid()); - log.info("请先手动注册、接入"); - } else { - String version = csTopicService.getVersion(item.getNdid()); - if (Objects.isNull(version)) { - version = "V1"; - } - csDeviceService.autoAccess(item.getNdid(), version, 1); + try { + if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { + executor.shutdownNow(); } - redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1); - }); - } catch (Exception e) { - log.error(e.getMessage()); + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } } } } + + //安全的accessDev版本 + private void accessDevSafely(List list) { + if (CollUtil.isNotEmpty(list)) { + for (CsEquipmentDeliveryPO item : list) { + try { + processSingleDevice(item); + } catch (Exception e) { + log.error("处理设备 {} 失败: {}", item.getNdid(), e.getMessage()); + } + } + } + } + + private void processSingleDevice(CsEquipmentDeliveryPO item) { + System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid()); + String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); + if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) { + log.info("设备 {} 需要手动注册、接入", item.getNdid()); + } else { + String version = csTopicService.getVersion(item.getNdid()); + if (Objects.isNull(version)) { + version = "V1"; + } + // 使用try-catch确保单个设备失败不影响其他设备 + try { + boolean success = csDeviceService.autoAccess2(item.getNdid(), version, 1); + if (success) { + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1); + } else { + log.warn("设备 {} 接入失败", item.getNdid()); + } + } catch (Exception e) { + log.error("设备 {} 接入异常: {}", item.getNdid(), e.getMessage()); + } + } + } + +// @Override +// public void run(ApplicationArguments args) { +// if (scheduler.isShutdown() || scheduler.isTerminated()) { +// scheduler = Executors.newScheduledThreadPool(1); +// } +// Runnable task = () -> { +// log.info("轮询定时任务执行中!"); +// List list = csEquipmentDeliveryService.getOfflineDev(); +// if (CollUtil.isNotEmpty(list)) { +// ExecutorService executor = Executors.newFixedThreadPool(10); +// // 将任务平均分配给10个子列表 +// List> subLists = CollUtil.split(list, 10); +// // 创建一个ExecutorService来处理这些任务 +// List> futures = new ArrayList<>(); +// for (List subList : subLists) { +// futures.add(executor.submit(() -> { +// try { +// accessDev(subList); +// } catch (Exception e) { +// log.error("处理设备子列表异常,但继续处理其他任务", e); +// } +// return null; +// })); +// } +// // 等待所有任务完成 +// for (Future future : futures) { +// try { +// future.get(); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// log.error("任务被中断", e); +// } catch (ExecutionException e) { +// log.error("任务执行异常", e.getCause()); +// } catch (Exception e) { +// log.error("系统异常", e.getCause()); +// } +// } +// // 关闭ExecutorService +// executor.shutdown(); +// } +// }; +// //第一次执行的时间为120s,然后在前一个任务执行完毕后,等待120s再执行下一个任务 +//// scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS); +// scheduler.scheduleWithFixedDelay(task, 10, 10, TimeUnit.SECONDS); +// } +// +// public void accessDev(List list) { +// if (CollUtil.isNotEmpty(list)) { +// try { +// list.forEach(item -> { +// System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid()); +// //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入 +// String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); +// if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) { +// //csDeviceService.wlDevRegister(item.getNdid()); +// log.info("请先手动注册、接入"); +// } else { +// String version = csTopicService.getVersion(item.getNdid()); +// if (Objects.isNull(version)) { +// version = "V1"; +// } +// csDeviceService.autoAccess(item.getNdid(), version, 1); +// } +// redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1); +// }); +// } catch (Exception e) { +// log.error(e.getMessage()); +// } +// } +// } + } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index 5f32d14..a3ed28f 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -3,6 +3,7 @@ package com.njcn.access.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.IdUtil; import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessResponseEnum; @@ -96,6 +97,14 @@ public class CsDevModelServiceImpl implements ICsDevModelService { DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode(devType).getData(); if (Objects.isNull(dictTreeVO)){ throw new BusinessException(AccessResponseEnum.DEV_TYPE_NOT_FIND); + } else if (Objects.equals(devType,"CLD")) { + //查询是否已存在云前置模板 + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(CsDevModelPO::getName,"CLD").eq(CsDevModelPO::getStatus,1); + List list = csDevModelMapper.selectList(lambdaQueryWrapper); + if (CollectionUtil.isNotEmpty(list)) { + throw new BusinessException(AccessResponseEnum.CLD_MODEL_EXIST); + } } logDto.setOperate("新增设备模板,模板名称:" + templateDto.getDevType()); //模板文件存入文件服务器 @@ -373,9 +382,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService { } eleEpdPqdParam.setClassId(classId); if (!Objects.isNull(apf.getHarmStart())){ - if (Objects.equals(apf.getHarmStart(),0.5) && Objects.equals(apf.getHarmEnd(),49.5)){ + if (Objects.equals(apf.getHarmStart(),0.5)){ eleEpdPqdParam.setHarmStart((int)(apf.getHarmStart()+0.5)); - eleEpdPqdParam.setHarmEnd((int)(apf.getHarmEnd()+49.5)); + eleEpdPqdParam.setHarmEnd((int)(apf.getHarmEnd()+0.5)); } else { eleEpdPqdParam.setHarmStart((int)(apf.getHarmStart()*1.0)); eleEpdPqdParam.setHarmEnd((int)(apf.getHarmEnd()*1.0)); @@ -612,9 +621,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService { } eleEpdPqdParam.setUnit(epd.getUnit()); if (!Objects.isNull(epd.getHarmStart())){ - if (Objects.equals(epd.getHarmStart(),0.5) && Objects.equals(epd.getHarmEnd(),49.5)){ + if (Objects.equals(epd.getHarmStart(),0.5)){ eleEpdPqdParam.setHarmStart((int)(epd.getHarmStart()+0.5)); - eleEpdPqdParam.setHarmEnd((int)(epd.getHarmEnd()+49.5)); + eleEpdPqdParam.setHarmEnd((int)(epd.getHarmEnd()+0.5)); } else { eleEpdPqdParam.setHarmStart((int)(epd.getHarmStart()*1.0)); eleEpdPqdParam.setHarmEnd((int)(epd.getHarmEnd()*1.0)); @@ -648,14 +657,15 @@ public class CsDevModelServiceImpl implements ICsDevModelService { } eleEpdPqdParam.setUnit(pqd.getUnit()); if (!Objects.isNull(pqd.getHarmStart())){ - if (Objects.equals(pqd.getHarmStart(),0.5) && Objects.equals(pqd.getHarmEnd(),49.5)){ + if (Objects.equals(pqd.getHarmStart(),0.5)){ eleEpdPqdParam.setHarmStart((int)(pqd.getHarmStart()+0.5)); - eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmEnd()+49.5)); + eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmEnd()+0.5)); } else { eleEpdPqdParam.setHarmStart((int)(pqd.getHarmStart()*1.0)); eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmEnd()*1.0)); } } + eleEpdPqdParam.setStatMethod(pqd.getStatMethod()); eleEpdPqdParam.setDataType(id); eleEpdPqdParam.setClassId(classId); result.add(eleEpdPqdParam); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 0f77c12..20da7c3 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -604,20 +604,20 @@ public class CsDeviceServiceImpl implements ICsDeviceService { */ @Transactional(rollbackFor = Exception.class) public boolean autoAccess(String nDid,String version,Integer mid) { - String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); - boolean mqttClient = mqttUtil.judgeClientOnline(clientName); - if (!mqttClient) { - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - logDto.setResult(1); - logDto.setOperate(nDid + "接入失败,装置客户端不在线"); - csLogsFeignClient.addUserLog(logDto); - throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE); - } boolean result = false; - Map modelMap = new HashMap<>(); try { + String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); + boolean mqttClient = mqttUtil.judgeClientOnline(clientName); + if (!mqttClient) { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("运维管理员"); + logDto.setLoginName("njcnyw"); + logDto.setResult(1); + logDto.setOperate(nDid + "接入失败,装置客户端不在线"); + csLogsFeignClient.addUserLog(logDto); + throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE); + } + Map modelMap = new HashMap<>(); //删除缓存数据 redisUtil.delete(AppRedisKey.MODEL + nDid); redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); @@ -669,6 +669,77 @@ public class CsDeviceServiceImpl implements ICsDeviceService { return result; } + @Transactional(rollbackFor = Exception.class) + public boolean autoAccess2(String nDid, String version, Integer mid) { + boolean result = false; + try { + String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); + boolean mqttClient = mqttUtil.judgeClientOnline(clientName); + if (!mqttClient) { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("运维管理员"); + logDto.setLoginName("njcnyw"); + logDto.setResult(1); + logDto.setOperate(nDid + "接入失败,装置客户端不在线"); + csLogsFeignClient.addUserLog(logDto); + // 改为返回false而不是抛出异常 + log.warn("设备 {} 客户端不在线", nDid); + return false; + } + + Map modelMap = new HashMap<>(); + redisUtil.delete(AppRedisKey.MODEL + nDid); + redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL); + publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_3.getCode())), 1, false); + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("线程休眠被中断: {}", e.getMessage()); + return false; + } + List modelId = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid), CsModelDto.class); + if (CollUtil.isEmpty(modelId)) { + log.warn("设备 {} 未获取到模板信息", nDid); + return false; + } + CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData(); + for (CsModelDto item : modelId) { + CsDevModelRelationPO po = new CsDevModelRelationPO(); + po.setDevId(vo.getId()); + po.setModelId(item.getModelId()); + po.setDid(item.getDid()); + po.setUpdateTime(LocalDateTime.now()); + csDevModelRelationService.addRelation(po); + modelMap.put(item.getType(), item.getModelId()); + } + List lineList; + Object object = redisUtil.getObjectByKey("accessLineInfo:" + nDid); + if (Objects.isNull(object)) { + lineList = csLineFeignClient.findByNdid(nDid).getData(); + for (CsLinePO item : lineList) { + if (item.getClDid() == 0) { + updateLineIds(modelMap.get(0), item.getClDid(), nDid); + } else { + updateLineIds(modelMap.get(1), item.getClDid(), nDid); + } + } + } + publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())), 1, false); + redisUtil.saveByKeyWithExpire("startFile:" + nDid, null, 60L); + result = true; + } catch (Exception e) { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("运维管理员"); + logDto.setLoginName("njcnyw"); + logDto.setResult(1); + logDto.setOperate(nDid + "装置接入失败"); + csLogsFeignClient.addUserLog(logDto); + log.error("设备 {} 接入失败: {}", nDid, e.getMessage()); + } + return result; + } + /** * 组装报文 */