From c4db053243a30cfc13ef7d179b70dd5d00d6763b Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Thu, 18 Sep 2025 14:58:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BA=91=E5=8D=8F=E8=AE=AE?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/RedisKeyExpirationListener.java | 13 +- .../com/njcn/access/runner/CldHeartTimer.java | 143 ++++++++++++++++++ .../service/ICsEquipmentDeliveryService.java | 5 + .../service/impl/CsDevModelServiceImpl.java | 67 ++++---- .../impl/CsEquipmentDeliveryServiceImpl.java | 23 ++- 5 files changed, 207 insertions(+), 44 deletions(-) create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/runner/CldHeartTimer.java diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 0919124..54a142f 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -19,7 +19,6 @@ import com.njcn.csdevice.pojo.dto.DevDetailDTO; import com.njcn.csdevice.pojo.dto.PqsCommunicateDto; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.redis.pojo.enums.AppRedisKey; -import com.njcn.redis.pojo.enums.RedisKeyEnum; import com.njcn.redis.utils.RedisUtil; import com.njcn.rt.pojo.dto.BaseRealDataSet; import com.njcn.user.api.AppUserFeignClient; @@ -122,12 +121,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene } } //云前置设备心跳丢失处理 - if(expiredKey.startsWith(RedisKeyEnum.CLD_HEART_BEAT_KEY.getKey())){ - String node = expiredKey.split(":")[1]; - String nodeId = node.substring(0, node.length() - 1); - int processNo = Integer.parseInt(node.substring(node.length() - 1)); - equipmentFeignClient.updateCldDevStatus(nodeId,processNo); - } +// if(expiredKey.startsWith(RedisKeyEnum.CLD_HEART_BEAT_KEY.getKey())){ +// String node = expiredKey.split(":")[1]; +// String nodeId = node.substring(0, node.length() - 1); +// int processNo = Integer.parseInt(node.substring(node.length() - 1)); +// equipmentFeignClient.updateCldDevStatus(nodeId,processNo); +// } } //主任务 diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/CldHeartTimer.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/CldHeartTimer.java new file mode 100644 index 0000000..54a65b5 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/CldHeartTimer.java @@ -0,0 +1,143 @@ +package com.njcn.access.runner; + +import cn.hutool.core.collection.CollUtil; +import com.njcn.access.service.ICsEquipmentDeliveryService; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.*; + +/** + * @author xy + * 定时轮询离线设备接入 + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class CldHeartTimer implements ApplicationRunner { + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private static final long OUT_TIME = 120L; + private final RedisUtil redisUtil; + private final ICsEquipmentDeliveryService csEquipmentDeliveryService; + private final EquipmentFeignClient equipmentFeignClient; + + @Override + public void run(ApplicationArguments args) { + if (scheduler.isShutdown() || scheduler.isTerminated()) { + scheduler = Executors.newScheduledThreadPool(1); + } + Runnable task = () -> { + try { + executeScheduledTask(); + } + // 捕获所有Throwable,包括Error + catch (Throwable t) { + log.error("定时任务发生严重异常,尝试恢复", t); + // 可以添加重启逻辑或告警 + } + }; + ScheduledFuture future = scheduler.scheduleWithFixedDelay(task, OUT_TIME, OUT_TIME, TimeUnit.SECONDS); + // 添加监控,如果任务被取消则重新调度 + monitorScheduledTask(future); + } + + //10分钟检查一下调度任务 + private void monitorScheduledTask(ScheduledFuture future) { + Thread monitorThread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + //每10分钟检查一次 + Thread.sleep(600000); + if (future.isCancelled() || future.isDone()) { + log.warn("定时任务被取消或完成,重新调度..."); + // 重新启动任务 + run(null); + break; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("监控线程被中断"); + break; + } catch (Exception e) { + log.error("监控任务异常", e); + } + } + }, "Schedule-Monitor-Thread"); + monitorThread.setDaemon(true); + monitorThread.start(); + } + + private void executeScheduledTask() { + log.info("定时检查云前置心跳!"); + //获取在运设备的所有前置和进程号,循环查询redis里面是否存在,不存在则将所有前置下的设备翻转 + List list = csEquipmentDeliveryService.getFrontAndProcess(); + if (CollUtil.isNotEmpty(list)) { + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + List> subLists = CollUtil.split(list, 2); + List> futures = new ArrayList<>(); + for (List subList : subLists) { + futures.add(executor.submit(() -> { + try { + accessDevSafely(subList); + } catch (Exception e) { + log.error("处理设备子列表异常", e); + } + return null; + })); + } + for (Future future : futures) { + try { + future.get(5, TimeUnit.MINUTES); + } catch (TimeoutException e) { + log.error("任务执行超时", e); + } catch (Exception e) { + log.error("任务执行异常", e); + } + } + } finally { + executor.shutdown(); + try { + if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + } + + //安全的accessDev版本 + private void accessDevSafely(List list) { + if (CollUtil.isNotEmpty(list)) { + for (String item : list) { + try { + processSingleDevice(item); + } catch (Exception e) { + log.error("处理设备 {} 失败: {}", item, e.getMessage()); + } + } + } + } + + private void processSingleDevice(String item) { + Object object = redisUtil.getObjectByKey(RedisKeyEnum.CLD_HEART_BEAT_KEY.getKey() + item); + if (Objects.isNull(object)) { + String nodeId = item.substring(0, item.length() - 1); + int processNo = Integer.parseInt(item.substring(item.length() - 1)); + equipmentFeignClient.updateCldDevStatus(nodeId,processNo); + } + } + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java index 5e4acab..8bf5a7b 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java @@ -69,4 +69,9 @@ public interface ICsEquipmentDeliveryService extends IService getOfflineDev(); + + /** + * 获取在运且在线的装置 所属前置和进程号 + */ + List getFrontAndProcess(); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index 17e2e95..6084d34 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -97,10 +97,10 @@ public class CsDevModelServiceImpl implements ICsDevModelService { DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode(devType).getData(); if (Objects.isNull(dictTreeVO)){ throw new BusinessException(AccessResponseEnum.DEV_TYPE_NOT_FIND); - } else if (Objects.equals(devType,"CLD")) { + } else if (Objects.equals(devType,DicDataEnum.DEV_CLD.getCode())) { //查询是否已存在云前置模板 LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(CsDevModelPO::getName,"CLD").eq(CsDevModelPO::getStatus,1); + lambdaQueryWrapper.eq(CsDevModelPO::getName,DicDataEnum.DEV_CLD.getCode()).eq(CsDevModelPO::getStatus,1); List list = csDevModelMapper.selectList(lambdaQueryWrapper); if (CollectionUtil.isNotEmpty(list)) { throw new BusinessException(AccessResponseEnum.CLD_MODEL_EXIST); @@ -946,37 +946,38 @@ public class CsDevModelServiceImpl implements ICsDevModelService { } if(CollectionUtil.isNotEmpty(arrayList)) { csDataArrayService.addList(arrayList); - List ls = new ArrayList<>(); - List groArrList = new ArrayList<>(); - Map> setMap = arrayList.stream().collect(Collectors.groupingBy(CsDataArray::getPid,LinkedHashMap::new,Collectors.toList())); - setMap.forEach((k0,v0)->{ - AtomicReference sort = new AtomicReference<>(0); - Map> map = v0.stream().filter(a-> "avg".equals(a.getStatMethod()) || Objects.isNull(a.getStatMethod())).collect(Collectors.groupingBy(CsDataArray::getAnotherName,LinkedHashMap::new,Collectors.toList())); - map.forEach((k,v)->{ - //录入组数据 - String groupId = IdUtil.simpleUUID(); - CsGroup csGroup = new CsGroup(); - csGroup.setId(groupId); - csGroup.setDataSetId(k0); - csGroup.setGroupName(k); - csGroup.setSort(sort.getAndSet(sort.get() + 1)); - csGroup.setIsShow(1); - ls.add(csGroup); - //录入组和指标关系 - v.forEach(item->{ - CsGroArr csGroArr = new CsGroArr(); - csGroArr.setGroupId(groupId); - csGroArr.setArrayId(item.getId()); - groArrList.add(csGroArr); - }); - }); - }); - if(CollectionUtil.isNotEmpty(ls)) { - csGroupService.addList(ls); - } - if(CollectionUtil.isNotEmpty(groArrList)) { - csGroArrService.addGroArrList(groArrList); - } + //物联这边没有分组的要求,这部分代码先注释,用能那边用到这个代码 +// List ls = new ArrayList<>(); +// List groArrList = new ArrayList<>(); +// Map> setMap = arrayList.stream().collect(Collectors.groupingBy(CsDataArray::getPid,LinkedHashMap::new,Collectors.toList())); +// setMap.forEach((k0,v0)->{ +// AtomicReference sort = new AtomicReference<>(0); +// Map> map = v0.stream().filter(a-> "avg".equals(a.getStatMethod()) || Objects.isNull(a.getStatMethod())).collect(Collectors.groupingBy(CsDataArray::getAnotherName,LinkedHashMap::new,Collectors.toList())); +// map.forEach((k,v)->{ +// //录入组数据 +// String groupId = IdUtil.simpleUUID(); +// CsGroup csGroup = new CsGroup(); +// csGroup.setId(groupId); +// csGroup.setDataSetId(k0); +// csGroup.setGroupName(k); +// csGroup.setSort(sort.getAndSet(sort.get() + 1)); +// csGroup.setIsShow(1); +// ls.add(csGroup); +// //录入组和指标关系 +// v.forEach(item->{ +// CsGroArr csGroArr = new CsGroArr(); +// csGroArr.setGroupId(groupId); +// csGroArr.setArrayId(item.getId()); +// groArrList.add(csGroArr); +// }); +// }); +// }); +// if(CollectionUtil.isNotEmpty(ls)) { +// csGroupService.addList(ls); +// } +// if(CollectionUtil.isNotEmpty(groArrList)) { +// csGroArrService.addGroArrList(groArrList); +// } } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java index b1c944c..28c68d7 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java @@ -1,6 +1,8 @@ package com.njcn.access.service.impl; import cn.hutool.core.collection.CollUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.access.enums.AccessEnum; @@ -16,10 +18,8 @@ import lombok.RequiredArgsConstructor; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; +import java.util.*; +import java.util.stream.Collectors; /** * @@ -125,6 +125,7 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl{ @@ -145,4 +146,18 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl getFrontAndProcess() { + QueryWrapper wrapper = new QueryWrapper<>(); + wrapper.select("DISTINCT CONCAT(node_id, node_process) as concatenated"); + wrapper.eq("usage_status", 1); + wrapper.eq("run_status", 2); + wrapper.isNotNull("node_id"); + return baseMapper.selectObjs(wrapper) + .stream() + .map(obj -> (String) obj) + .distinct() + .collect(Collectors.toList()); + } + }