refactor(iot-access): 重构设备离线通知和事件推送逻辑

- 移除过时的定时任务相关代码和依赖
- 集成新的DeviceMessageFeignClient服务接口
- 更新设备离线通知逻辑,增加推送客户端检查
- 修改事件用户获取方式,使用远程服务调用
- 优化推送用户筛选流程,统一使用DeviceMessageParam参数
- 清理冗余的导入包和废弃的方法
- 调整事件推送类型的参数传递方式
This commit is contained in:
xy
2026-04-17 16:20:06 +08:00
parent 48c79b721e
commit 8041c5f27e
2 changed files with 37 additions and 172 deletions

View File

@@ -1,12 +1,12 @@
package com.njcn.access.listener;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.dto.NoticeUserDto;
import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
import com.njcn.access.service.ICsDeviceOnlineLogsService;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
@@ -15,12 +15,13 @@ import com.njcn.access.utils.RedisSetUtil;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.param.DeviceMessageParam;
import com.njcn.csdevice.pojo.dto.DevDetailDTO;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.rt.pojo.dto.BaseRealDataSet;
import com.njcn.user.api.AppInfoSetFeignClient;
import com.njcn.user.api.AppUserFeignClient;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
@@ -37,9 +38,6 @@ import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -81,6 +79,11 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
private MqttPublisher publisher;
@Resource
private RedisSetUtil redisSetUtil;
@Resource
private AppInfoSetFeignClient appInfoSetFeignClient;
@Resource
private DeviceMessageFeignClient deviceMessageFeignClient;
private final Object lock = new Object();
@@ -153,75 +156,15 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
csLogsFeignClient.addUserLog(logDto);
}
private void startScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) {
synchronized (lock) {
//判断是否推送消息
boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData();
if (devModel) {
NoticeUserDto dto = sendOffLine(nDid);
sendMessageUtil.sendEventToUser(dto);
addLogs(dto);
}
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
log.info(nDid + "执行重连定时任务...");
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setOperate(nDid + "重连定时任务");
//判断客户端
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (mqttClient) {
csDeviceService.devAccessAskTemplate(nDid,version,1);
try {
Thread.sleep(5000);
Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
logDto.setResult(1);
scheduler.shutdown();
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1);
return;
} else {
logDto.setResult(0);
//一个小时未连接上,则推送告警消息
MAX_WARNING_TIMES++;
if (MAX_WARNING_TIMES == 30 && devModel) {
NoticeUserDto dto2 = sendConnectFail(nDid);
sendMessageUtil.sendEventToUser(dto2);
addLogs(dto2);
}
//记录装置掉线时间
CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid);
record.setOfflineTime(LocalDateTime.now());
onlineLogsService.updateById(record);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
//一个小时未连接上,则推送告警消息
MAX_WARNING_TIMES++;
if (MAX_WARNING_TIMES == 30 && devModel) {
NoticeUserDto dto2 = sendConnectFail(nDid);
sendMessageUtil.sendEventToUser(dto2);
addLogs(dto2);
}
logDto.setResult(0);
//记录装置掉线时间
CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid);
record.setOfflineTime(LocalDateTime.now());
onlineLogsService.updateById(record);
}
csLogsFeignClient.addUserLog(logDto);
}, 0, 2, TimeUnit.MINUTES);
}
}
//判断设备型号发送数据
private void sendMessage(String nDid) {
boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData();
if (devModel) {
NoticeUserDto dto = sendOffLine(nDid);
sendMessageUtil.sendEventToUser(dto);
addLogs(dto);
if (CollectionUtil.isNotEmpty(dto.getPushClientId())) {
sendMessageUtil.sendEventToUser(dto);
addLogs(dto);
}
}
}
@@ -236,22 +179,17 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
String dateStr = localDateTime.format(fmt);
String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + dateStr + "离线");
dto.setContent(content);
dto.setPushClientId(getEventUser(po.getId(),true));
return dto;
}
//重连失败通知
private NoticeUserDto sendConnectFail(String nDid) {
NoticeUserDto dto = new NoticeUserDto();
dto.setTitle("设备接入失败");
CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData();
DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(po.getId()).getData();
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime localDateTime = LocalDateTime.now();
String dateStr = localDateTime.format(fmt);
String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + dateStr + "多次接入失败");
dto.setContent(content);
dto.setPushClientId(getEventUser(po.getId(),false));
//获取设备关联的用户
List<String> eventUser = deviceMessageFeignClient.getEventUserByDeviceId(po.getId(),true).getData();
DeviceMessageParam param1 = new DeviceMessageParam();
param1.setUserList(eventUser);
param1.setEventType(2);
//获取打开推送的用户
List<User> users = deviceMessageFeignClient.getSendUserByType(param1).getData();
if (CollectionUtil.isNotEmpty(users)){
dto.setPushClientId(
users.stream().filter(Objects::nonNull).map(User::getDevCode).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList()));
}
return dto;
}
@@ -263,18 +201,4 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
dto.setOperate(noticeUserDto.getContent());
csLogsFeignClient.addUserLog(dto);
}
/**
* 获取所有需要推送的用户id
*/
public List<String> getEventUser(String devId, boolean isAdmin) {
List<User> adminUser = appUserFeignClient.getAdminInfo().getData();
List<String> adminList = adminUser.stream().map(User::getId).collect(Collectors.toList());
if (isAdmin) {
List<String> list = csDeviceUserFeignClient.findUserById(devId).getData();
adminList.addAll(list);
}
List<User> users = userFeignClient.appuserByIdList(adminList).getData();
return users.stream().map(User::getDevCode).filter(Objects::nonNull).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList());
}
}