feat(access): 实现物联网设备心跳检测与离线处理功能

- 添加心跳超时消息模板和消费者处理机制
- 实现设备心跳接收和状态更新服务
- 新增心跳检测控制层和业务处理服务
- 集成熔断降级工厂处理心跳异常情况
- 移除原有的定时心跳检查任务
- 更新MQTT消息处理器集成新的心跳机制
- 配置日志级别过滤特定模板异常信息
- 添加消息队列依赖支持心跳超时处理
This commit is contained in:
xy
2026-05-14 09:23:32 +08:00
parent 2cad107c29
commit 15f84c1bc0
13 changed files with 543 additions and 301 deletions

View File

@@ -20,6 +20,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-mq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.tocrhz</groupId>
<artifactId>mqtt-spring-boot-starter</artifactId>

View File

@@ -0,0 +1,22 @@
package com.njcn.access.api;
import com.njcn.access.api.fallback.CsHeartbeatClientFallbackFactory;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.mq.message.HeartbeatTimeoutMessage;
import io.swagger.annotations.ApiOperation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
* @author xy
*/
@FeignClient(value = ServerInfo.ACCESS_BOOT, path = "/heartbeat", fallbackFactory = CsHeartbeatClientFallbackFactory.class,contextId = "heartbeat")
public interface CsHeartbeatFeignClient {
@PostMapping("/handleHeartbeat")
@ApiOperation("处理物联设备心跳")
HttpResult<String> handleHeartbeat(@RequestBody HeartbeatTimeoutMessage message);
}

View File

@@ -0,0 +1,35 @@
package com.njcn.access.api.fallback;
import com.njcn.access.api.CsHeartbeatFeignClient;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.mq.message.HeartbeatTimeoutMessage;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author xy
*/
@Slf4j
@Component
public class CsHeartbeatClientFallbackFactory implements FallbackFactory<CsHeartbeatFeignClient> {
@Override
public CsHeartbeatFeignClient create(Throwable cause) {
//判断抛出异常是否为解码器抛出的业务异常
Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
if (cause.getCause() instanceof BusinessException) {
BusinessException businessException = (BusinessException) cause.getCause();
}
Enum<?> finalExceptionEnum = exceptionEnum;
return new CsHeartbeatFeignClient() {
@Override
public HttpResult<String> handleHeartbeat(HeartbeatTimeoutMessage message) {
log.error("{}异常,降级处理,异常为:{}","处理物联设备心跳数据异常",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -0,0 +1,48 @@
package com.njcn.access.controller;
import com.njcn.access.service.ICsHeartService;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.mq.message.HeartbeatTimeoutMessage;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/9/6 11:07
*/
@Slf4j
@RestController
@RequestMapping("/heartbeat")
@Api(tags = "心跳")
@AllArgsConstructor
@ApiIgnore
public class CsHeartController extends BaseController {
private final ICsHeartService csHeartService;
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/handleHeartbeat")
@ApiOperation("处理物联设备心跳")
@ApiImplicitParam(name = "message", value = "message", required = true)
public HttpResult<String> handleHeartbeat(@RequestBody HeartbeatTimeoutMessage message){
String methodDescribe = getMethodDescribe("handleHeartbeat");
csHeartService.handleHeartbeat(message);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -22,10 +22,7 @@ import com.njcn.access.pojo.dto.file.FileRedisDto;
import com.njcn.access.pojo.param.ReqAndResParam;
import com.njcn.access.pojo.po.CsLineModel;
import com.njcn.access.pojo.po.CsTopic;
import com.njcn.access.service.ICsDeviceOnlineLogsService;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsLineModelService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.*;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.common.pojo.exception.BusinessException;
@@ -103,6 +100,7 @@ public class MqttMessageHandler {
private final WaveFeignClient waveFeignClient;
private final RtFeignClient rtFeignClient;
private final CsCommunicateFeignClient csCommunicateFeignClient;
private final IHeartbeatService heartbeatService;
@Autowired
Validator validator;
@@ -328,7 +326,7 @@ public class MqttMessageHandler {
//更新电网侧、负载侧监测点信息
askDevData(nDid,version,3,(res.getMid()+1));
//接入后系统重置装置心跳
redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L);
heartbeatService.receiveHeartbeat(nDid);
//修改redis的mid
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1);
//接入成功标识
@@ -349,7 +347,6 @@ public class MqttMessageHandler {
if (!Objects.isNull(rspDataDto.getDataType())) {
switch (rspDataDto.getDataType()){
case 1:
log.info("{},设备数据应答--->更新设备软件信息", nDid);
logDto.setOperate(nDid + "更新设备软件信息");
RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.SoftInfo.class);
//记录设备软件信息
@@ -380,7 +377,6 @@ public class MqttMessageHandler {
List<RspDataDto.LdevInfo> devInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class);
if (CollectionUtil.isNotEmpty(devInfo)){
if (Objects.equals(res.getDid(),1)){
log.info("{},设备数据应答--->更新治理监测点信息和设备容量", nDid);
List<CsDevCapacityPO> list3 = new ArrayList<>();
boolean hasZeroClDid = devInfo.stream().anyMatch(item -> item.getClDid() == 0);
//治理设备
@@ -409,7 +405,6 @@ public class MqttMessageHandler {
equipmentFeignClient.updateModuleNumber(nDid,(devInfo.size()-1));
}
} else if (Objects.equals(res.getDid(),2)) {
log.info("{},设备数据应答--->更新电网侧、负载侧监测点信息", nDid);
logDto.setOperate(nDid + "更新电网侧、负载侧监测点信息");
//1.更新电网侧、负载侧监测点相关信息
devInfo.forEach(item->{
@@ -419,14 +414,12 @@ public class MqttMessageHandler {
}
break;
case 15:
log.info("{}模块{}:处理实时数据", nDid, rspDataDto.getClDid());
JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(res));
AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject, AppAutoDataMessage.class);
appAutoDataMessage.setId(nDid);
rtFeignClient.apfRtAnalysis(appAutoDataMessage);
break;
case 48:
log.info("询问装置项目列表");
logDto.setUserName("运维管理员");
logDto.setOperate("监测点:" + (nDid + rspDataDto.getClDid()) + "询问项目列表");
List<RspDataDto.ProjectInfo> projectInfoList = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.ProjectInfo.class);
@@ -439,7 +432,6 @@ public class MqttMessageHandler {
}
break;
case 4663:
log.info("装置操作应答");
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
String key4 = AppRedisKey.CONTROL + nDid;
redisUtil.saveByKeyWithExpire(key4,"success",10L);
@@ -492,8 +484,7 @@ public class MqttMessageHandler {
//响应请求
switch (res.getType()){
case 4865:
//设置心跳时间,超时改为掉线
redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L);
heartbeatService.receiveHeartbeat(nDid);
//有心跳,则将装置改成在线
//csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode());
//处理心跳
@@ -531,15 +522,12 @@ public class MqttMessageHandler {
response.setPri(AccessEnum.FIRST_CHANNEL.getCode());
response.setType(Integer.parseInt(TypeEnum.TYPE_15.getCode()));
response.setCode(200);
log.info("应答事件:{}", new Gson().toJson(response));
publisher.send("/Dev/DataRsp/"+version+"/"+nDid,new Gson().toJson(response),1,false);
}
//判断事件类型
switch (dataDto.getMsg().getDataAttr()) {
//暂态事件、录波处理、工程信息
case 0:
log.info("{}处理事件", nDid);
//log.info(nDid + "事件报文为:" + new String(message.getPayload(), StandardCharsets.UTF_8));
EventDto eventDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), EventDto.class);
JSONObject jsonObject0 = JSONObject.parseObject(JSON.toJSONString(eventDto));
AppEventMessage appEventMessage = JSONObject.toJavaObject(jsonObject0, AppEventMessage.class);
@@ -548,7 +536,6 @@ public class MqttMessageHandler {
break;
//实时数据
case 1:
log.info("{}处理实时数据", nDid);
JSONObject jsonObject2 = JSONObject.parseObject(JSON.toJSONString(dataDto));
AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject2, AppAutoDataMessage.class);
appAutoDataMessage.setId(nDid);
@@ -559,9 +546,6 @@ public class MqttMessageHandler {
JSONObject jsonObject3 = JSONObject.parseObject(JSON.toJSONString(dataDto));
AppAutoDataMessage appAutoDataMessage2 = JSONObject.toJavaObject(jsonObject3, AppAutoDataMessage.class);
appAutoDataMessage2.setId(nDid);
appAutoDataMessage2.getMsg().getDataArray().forEach(item->{
log.info("{}处理统计数据{}", nDid, item.getDataAttr());
});
appAutoDataMessageTemplate.sendMember(appAutoDataMessage2);
break;
default:
@@ -593,7 +577,6 @@ public class MqttMessageHandler {
//响应请求
switch (fileDto.getType()){
case 4657:
log.info("获取文件信息{}", fileDto);
if (Objects.equals(fileDto.getCode(),AccessEnum.SUCCESS.getCode())) {
String key = AppRedisKey.PROJECT_INFO + nDid;
if (Objects.isNull(fileDto.getMsg().getType())) {
@@ -626,7 +609,6 @@ public class MqttMessageHandler {
}
break;
case 4658:
log.info("获取文件流信息");
FileRedisDto dto = new FileRedisDto();
dto.setCode(fileDto.getCode());
redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileDto.getMsg().getName() + fileDto.getMid(),dto,60L);

View File

@@ -1,25 +1,14 @@
package com.njcn.access.listener;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.dto.NoticeUserDto;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.utils.RedisSetUtil;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.param.DeviceMessageParam;
import com.njcn.csdevice.pojo.dto.DevDetailDTO;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.rt.pojo.dto.BaseRealDataSet;
import com.njcn.user.pojo.po.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message;
@@ -28,12 +17,7 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author xy
@@ -80,10 +64,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
}
//判断失效的key是否为MQTT消费端存入的
String expiredKey = message.toString();
if(expiredKey.startsWith("MQTT:")){
String nDid = expiredKey.split(":")[1];
executeMainTask(nDid);
}
// if(expiredKey.startsWith("MQTT:")){
// String nDid = expiredKey.split(":")[1];
// executeMainTask(nDid);
// }
if(expiredKey.startsWith("cldRtDataOverTime:")){
String lineId = expiredKey.split(":")[1];
Object redisObject = redisUtil.getObjectByKey("rtDataUserId:"+lineId);
@@ -100,75 +84,75 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
}
}
//主任务
//1.装置心跳断连
//2.MQTT客户端不在线
private void executeMainTask(String nDid) {
log.info("{}->装置离线", nDid);
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
//装置下线
csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode());
//装置调整为注册状态
csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode(),null,null);
logDto.setOperate(nDid +"装置离线");
sendMessage(nDid);
//记录装置掉线时间
PqsCommunicateDto dto = new PqsCommunicateDto();
dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
dto.setDevId(nDid);
dto.setType(0);
dto.setDescription("通讯中断");
csCommunicateFeignClient.insertion(dto);
csLogsFeignClient.addUserLog(logDto);
//清空缓存
redisUtil.deleteKeysByString(AppRedisKey.LINE_POSITION+nDid);
}
// //主任务
// //1.装置心跳断连
// //2.MQTT客户端不在线
// private void executeMainTask(String nDid) {
// log.info("{}->装置离线", nDid);
// DeviceLogDTO logDto = new DeviceLogDTO();
// logDto.setUserName("运维管理员");
// logDto.setLoginName("njcnyw");
// //装置下线
// csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode());
// //装置调整为注册状态
// csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode(),null,null);
// logDto.setOperate(nDid +"装置离线");
// sendMessage(nDid);
// //记录装置掉线时间
// PqsCommunicateDto dto = new PqsCommunicateDto();
// dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
// dto.setDevId(nDid);
// dto.setType(0);
// dto.setDescription("通讯中断");
// csCommunicateFeignClient.insertion(dto);
// csLogsFeignClient.addUserLog(logDto);
// //清空缓存
// redisUtil.deleteKeysByString(AppRedisKey.LINE_POSITION+nDid);
// }
//判断设备型号发送数据
private void sendMessage(String nDid) {
boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData();
if (devModel) {
NoticeUserDto dto = sendOffLine(nDid);
if (CollectionUtil.isNotEmpty(dto.getPushClientId())) {
sendMessageUtil.sendEventToUser(dto);
addLogs(dto);
}
}
}
// //判断设备型号发送数据
// private void sendMessage(String nDid) {
// boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData();
// if (devModel) {
// NoticeUserDto dto = sendOffLine(nDid);
// if (CollectionUtil.isNotEmpty(dto.getPushClientId())) {
// sendMessageUtil.sendEventToUser(dto);
// addLogs(dto);
// }
// }
// }
//掉线通知
private NoticeUserDto sendOffLine(String nDid) {
NoticeUserDto dto = new NoticeUserDto();
dto.setTitle("设备离线");
CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData();
DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(po.getId()).getData();
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime localDateTime = LocalDateTime.now();
String dateStr = localDateTime.format(fmt);
String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + dateStr + "离线");
dto.setContent(content);
//获取设备关联的用户
List<String> eventUser = deviceMessageFeignClient.getEventUserByDeviceId(po.getId(),true).getData();
DeviceMessageParam param1 = new DeviceMessageParam();
param1.setUserList(eventUser);
param1.setEventType(2);
//获取打开推送的用户
List<User> users = deviceMessageFeignClient.getSendUserByType(param1).getData();
if (CollectionUtil.isNotEmpty(users)){
dto.setPushClientId(
users.stream().filter(Objects::nonNull).map(User::getDevCode).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList()));
}
return dto;
}
//日志记录
private void addLogs(NoticeUserDto noticeUserDto) {
DeviceLogDTO dto = new DeviceLogDTO();
dto.setUserName("运维管理员");
dto.setLoginName("njcnyw");
dto.setOperate(noticeUserDto.getContent());
csLogsFeignClient.addUserLog(dto);
}
// //掉线通知
// private NoticeUserDto sendOffLine(String nDid) {
// NoticeUserDto dto = new NoticeUserDto();
// dto.setTitle("设备离线");
// CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData();
// DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(po.getId()).getData();
// DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// LocalDateTime localDateTime = LocalDateTime.now();
// String dateStr = localDateTime.format(fmt);
// String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "于" + dateStr + "离线");
// dto.setContent(content);
// //获取设备关联的用户
// List<String> eventUser = deviceMessageFeignClient.getEventUserByDeviceId(po.getId(),true).getData();
// DeviceMessageParam param1 = new DeviceMessageParam();
// param1.setUserList(eventUser);
// param1.setEventType(2);
// //获取打开推送的用户
// List<User> users = deviceMessageFeignClient.getSendUserByType(param1).getData();
// if (CollectionUtil.isNotEmpty(users)){
// dto.setPushClientId(
// users.stream().filter(Objects::nonNull).map(User::getDevCode).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList()));
// }
// return dto;
// }
//
// //日志记录
// private void addLogs(NoticeUserDto noticeUserDto) {
// DeviceLogDTO dto = new DeviceLogDTO();
// dto.setUserName("运维管理员");
// dto.setLoginName("njcnyw");
// dto.setOperate(noticeUserDto.getContent());
// csLogsFeignClient.addUserLog(dto);
// }
}

View File

@@ -1,190 +0,0 @@
package com.njcn.access.runner;
import cn.hutool.core.collection.CollUtil;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.utils.RedisUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author xy
* 定时1小时轮询设备列表并使用多线程进行业务处理
* 1. 定时查询所有设备列表
* 2. 使用多线程处理每个设备的业务逻辑
* 3. 处理完成后释放资源
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class MqttHeartCheckTimer implements ApplicationRunner {
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
private final RedisUtil redisUtil;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 1小时间隔
private static final long INTERVAL_MINUTES = 60L;
@Override
public void run(ApplicationArguments args) {
if (scheduler.isShutdown() || scheduler.isTerminated()) {
scheduler = Executors.newScheduledThreadPool(1);
}
Runnable task = () -> {
try {
executeScheduledTask();
} catch (Throwable t) {
log.error("定时设备处理任务发生严重异常", t);
}
};
// 每小时执行一次
ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
task,
0,
INTERVAL_MINUTES,
TimeUnit.MINUTES
);
// 添加监控,如果任务被取消则重新调度
monitorScheduledTask(future);
}
/**
* 监控定时任务
*/
private void monitorScheduledTask(ScheduledFuture<?> future) {
Thread monitorThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
// 每10分钟检查一次任务状态
Thread.sleep(600000);
if (future.isCancelled() || future.isDone()) {
log.warn("定时设备处理任务被取消或完成,重新调度...");
// 重新启动任务
run(null);
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("监控线程被中断");
break;
} catch (Exception e) {
log.error("监控任务异常", e);
}
}
}, "Device-Processor-Monitor-Thread");
monitorThread.setDaemon(true);
monitorThread.start();
}
/**
* 执行定时任务的主要逻辑
*/
private void executeScheduledTask() {
log.info("开始执行定时设备处理任务(查看在线设备和心跳数据是否一致) - 时间: {}", System.currentTimeMillis());
try {
// 查询所有设备列表
List<CsEquipmentDeliveryPO> deviceList = csEquipmentDeliveryService.getUseOnlineDevice();
if (CollUtil.isEmpty(deviceList)) {
log.info("设备列表为空,跳过处理");
return;
}
log.info("查询到 {} 个设备,开始多线程处理", deviceList.size());
// 创建线程池进行多线程处理,根据设备数量动态调整线程数
// 最大10个线程
int threadCount = Math.min(deviceList.size(), 10);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
try {
// 将设备列表分批处理,每批处理一定数量的设备
List<List<CsEquipmentDeliveryPO>> batches = CollUtil.split(deviceList, threadCount);
List<Future<Void>> futures = new ArrayList<>();
for (List<CsEquipmentDeliveryPO> batch : batches) {
futures.add(executor.submit(() -> {
try {
processDeviceBatch(batch);
} catch (Exception e) {
log.error("处理设备批次异常", e);
}
return null;
}));
}
// 等待所有任务完成,设置超时时间防止长时间阻塞
for (Future<Void> future : futures) {
try {
// 设置10分钟超时
future.get(10, TimeUnit.MINUTES);
} catch (TimeoutException e) {
log.error("设备批次处理超时", e);
} catch (Exception e) {
log.error("设备批次处理异常", e);
}
}
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(2, TimeUnit.MINUTES)) {
log.warn("设备处理线程池未在规定时间内关闭,强制关闭");
executor.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("等待线程池关闭时被中断");
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
log.info("设备批量处理任务完成 - 时间: {}", System.currentTimeMillis());
} catch (Exception e) {
log.error("执行设备处理任务异常", e);
}
}
/**
* 批量处理设备
*/
private void processDeviceBatch(List<CsEquipmentDeliveryPO> deviceBatch) {
if (CollUtil.isEmpty(deviceBatch)) {
return;
}
for (CsEquipmentDeliveryPO device : deviceBatch) {
try {
processSingleDevice(device);
} catch (Exception e) {
log.error("处理单个设备失败: 设备ID={}, 错误={}", device.getNdid(), e.getMessage(), e);
}
}
}
/**
* 处理单个设备的业务逻辑
* 注意:这里需要根据实际业务需求实现具体的业务逻辑
*/
private void processSingleDevice(CsEquipmentDeliveryPO device) {
log.info("正在处理设备: {}", device.getNdid());
// 1. 检查设备在Redis中的状态
String deviceKey = "MQTT:" + device.getNdid();
Object deviceStatus = redisUtil.getObjectByKey(deviceKey);
// 2. 如果没有心跳,则模拟补充个心跳
if (deviceStatus == null) {
// 如果Redis中没有该设备的状态信息可以设置默认值或执行相应处理
redisUtil.saveByKeyWithExpire(deviceKey, device.getNdid(), 100L);
}
log.info("设备处理完成: {}", device.getNdid());
}
}

View File

@@ -0,0 +1,12 @@
package com.njcn.access.service;
import com.njcn.mq.message.HeartbeatTimeoutMessage;
/**
* @author xy
*/
public interface ICsHeartService {
void handleHeartbeat(HeartbeatTimeoutMessage message);
}

View File

@@ -0,0 +1,8 @@
package com.njcn.access.service;
public interface IHeartbeatService {
void receiveHeartbeat(String nDid);
Boolean isHeartbeatUpdated(String nDid, Long sendTime);
}

View File

@@ -0,0 +1,146 @@
package com.njcn.access.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.dto.NoticeUserDto;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsHeartService;
import com.njcn.access.service.IHeartbeatService;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.param.DeviceMessageParam;
import com.njcn.csdevice.pojo.dto.DevDetailDTO;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.mq.message.HeartbeatTimeoutMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.user.api.AppUserFeignClient;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* <p>
* 数据集表 服务实现类
* </p>
*
* @author xuyang
* @since 2023-08-01
*/
@Service
@Slf4j
public class CsHeartServiceImpl implements ICsHeartService {
@Resource
private ICsEquipmentDeliveryService csEquipmentDeliveryService;
@Resource
private CsLogsFeignClient csLogsFeignClient;
@Resource
private EquipmentFeignClient equipmentFeignClient;
@Resource
private SendMessageUtil sendMessageUtil;
@Resource
private CsLedgerFeignClient csLedgerFeignclient;
@Resource
private AppUserFeignClient appUserFeignClient;
@Resource
private CsDeviceUserFeignClient csDeviceUserFeignClient;
@Resource
private UserFeignClient userFeignClient;
@Resource
private IHeartbeatService heartbeatService;
@Resource
private CsCommunicateFeignClient csCommunicateFeignClient;
@Resource
private RedisUtil redisUtil;
@Resource
private DeviceMessageFeignClient deviceMessageFeignClient;
@Override
public void handleHeartbeat(HeartbeatTimeoutMessage message) {
String nDid = message.getNDid();
Long sendTime = message.getTimestamp();
if (heartbeatService.isHeartbeatUpdated(nDid, sendTime)) {
return;
}
log.info("{}->装置离线,执行业务处理", nDid);
handleDeviceOffline(nDid);
}
private void handleDeviceOffline(String nDid) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
//装置下线
csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode());
//装置调整为注册状态
csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode(),null,null);
logDto.setOperate(nDid +"装置离线");
sendMessage(nDid);
//记录装置掉线时间
PqsCommunicateDto dto = new PqsCommunicateDto();
dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
dto.setDevId(nDid);
dto.setType(0);
dto.setDescription("通讯中断");
csCommunicateFeignClient.insertion(dto);
csLogsFeignClient.addUserLog(logDto);
//清空缓存
redisUtil.deleteKeysByString(AppRedisKey.LINE_POSITION+nDid);
}
private void sendMessage(String nDid) {
boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData();
if (devModel) {
NoticeUserDto dto = sendOffLine(nDid);
if (CollectionUtil.isNotEmpty(dto.getPushClientId())) {
sendMessageUtil.sendEventToUser(dto);
addLogs(dto);
}
}
}
//掉线通知
private NoticeUserDto sendOffLine(String nDid) {
NoticeUserDto dto = new NoticeUserDto();
dto.setTitle("设备离线");
CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData();
DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(po.getId()).getData();
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime localDateTime = LocalDateTime.now();
String dateStr = localDateTime.format(fmt);
String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + dateStr + "离线");
dto.setContent(content);
//获取设备关联的用户
List<String> eventUser = deviceMessageFeignClient.getEventUserByDeviceId(po.getId(),true).getData();
DeviceMessageParam param1 = new DeviceMessageParam();
param1.setUserList(eventUser);
param1.setEventType(2);
//获取打开推送的用户
List<User> users = deviceMessageFeignClient.getSendUserByType(param1).getData();
if (CollectionUtil.isNotEmpty(users)){
dto.setPushClientId(
users.stream().filter(Objects::nonNull).map(User::getDevCode).filter(org.apache.commons.lang3.StringUtils::isNotBlank).distinct().collect(Collectors.toList()));
}
return dto;
}
private void addLogs(NoticeUserDto noticeUserDto) {
DeviceLogDTO dto = new DeviceLogDTO();
dto.setUserName("运维管理员");
dto.setLoginName("njcnyw");
dto.setOperate(noticeUserDto.getContent());
csLogsFeignClient.addUserLog(dto);
}
}

View File

@@ -0,0 +1,55 @@
package com.njcn.access.service.impl;
import com.njcn.access.service.IHeartbeatService;
import com.njcn.mq.message.HeartbeatTimeoutMessage;
import com.njcn.mq.template.HeartbeatTimeoutMessageTemplate;
import com.njcn.redis.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author xy
*/
@Service
@Slf4j
public class HeartbeatServiceImpl implements IHeartbeatService {
@Resource
private HeartbeatTimeoutMessageTemplate heartbeatTimeoutMessageTemplate;
@Resource
private RedisUtil redisUtil;
private static final String HEARTBEAT_REDIS_KEY_PREFIX = "HEARTBEAT:";
private static final int DELAY_LEVEL_4MIN = 7;
private static final long HEARTBEAT_EXPIRE_SECONDS = 180;
@Override
public void receiveHeartbeat(String nDid) {
String redisKey = HEARTBEAT_REDIS_KEY_PREFIX + nDid;
long currentTime = System.currentTimeMillis();
redisUtil.saveByKey(redisKey, currentTime);
redisUtil.expire(redisKey, HEARTBEAT_EXPIRE_SECONDS);
HeartbeatTimeoutMessage message = new HeartbeatTimeoutMessage();
message.setNDid(nDid);
message.setTimestamp(currentTime);
message.setDelayLevel(DELAY_LEVEL_4MIN);
heartbeatTimeoutMessageTemplate.sendMember(message);
}
@Override
public Boolean isHeartbeatUpdated(String nDid, Long sendTime) {
String redisKey = HEARTBEAT_REDIS_KEY_PREFIX + nDid;
Object lastHeartbeat = redisUtil.getObjectByKey(redisKey);
if (lastHeartbeat == null) {
return false;
}
long lastUpdateTime = Long.parseLong(lastHeartbeat.toString());
return lastUpdateTime > sendTime;
}
}

View File

@@ -45,6 +45,7 @@ logging:
config: http://@nacos.url@/nacos/v1/cs/configs?tenant=@nacos.namespace@&group=DEFAULT_GROUP&dataId=logback.xml
level:
root: info
com.njcn.middle.rocket.template.RocketMQEnhanceTemplate: ERROR
#mybatis配置信息

View File

@@ -0,0 +1,134 @@
package com.njcn.message.consumer;
import com.njcn.access.api.CsHeartbeatFeignClient;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.mq.constant.BusinessTopic;
import com.njcn.mq.constant.MessageStatus;
import com.njcn.mq.message.HeartbeatTimeoutMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/11 15:32
*/
@Service
@RocketMQMessageListener(
topic = BusinessTopic.HEARTBEAT_TIMEOUT_TOPIC,
consumerGroup = BusinessTopic.HEARTBEAT_TIMEOUT_TOPIC,
selectorExpression = BusinessTopic.HeartTag.APF_TAG,
consumeThreadNumber = 1,
enableMsgTrace = true
)
@Slf4j
public class HeartbeatTimeoutConsumer extends EnhanceConsumerMessageHandler<HeartbeatTimeoutMessage> implements RocketMQListener<HeartbeatTimeoutMessage> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Resource
private CsHeartbeatFeignClient csHeartbeatFeignClient;
@Override
protected void handleMessage(HeartbeatTimeoutMessage appFileMessage) {
csHeartbeatFeignClient.handleHeartbeat(appFileMessage);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(HeartbeatTimeoutMessage message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(HeartbeatTimeoutMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, 300L);
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(HeartbeatTimeoutMessage message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
/***
* 调用父类handler处理消息的元信息
*/
@Override
public void onMessage(HeartbeatTimeoutMessage appFileMessage) {
super.dispatchMessage(appFileMessage);
}
}