接入冗余处理

This commit is contained in:
xy
2024-10-08 09:55:38 +08:00
parent 531a787c91
commit 0e7d12ab93
3 changed files with 28 additions and 28 deletions

View File

@@ -335,6 +335,8 @@ public class MqttMessageHandler {
onlineLogsService.save(csDeviceOnlineLogs); onlineLogsService.save(csDeviceOnlineLogs);
} }
} }
//接入后系统模拟装置心跳
redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),300L);
//修改redis的mid //修改redis的mid
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1); redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1);
redisUtil.saveByKeyWithExpire("online" + nDid,"online",10L); redisUtil.saveByKeyWithExpire("online" + nDid,"online",10L);
@@ -643,13 +645,13 @@ public class MqttMessageHandler {
private void saveDirectoryInfo(List<FileDto.DirInfo> dirInfo, String key) { private void saveDirectoryInfo(List<FileDto.DirInfo> dirInfo, String key) {
if (!CollectionUtil.isEmpty(dirInfo)) { if (!CollectionUtil.isEmpty(dirInfo)) {
redisUtil.saveByKeyWithExpire(key, dirInfo, 10L); redisUtil.saveByKeyWithExpire(key, dirInfo, 20L);
} }
} }
private void saveFileInfo(FileDto.FileInfo fileInfo, String key) { private void saveFileInfo(FileDto.FileInfo fileInfo, String key) {
if (!Objects.isNull(fileInfo)) { if (!Objects.isNull(fileInfo)) {
redisUtil.saveByKeyWithExpire(key, fileInfo, 10L); redisUtil.saveByKeyWithExpire(key, fileInfo, 20L);
} }
} }

View File

@@ -2,6 +2,7 @@ package com.njcn.access.listener;
import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
import com.njcn.access.service.ICsDeviceOnlineLogsService; import com.njcn.access.service.ICsDeviceOnlineLogsService;
import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.ICsTopicService;
@@ -84,8 +85,6 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
super(listenerContainer); super(listenerContainer);
} }
// 当前尝试次数
private static int attemptCount = 1;
//最大告警次数 //最大告警次数
private static int MAX_WARNING_TIMES = 0; private static int MAX_WARNING_TIMES = 0;
@@ -130,7 +129,6 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
boolean mqttClient = mqttUtil.judgeClientOnline(clientName); boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
//心跳异常,但是客户端在线,则发送接入请求 //心跳异常,但是客户端在线,则发送接入请求
//这边可能存在装置已经掉线,但是客户端仍然在线的情况
if (mqttClient) { if (mqttClient) {
csDeviceService.devAccessAskTemplate(nDid,version,1); csDeviceService.devAccessAskTemplate(nDid,version,1);
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1); redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1);
@@ -145,6 +143,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode());
startScheduledTask(scheduler,nDid,version); startScheduledTask(scheduler,nDid,version);
logDto.setOperate("客户端离线进入定时任务"); logDto.setOperate("客户端离线进入定时任务");
//记录装置掉线时间
CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid);
record.setOfflineTime(LocalDateTime.now());
onlineLogsService.updateById(record);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@@ -173,21 +175,26 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
boolean mqttClient = mqttUtil.judgeClientOnline(clientName); boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (mqttClient) { if (mqttClient) {
csDeviceService.devAccessAskTemplate(nDid,version,1); csDeviceService.devAccessAskTemplate(nDid,version,1);
Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); try {
if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ Thread.sleep(2000);
logDto.setResult(1); Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
scheduler.shutdown(); if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1); logDto.setResult(1);
return; scheduler.shutdown();
} else { redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1);
logDto.setResult(0); return;
//一个小时未连接上,则推送告警消息 } else {
MAX_WARNING_TIMES++; logDto.setResult(0);
if (MAX_WARNING_TIMES == 30) { //一个小时未连接上,则推送告警消息
NoticeUserDto dto2 = sendConnectFail(nDid); MAX_WARNING_TIMES++;
sendEventToUser(dto2); if (MAX_WARNING_TIMES == 30) {
addLogs(dto2); NoticeUserDto dto2 = sendConnectFail(nDid);
sendEventToUser(dto2);
addLogs(dto2);
}
} }
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
} else { } else {
//一个小时未连接上,则推送告警消息 //一个小时未连接上,则推送告警消息
@@ -293,6 +300,4 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
e.getMessage(); e.getMessage();
} }
} }
} }

View File

@@ -11,13 +11,11 @@ import com.njcn.access.pojo.dto.ControlDto;
import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.pojo.dto.file.FileRedisDto; import com.njcn.access.pojo.dto.file.FileRedisDto;
import com.njcn.access.service.AskDeviceDataService; import com.njcn.access.service.AskDeviceDataService;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.enums.AlgorithmResponseEnum; import com.njcn.csdevice.enums.AlgorithmResponseEnum;
import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil; import com.njcn.redis.utils.RedisUtil;
import com.njcn.zlevent.pojo.dto.FileStreamDto; import com.njcn.zlevent.pojo.dto.FileStreamDto;
import com.njcn.zlevent.pojo.dto.NoticeUserDto;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -25,11 +23,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* @author xy * @author xy