diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 9f9e4a2..0564b52 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -74,6 +74,8 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene private RedisUtil redisUtil; @Resource private SendMessageUtil sendMessageUtil; + @Resource + private CsDeviceServiceImpl csDeviceServiceImpl; private final Object lock = new Object(); public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { @@ -106,7 +108,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene //1.装置心跳断连 //2.MQTT客户端不在线 private void executeMainTask(ScheduledExecutorService scheduler, String nDid, String version) { - log.info("正在执行主任务..."); + log.info("{}->装置离线", nDid); DeviceLogDTO logDto = new DeviceLogDTO(); logDto.setUserName("运维管理员"); logDto.setLoginName("njcnyw"); @@ -126,10 +128,16 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene } else { //装置下线 csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); - startScheduledTask(scheduler,nDid,version); - logDto.setOperate(nDid +"客户端离线进入定时任务"); + //装置调整为注册状态 + csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode()); + //startScheduledTask(scheduler,nDid,version); + //logDto.setOperate(nDid +"客户端离线进入定时任务"); + logDto.setOperate(nDid +"装置离线"); + sendMessage(nDid); + scheduler.shutdown(); } } catch (InterruptedException e) { + scheduler.shutdown(); throw new RuntimeException(e); } csLogsFeignClient.addUserLog(logDto); @@ -138,10 +146,15 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene else { //装置下线 csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); - logDto.setOperate(nDid +"主任务执行失败,装置下线,进入定时任务"); + //装置调整为注册状态 + csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode()); + logDto.setOperate(nDid +"主任务执行失败,装置下线"); + //logDto.setOperate(nDid +"主任务执行失败,装置下线,进入定时任务"); csLogsFeignClient.addUserLog(logDto); - log.info("客户端离线进入定时任务..."); - startScheduledTask(scheduler,nDid,version); + //log.info("客户端离线进入定时任务..."); + //startScheduledTask(scheduler,nDid,version); + sendMessage(nDid); + scheduler.shutdown(); } } @@ -207,6 +220,16 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene } } + //判断设备型号发送数据 + private void sendMessage(String nDid) { + boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData(); + if (devModel) { + NoticeUserDto dto = sendOffLine(nDid); + sendMessageUtil.sendEventToUser(dto); + addLogs(dto); + } + } + //掉线通知 private NoticeUserDto sendOffLine(String nDid) { NoticeUserDto dto = new NoticeUserDto(); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java index 43f9e32..e674aa0 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java @@ -1,22 +1,24 @@ package com.njcn.access.runner; +import cn.hutool.core.collection.CollUtil; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.impl.CsDeviceServiceImpl; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; +import com.njcn.system.api.DictTreeFeignClient; +import com.njcn.system.enums.DicTreeEnum; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; -import javax.annotation.Resource; +import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入 @@ -27,34 +29,79 @@ import java.util.concurrent.TimeUnit; */ @Component @Slf4j +@RequiredArgsConstructor public class AccessApplicationRunner implements ApplicationRunner { - @Resource - private RedisUtil redisUtil; - @Resource - private ICsEquipmentDeliveryService csEquipmentDeliveryService; - @Resource - private ICsTopicService csTopicService; - @Resource - private CsDeviceServiceImpl csDeviceService; + private final RedisUtil redisUtil; + private final ICsEquipmentDeliveryService csEquipmentDeliveryService; + private final ICsTopicService csTopicService; + private final CsDeviceServiceImpl csDeviceService; + private final DictTreeFeignClient dictTreeFeignClient; + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + ExecutorService executor = Executors.newFixedThreadPool(10); private static final long ACCESS_TIME = 60L; @Override public void run(ApplicationArguments args) { //项目启动60s后发起自动接入 Runnable task = () -> { - List list = csEquipmentDeliveryService.getAll(); - list.forEach(item->{ - String version = csTopicService.getVersion(item.getNdid()); - if (!Objects.isNull(version)){ - csDeviceService.devAccessAskTemplate(item.getNdid(),version,1); - redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1); + log.info("系统重启,所有符合条件的装置全部接入!"); + List list = csEquipmentDeliveryService.getOnlineDev(); + if (CollUtil.isNotEmpty(list)) { + // 将任务平均分配给10个子列表 + List> subLists = new ArrayList<>(); + int partitionSize = list.size() / 10; + for (int i = 0; i < 10; i++) { + int start = i * partitionSize; + int end = (i == 9) ? list.size() : start + partitionSize; + subLists.add(list.subList(start, end)); } - }); + + // 创建一个ExecutorService来处理这些任务 + List> futures = new ArrayList<>(); + // 提交任务给线程池执行 + for (int i = 0; i < 10; i++) { + int index = i; + futures.add(executor.submit(new Callable() { + @Override + public Void call() { + accessDev(subLists.get(index)); + return null; + } + })); + } + // 等待所有任务完成 + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + // 关闭ExecutorService + executor.shutdown(); + scheduler.shutdown(); + } }; - scheduler.schedule(task, ACCESS_TIME, TimeUnit.SECONDS); - // 关闭调度程序 - scheduler.shutdown(); + scheduler.schedule(task,ACCESS_TIME,TimeUnit.SECONDS); } + + public void accessDev(List list) { + list.forEach(item->{ + System.out.println(Thread.currentThread().getName() + ": reboot : nDid : " + item.getNdid()); + String version = csTopicService.getVersion(item.getNdid()); + if (!Objects.isNull(version)){ + //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入 + String code = dictTreeFeignClient.queryById(item.getDevModel()).getData().getCode(); + if (Objects.equals(code,DicTreeEnum.PQV_520.getCode()) && Objects.equals(item.getStatus(),1)) { + csDeviceService.wlDevRegister(item.getNdid()); + } else { + csDeviceService.devAccessAskTemplate(item.getNdid(),version,1); + } + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1); + } + }); + } + } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java new file mode 100644 index 0000000..ff03c8b --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java @@ -0,0 +1,100 @@ +package com.njcn.access.runner; + +import cn.hutool.core.collection.CollUtil; +import com.njcn.access.service.ICsEquipmentDeliveryService; +import com.njcn.access.service.ICsTopicService; +import com.njcn.access.service.impl.CsDeviceServiceImpl; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.system.api.DictTreeFeignClient; +import com.njcn.system.enums.DicTreeEnum; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.*; + +/** + * @author xy + * 定时轮询离线设备接入 + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class AutoAccessTimer implements ApplicationRunner { + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + ExecutorService executor = Executors.newFixedThreadPool(10); + private static final long AUTO_TIME = 120L; + private final RedisUtil redisUtil; + private final ICsEquipmentDeliveryService csEquipmentDeliveryService; + private final ICsTopicService csTopicService; + private final CsDeviceServiceImpl csDeviceService; + private final DictTreeFeignClient dictTreeFeignClient; + + @Override + public void run(ApplicationArguments args) { + Runnable task = () -> { + log.info("轮询定时任务执行中!"); + List list = csEquipmentDeliveryService.getOfflineDev(); + if (CollUtil.isNotEmpty(list)) { + // 将任务平均分配给10个子列表 + List> subLists = new ArrayList<>(); + int partitionSize = list.size() / 10; + for (int i = 0; i < 10; i++) { + int start = i * partitionSize; + int end = (i == 9) ? list.size() : start + partitionSize; + subLists.add(list.subList(start, end)); + } + + // 创建一个ExecutorService来处理这些任务 + List> futures = new ArrayList<>(); + // 提交任务给线程池执行 + for (int i = 0; i < 10; i++) { + int index = i; + futures.add(executor.submit(new Callable() { + @Override + public Void call() { + accessDev(subLists.get(index)); + return null; + } + })); + } + // 等待所有任务完成 + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + // 关闭ExecutorService + executor.shutdown(); + } + }; + //第一次执行的时间为120s,然后每隔120s执行一次 + scheduler.scheduleAtFixedRate(task,AUTO_TIME,AUTO_TIME,TimeUnit.SECONDS); + } + + public void accessDev(List list) { + list.forEach(item->{ + System.out.println(Thread.currentThread().getName() + ": reboot : nDid : " + item.getNdid()); + String version = csTopicService.getVersion(item.getNdid()); + if (!Objects.isNull(version)){ + //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入 + String code = dictTreeFeignClient.queryById(item.getDevModel()).getData().getCode(); + if (Objects.equals(code, DicTreeEnum.PQV_520.getCode()) && Objects.equals(item.getStatus(),1)) { + csDeviceService.wlDevRegister(item.getNdid()); + } else { + csDeviceService.devAccessAskTemplate(item.getNdid(),version,1); + } + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1); + } + }); + } +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java index a77eab2..5e4acab 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java @@ -59,4 +59,14 @@ public interface ICsEquipmentDeliveryService extends IService getOnlineDev(); + + /** + * 获取离线、启用、客户端在线的装置 + */ + List getOfflineDev(); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index bb3cbf7..cc42229 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -21,6 +21,7 @@ import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.*; +import com.njcn.csdevice.enums.AlgorithmResponseEnum; import com.njcn.csdevice.pojo.param.CsDevModelRelationAddParm; import com.njcn.csdevice.pojo.param.CsLedgerParam; import com.njcn.csdevice.pojo.param.CsLineParam; @@ -538,6 +539,17 @@ public class CsDeviceServiceImpl implements ICsDeviceService { */ @Transactional(rollbackFor = Exception.class) public boolean devAccessAskTemplate(String nDid,String version,Integer mid) { + String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); + boolean mqttClient = mqttUtil.judgeClientOnline(clientName); + if (!mqttClient) { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("运维管理员"); + logDto.setLoginName("njcnyw"); + logDto.setResult(1); + logDto.setOperate(nDid + "接入失败,装置客户端不在线"); + csLogsFeignClient.addUserLog(logDto); + throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE); + } boolean result = false; Map modelMap = new HashMap<>(); try { @@ -580,8 +592,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService { redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L); result = true; } - } catch (InterruptedException e) { - throw new RuntimeException(e); + } catch (Exception e) { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("运维管理员"); + logDto.setLoginName("njcnyw"); + logDto.setResult(1); + logDto.setOperate(nDid + "装置接入失败"); + csLogsFeignClient.addUserLog(logDto); + throw new BusinessException(e.getMessage()); } return result; } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java index bde0ae7..17711ec 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java @@ -2,15 +2,20 @@ package com.njcn.access.service.impl; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.access.enums.AccessEnum; import com.njcn.access.mapper.CsEquipmentDeliveryMapper; import com.njcn.access.pojo.param.DeviceStatusParam; import com.njcn.access.service.ICsEquipmentDeliveryService; +import com.njcn.access.utils.MqttUtil; +import com.njcn.common.pojo.dto.DeviceLogDTO; +import com.njcn.csdevice.api.CsLogsFeignClient; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO; import lombok.RequiredArgsConstructor; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -27,6 +32,10 @@ import java.util.Objects; @RequiredArgsConstructor public class CsEquipmentDeliveryServiceImpl extends ServiceImpl implements ICsEquipmentDeliveryService { + private final MqttUtil mqttUtil; + + private final CsLogsFeignClient csLogsFeignClient; + @Override public void updateStatusBynDid(String nDid,Integer status) { LambdaUpdateWrapper lambdaUpdateWrapper = new LambdaUpdateWrapper<>(); @@ -81,4 +90,53 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl getOnlineDev() { + List result = new ArrayList<>(); + List list = this.lambdaQuery() + .ne(CsEquipmentDeliveryPO::getRunStatus,AccessEnum.DEL.getCode()) + .eq(CsEquipmentDeliveryPO::getUsageStatus,1) + .list(); + list.forEach(item->{ + String clientName = "NJCN-" + item.getNdid().substring(item.getNdid().length() - 6); + boolean mqttClient = mqttUtil.judgeClientOnline(clientName); + if (mqttClient) { + result.add(item); + } else { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("运维管理员"); + logDto.setLoginName("njcnyw"); + logDto.setResult(1); + logDto.setOperate(item.getNdid() + "接入失败,装置客户端不在线"); + csLogsFeignClient.addUserLog(logDto); + } + }); + return result; + } + + @Override + public List getOfflineDev() { + List result = new ArrayList<>(); + List list = this.lambdaQuery() + .eq(CsEquipmentDeliveryPO::getRunStatus,AccessEnum.OFFLINE.getCode()) + .eq(CsEquipmentDeliveryPO::getUsageStatus,1) + .eq(CsEquipmentDeliveryPO::getStatus, AccessEnum.REGISTERED.getCode()) + .list(); + list.forEach(item->{ + String clientName = "NJCN-" + item.getNdid().substring(item.getNdid().length() - 6); + boolean mqttClient = mqttUtil.judgeClientOnline(clientName); + if (mqttClient) { + result.add(item); + } else { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("运维管理员"); + logDto.setLoginName("njcnyw"); + logDto.setResult(1); + logDto.setOperate(item.getNdid() + "接入失败,装置客户端不在线"); + csLogsFeignClient.addUserLog(logDto); + } + }); + return result; + } + } diff --git a/iot-access/access-boot/src/test/java/com/njcn/BatchProcessing.java b/iot-access/access-boot/src/test/java/com/njcn/BatchProcessing.java new file mode 100644 index 0000000..6e5d8d9 --- /dev/null +++ b/iot-access/access-boot/src/test/java/com/njcn/BatchProcessing.java @@ -0,0 +1,43 @@ +package com.njcn; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class BatchProcessing { + + public static void main(String[] args) { + // 创建包含10000条数据的列表 + List dataList = new ArrayList<>(); + for (int i = 1; i <= 20000; i++) { + dataList.add(i); + } + + // 将数据分成10个子列表,每个子列表包含1000条数据 + List> batches = new ArrayList<>(); + int batchSize = 1000; + int batchCount = dataList.size() / batchSize; + for (int i = 0; i < batchCount; i++) { + int fromIndex = i * batchSize; + int toIndex = fromIndex + batchSize; + List batch = dataList.subList(fromIndex, toIndex); + batches.add(batch); + } + + // 使用多线程并发处理每个子列表 + ExecutorService executorService = Executors.newFixedThreadPool(10); + for (List batch : batches) { + executorService.submit(() -> processBatch(batch)); + } + executorService.shutdown(); + } + + private static void processBatch(List batch) { + for (Integer data : batch) { + // 处理数据的逻辑 + System.out.println(Thread.currentThread().getName() + ": processing data " + data); + } + } + +} \ No newline at end of file