feat(timer): 添加MQTT心跳检测定时器并优化数据处理

- 新增MqttHeartCheckTimer定时器,每小时轮询在线设备并进行心跳检测
- 在CsEquipmentDeliveryService中添加getUseOnlineDevice方法获取启用且在线的MQTT设备
This commit is contained in:
xy
2026-05-08 16:30:41 +08:00
parent 0f532033b0
commit 2cad107c29
6 changed files with 266 additions and 115 deletions

View File

@@ -7,10 +7,7 @@ import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.dto.NoticeUserDto;
import com.njcn.access.service.ICsDeviceOnlineLogsService;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.access.utils.MqttUtil;
import com.njcn.access.utils.RedisSetUtil;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
@@ -22,9 +19,6 @@ import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.rt.pojo.dto.BaseRealDataSet;
import com.njcn.user.api.AppInfoSetFeignClient;
import com.njcn.user.api.AppUserFeignClient;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -53,24 +47,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
@Resource
private ICsEquipmentDeliveryService csEquipmentDeliveryService;
@Resource
private CsDeviceServiceImpl csDeviceService;
@Resource
private CsLogsFeignClient csLogsFeignClient;
@Resource
private ICsDeviceOnlineLogsService onlineLogsService;
@Resource
private MqttUtil mqttUtil;
@Resource
private CsLedgerFeignClient csLedgerFeignclient;
@Resource
private EquipmentFeignClient equipmentFeignClient;
@Resource
private AppUserFeignClient appUserFeignClient;
@Resource
private CsDeviceUserFeignClient csDeviceUserFeignClient;
@Resource
private UserFeignClient userFeignClient;
@Resource
private RedisUtil redisUtil;
@Resource
private SendMessageUtil sendMessageUtil;
@@ -81,20 +63,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
@Resource
private RedisSetUtil redisSetUtil;
@Resource
private AppInfoSetFeignClient appInfoSetFeignClient;
@Resource
private DeviceMessageFeignClient deviceMessageFeignClient;
private final Object lock = new Object();
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
//最大告警次数
private static int MAX_WARNING_TIMES = 0;
/**
* 针对redis数据失效事件进行数据处理
* 注意message.toString()可以获取失效的key
@@ -124,13 +98,6 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
});
}
}
//云前置设备心跳丢失处理
// if(expiredKey.startsWith(RedisKeyEnum.CLD_HEART_BEAT_KEY.getKey())){
// String node = expiredKey.split(":")[1];
// String nodeId = node.substring(0, node.length() - 1);
// int processNo = Integer.parseInt(node.substring(node.length() - 1));
// equipmentFeignClient.updateCldDevStatus(nodeId,processNo);
// }
}
//主任务

View File

@@ -161,74 +161,4 @@ public class AutoAccessTimer implements ApplicationRunner {
}
}
// @Override
// public void run(ApplicationArguments args) {
// if (scheduler.isShutdown() || scheduler.isTerminated()) {
// scheduler = Executors.newScheduledThreadPool(1);
// }
// Runnable task = () -> {
// log.info("轮询定时任务执行中!");
// List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getOfflineDev();
// if (CollUtil.isNotEmpty(list)) {
// ExecutorService executor = Executors.newFixedThreadPool(10);
// // 将任务平均分配给10个子列表
// List<List<CsEquipmentDeliveryPO>> subLists = CollUtil.split(list, 10);
// // 创建一个ExecutorService来处理这些任务
// List<Future<Void>> futures = new ArrayList<>();
// for (List<CsEquipmentDeliveryPO> subList : subLists) {
// futures.add(executor.submit(() -> {
// try {
// accessDev(subList);
// } catch (Exception e) {
// log.error("处理设备子列表异常,但继续处理其他任务", e);
// }
// return null;
// }));
// }
// // 等待所有任务完成
// for (Future<Void> future : futures) {
// try {
// future.get();
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// log.error("任务被中断", e);
// } catch (ExecutionException e) {
// log.error("任务执行异常", e.getCause());
// } catch (Exception e) {
// log.error("系统异常", e.getCause());
// }
// }
// // 关闭ExecutorService
// executor.shutdown();
// }
// };
// //第一次执行的时间为120s然后在前一个任务执行完毕后等待120s再执行下一个任务
// scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS);
// }
//
// public void accessDev(List<CsEquipmentDeliveryPO> list) {
// if (CollUtil.isNotEmpty(list)) {
// try {
// list.forEach(item -> {
// System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid());
// //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入
// String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode();
// if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) {
// //csDeviceService.wlDevRegister(item.getNdid());
// log.info("请先手动注册、接入");
// } else {
// String version = csTopicService.getVersion(item.getNdid());
// if (Objects.isNull(version)) {
// version = "V1";
// }
// csDeviceService.autoAccess(item.getNdid(), version, 1);
// }
// redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1);
// });
// } catch (Exception e) {
// log.error(e.getMessage());
// }
// }
// }
}

View File

@@ -0,0 +1,190 @@
package com.njcn.access.runner;
import cn.hutool.core.collection.CollUtil;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.utils.RedisUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author xy
* 定时1小时轮询设备列表并使用多线程进行业务处理
* 1. 定时查询所有设备列表
* 2. 使用多线程处理每个设备的业务逻辑
* 3. 处理完成后释放资源
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class MqttHeartCheckTimer implements ApplicationRunner {
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
private final RedisUtil redisUtil;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 1小时间隔
private static final long INTERVAL_MINUTES = 60L;
@Override
public void run(ApplicationArguments args) {
if (scheduler.isShutdown() || scheduler.isTerminated()) {
scheduler = Executors.newScheduledThreadPool(1);
}
Runnable task = () -> {
try {
executeScheduledTask();
} catch (Throwable t) {
log.error("定时设备处理任务发生严重异常", t);
}
};
// 每小时执行一次
ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
task,
0,
INTERVAL_MINUTES,
TimeUnit.MINUTES
);
// 添加监控,如果任务被取消则重新调度
monitorScheduledTask(future);
}
/**
* 监控定时任务
*/
private void monitorScheduledTask(ScheduledFuture<?> future) {
Thread monitorThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
// 每10分钟检查一次任务状态
Thread.sleep(600000);
if (future.isCancelled() || future.isDone()) {
log.warn("定时设备处理任务被取消或完成,重新调度...");
// 重新启动任务
run(null);
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("监控线程被中断");
break;
} catch (Exception e) {
log.error("监控任务异常", e);
}
}
}, "Device-Processor-Monitor-Thread");
monitorThread.setDaemon(true);
monitorThread.start();
}
/**
* 执行定时任务的主要逻辑
*/
private void executeScheduledTask() {
log.info("开始执行定时设备处理任务(查看在线设备和心跳数据是否一致) - 时间: {}", System.currentTimeMillis());
try {
// 查询所有设备列表
List<CsEquipmentDeliveryPO> deviceList = csEquipmentDeliveryService.getUseOnlineDevice();
if (CollUtil.isEmpty(deviceList)) {
log.info("设备列表为空,跳过处理");
return;
}
log.info("查询到 {} 个设备,开始多线程处理", deviceList.size());
// 创建线程池进行多线程处理,根据设备数量动态调整线程数
// 最大10个线程
int threadCount = Math.min(deviceList.size(), 10);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
try {
// 将设备列表分批处理,每批处理一定数量的设备
List<List<CsEquipmentDeliveryPO>> batches = CollUtil.split(deviceList, threadCount);
List<Future<Void>> futures = new ArrayList<>();
for (List<CsEquipmentDeliveryPO> batch : batches) {
futures.add(executor.submit(() -> {
try {
processDeviceBatch(batch);
} catch (Exception e) {
log.error("处理设备批次异常", e);
}
return null;
}));
}
// 等待所有任务完成,设置超时时间防止长时间阻塞
for (Future<Void> future : futures) {
try {
// 设置10分钟超时
future.get(10, TimeUnit.MINUTES);
} catch (TimeoutException e) {
log.error("设备批次处理超时", e);
} catch (Exception e) {
log.error("设备批次处理异常", e);
}
}
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(2, TimeUnit.MINUTES)) {
log.warn("设备处理线程池未在规定时间内关闭,强制关闭");
executor.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("等待线程池关闭时被中断");
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
log.info("设备批量处理任务完成 - 时间: {}", System.currentTimeMillis());
} catch (Exception e) {
log.error("执行设备处理任务异常", e);
}
}
/**
* 批量处理设备
*/
private void processDeviceBatch(List<CsEquipmentDeliveryPO> deviceBatch) {
if (CollUtil.isEmpty(deviceBatch)) {
return;
}
for (CsEquipmentDeliveryPO device : deviceBatch) {
try {
processSingleDevice(device);
} catch (Exception e) {
log.error("处理单个设备失败: 设备ID={}, 错误={}", device.getNdid(), e.getMessage(), e);
}
}
}
/**
* 处理单个设备的业务逻辑
* 注意:这里需要根据实际业务需求实现具体的业务逻辑
*/
private void processSingleDevice(CsEquipmentDeliveryPO device) {
log.info("正在处理设备: {}", device.getNdid());
// 1. 检查设备在Redis中的状态
String deviceKey = "MQTT:" + device.getNdid();
Object deviceStatus = redisUtil.getObjectByKey(deviceKey);
// 2. 如果没有心跳,则模拟补充个心跳
if (deviceStatus == null) {
// 如果Redis中没有该设备的状态信息可以设置默认值或执行相应处理
redisUtil.saveByKeyWithExpire(deviceKey, device.getNdid(), 100L);
}
log.info("设备处理完成: {}", device.getNdid());
}
}

View File

@@ -65,6 +65,11 @@ public interface ICsEquipmentDeliveryService extends IService<CsEquipmentDeliver
*/
List<CsEquipmentDeliveryPO> getOnlineDev();
/**
* 获取启用、系统在线、MQTT接入的装置
*/
List<CsEquipmentDeliveryPO> getUseOnlineDevice();
/**
* 获取离线、启用、客户端在线的装置
*/

View File

@@ -17,10 +17,7 @@ import lombok.RequiredArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.*;
import java.util.stream.Collectors;
/**
@@ -126,6 +123,15 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
return result;
}
@Override
public List<CsEquipmentDeliveryPO> getUseOnlineDevice() {
return this.lambdaQuery()
.eq(CsEquipmentDeliveryPO::getRunStatus,AccessEnum.ONLINE.getCode())
.eq(CsEquipmentDeliveryPO::getDevAccessMethod,"MQTT")
.eq(CsEquipmentDeliveryPO::getUsageStatus,1)
.list();
}
@Override
public List<CsEquipmentDeliveryPO> getOfflineDev() {
List<CsEquipmentDeliveryPO> result = new ArrayList<>();

View File

@@ -152,7 +152,7 @@ public class StatServiceImpl implements IStatService {
}
if (CollectionUtil.isNotEmpty(recordList)){
//influx数据批量入库
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, recordList);
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.SECONDS, recordList);
//记录监测点最新数据时间
CsLineLatestData csLineLatestData = new CsLineLatestData();
csLineLatestData.setLineId(lineId);
@@ -192,23 +192,40 @@ public class StatServiceImpl implements IStatService {
*/
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType,String accessMethod) {
List<String> records = new ArrayList<String>();
//解码
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
if (CollectionUtil.isEmpty(floats)){
throw new BusinessException(StatResponseEnum.AUTO_DATA_NULL);
}
//校验模板和解码数据数量能否对应上
if (!Objects.equals(dataArrayList.size(),floats.size())){
throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH);
}
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD");
//fixme 捂脸设备上送的是北京时间,时序数据库录入时 需要utc时间减去8小时
long originalTimeSec = flag ? item.getDataTimeSec() : item.getDataTimeSec() - 8 * 3600;
if (originalTimeSec < 0) {
System.out.println("originalTimeSec==:" + originalTimeSec);
}
for (int i = 0; i < dataArrayList.size(); i++) {
String tableName = map.get(dataArrayList.get(i).getName());
long adjustedTimeSec;
//短时闪变 || 电压波动 10分钟
if (Objects.equals(tableName,"data_flicker") || Objects.equals(tableName,"data_fluc")) {
adjustedTimeSec = (originalTimeSec / 600) * 600;
}
//长时闪变 2小时
else if (Objects.equals(tableName,"data_plt")) {
adjustedTimeSec = (originalTimeSec / 7200) * 7200;
}
else {
adjustedTimeSec = originalTimeSec;
}
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.LINE_ID,lineId);
tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
//todo 不清楚之前为啥要修改相别,这边按字典配置相别无法查询到数据,先改回来
//tags.put(InfluxDBTableConstant.PHASIC_TYPE,Objects.isNull(PHASE_MAPPING.get(dataArrayList.get(i).getPhase()))?dataArrayList.get(i).getPhase():PHASE_MAPPING.get(dataArrayList.get(i).getPhase()));
tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod);
tags.put(InfluxDBTableConstant.CL_DID,clDid.toString());
tags.put(InfluxDBTableConstant.PROCESS,process.toString());
@@ -217,9 +234,8 @@ public class StatServiceImpl implements IStatService {
//这边特殊处理如果数据为3.14159则将数据置为null
fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
//fixme 设备上送的是北京时间,时序数据库录入时 需要utc时间减去8小时
boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD");
Point point = influxDbUtils.pointBuilder(tableName, flag?item.getDataTimeSec():item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
Point point = influxDbUtils.pointBuilder(tableName, adjustedTimeSec, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
@@ -227,6 +243,43 @@ public class StatServiceImpl implements IStatService {
return records;
}
// public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType,String accessMethod) {
// List<String> records = new ArrayList<String>();
// //解码
// List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
// if (CollectionUtil.isEmpty(floats)){
// throw new BusinessException(StatResponseEnum.AUTO_DATA_NULL);
// }
// //校验模板和解码数据数量能否对应上
// if (!Objects.equals(dataArrayList.size(),floats.size())){
// throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH);
// }
// Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
// for (int i = 0; i < dataArrayList.size(); i++) {
// String tableName = map.get(dataArrayList.get(i).getName());
// Map<String, String> tags = new HashMap<>();
// tags.put(InfluxDBTableConstant.LINE_ID,lineId);
// tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
// //todo 不清楚之前为啥要修改相别,这边按字典配置相别无法查询到数据,先改回来
// //tags.put(InfluxDBTableConstant.PHASIC_TYPE,Objects.isNull(PHASE_MAPPING.get(dataArrayList.get(i).getPhase()))?dataArrayList.get(i).getPhase():PHASE_MAPPING.get(dataArrayList.get(i).getPhase()));
// tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod);
// tags.put(InfluxDBTableConstant.CL_DID,clDid.toString());
// tags.put(InfluxDBTableConstant.PROCESS,process.toString());
// tags.put(InfluxDBTableConstant.QUALITY_FLAG,"0");
// Map<String,Object> fields = new HashMap<>();
// //这边特殊处理如果数据为3.14159则将数据置为null
// fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
// fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
// //fixme 设备上送的是北京时间,时序数据库录入时 需要utc时间减去8小时
// boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD");
// Point point = influxDbUtils.pointBuilder(tableName, flag?item.getDataTimeSec():item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
// BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
// batchPoints.point(point);
// records.add(batchPoints.lineProtocol());
// }
// return records;
// }
public List<CsDataArray> objectToList(Object object) {
List<CsDataArray> urlList = new ArrayList<>();
if (object != null) {