From 04fd2409cf20fb781c435f63a64befc49543ea15 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Fri, 27 Dec 2024 09:51:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/RedisKeyExpirationListener.java | 136 ++++++++++-------- 1 file changed, 75 insertions(+), 61 deletions(-) diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 6fb8edc..6d58a3b 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -33,7 +33,6 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Objects; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -98,78 +97,93 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene String expiredKey = message.toString(); if(expiredKey.startsWith("MQTT:")){ String nDid = expiredKey.split(":")[1]; - String version = csTopicService.getVersion(nDid); - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - executeMainTask(scheduler,nDid,version); + executeMainTask(nDid); } } //主任务 //1.装置心跳断连 //2.MQTT客户端不在线 - private void executeMainTask(ScheduledExecutorService scheduler, String nDid, String version) { + private void executeMainTask(String nDid) { log.info("{}->装置离线", nDid); DeviceLogDTO logDto = new DeviceLogDTO(); logDto.setUserName("运维管理员"); logDto.setLoginName("njcnyw"); - //判断mqtt - String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); - boolean mqttClient = mqttUtil.judgeClientOnline(clientName); - //心跳异常,但是客户端在线,则发送接入请求 - if (mqttClient) { - csDeviceService.devAccessAskTemplate(nDid,version,1); - redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1); - try { - Thread.sleep(5000); - Object object = redisUtil.getObjectByKey("online" + nDid); - if (Objects.nonNull(object)) { - scheduler.shutdown(); - logDto.setOperate(nDid + "客户端在线重连成功"); - } else { - //装置下线 - csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); - //装置调整为注册状态 - csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode()); - //startScheduledTask(scheduler,nDid,version); - //logDto.setOperate(nDid +"客户端离线进入定时任务"); - logDto.setOperate(nDid +"装置离线"); - sendMessage(nDid); - - //记录装置掉线时间 - CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); - record.setOfflineTime(LocalDateTime.now()); - onlineLogsService.updateById(record); - - scheduler.shutdown(); - } - } catch (InterruptedException e) { - scheduler.shutdown(); - throw new RuntimeException(e); - } - csLogsFeignClient.addUserLog(logDto); - } - //客户端不在线则修改装置状态,进入定时任务 - else { - //装置下线 - csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); - //装置调整为注册状态 - csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode()); - logDto.setOperate(nDid +"主任务执行失败,装置下线"); - //logDto.setOperate(nDid +"主任务执行失败,装置下线,进入定时任务"); - csLogsFeignClient.addUserLog(logDto); - //log.info("客户端离线进入定时任务..."); - //startScheduledTask(scheduler,nDid,version); - sendMessage(nDid); - - //记录装置掉线时间 - CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); - record.setOfflineTime(LocalDateTime.now()); - onlineLogsService.updateById(record); - - scheduler.shutdown(); - } + //装置下线 + csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); + //装置调整为注册状态 + csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode()); + logDto.setOperate(nDid +"装置离线"); + sendMessage(nDid); + //记录装置掉线时间 + CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); + record.setOfflineTime(LocalDateTime.now()); + onlineLogsService.updateById(record); + csLogsFeignClient.addUserLog(logDto); } + //主任务 + //1.装置心跳断连 + //2.MQTT客户端不在线 +// private void executeMainTask(String nDid, String version) { +// ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); +// log.info("{}->装置离线", nDid); +// DeviceLogDTO logDto = new DeviceLogDTO(); +// logDto.setUserName("运维管理员"); +// logDto.setLoginName("njcnyw"); +// //判断mqtt +// String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); +// boolean mqttClient = mqttUtil.judgeClientOnline(clientName); +// //心跳异常,但是客户端在线,则发送接入请求 +// if (mqttClient) { +// csDeviceService.devAccessAskTemplate(nDid,version,1); +// redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1); +// try { +// Thread.sleep(5000); +// Object object = redisUtil.getObjectByKey("online" + nDid); +// if (Objects.nonNull(object)) { +// scheduler.shutdown(); +// logDto.setOperate(nDid + "客户端在线重连成功"); +// } else { +// //装置下线 +// csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); +// //装置调整为注册状态 +// csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode()); +// logDto.setOperate(nDid +"装置离线"); +// sendMessage(nDid); +// +// //记录装置掉线时间 +// CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); +// record.setOfflineTime(LocalDateTime.now()); +// onlineLogsService.updateById(record); +// +// scheduler.shutdown(); +// } +// } catch (InterruptedException e) { +// scheduler.shutdown(); +// throw new RuntimeException(e); +// } +// csLogsFeignClient.addUserLog(logDto); +// } +// //客户端不在线则修改装置状态,进入定时任务 +// else { +// //装置下线 +// csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); +// //装置调整为注册状态 +// csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode()); +// logDto.setOperate(nDid +"主任务执行失败,装置下线"); +// csLogsFeignClient.addUserLog(logDto); +// sendMessage(nDid); +// +// //记录装置掉线时间 +// CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); +// record.setOfflineTime(LocalDateTime.now()); +// onlineLogsService.updateById(record); +// +// scheduler.shutdown(); +// } +// } + private void startScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) { synchronized (lock) { //判断是否推送消息