优化装置掉线重连机制

This commit is contained in:
xy
2024-08-20 16:30:50 +08:00
parent 21ee13e71f
commit a0e5394f11
5 changed files with 99 additions and 101 deletions

View File

@@ -352,8 +352,6 @@ public class MqttMessageHandler {
}
//询问设备软件信息
askDevData(nDid,version,1,mid);
//更新监测点信息
askDevData(nDid,version,3,(res.getMid()+1));
} else {
log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
logDto.setResult(0);
@@ -367,7 +365,6 @@ public class MqttMessageHandler {
RspDataDto rspDataDto = JSON.parseObject(JSON.toJSONString(res.getMsg()), RspDataDto.class);
switch (rspDataDto.getDataType()){
case 1:
log.info("{} 更新设备软件信息",nDid);
logDto.setOperate(nDid + "更新设备软件信息");
RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.SoftInfo.class);
//记录设备软件信息
@@ -395,7 +392,6 @@ public class MqttMessageHandler {
List<RspDataDto.LdevInfo> devInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class);
if (CollectionUtil.isNotEmpty(devInfo)){
if (Objects.equals(res.getDid(),1)){
log.info("{} 更新治理监测点信息和设备容量",nDid);
logDto.setOperate(nDid + "更新治理监测点信息和设备容量");
List<CsDevCapacityPO> list = new ArrayList<>();
devInfo.forEach(item->{
@@ -422,7 +418,6 @@ public class MqttMessageHandler {
//4.询问监测点pt/ct信息
askDevData(nDid,version,3,(res.getMid()+1));
} else if (Objects.equals(res.getDid(),2)) {
log.info("{} 更新电网侧、负载侧监测点信息",nDid);
logDto.setOperate(nDid + "更新电网侧、负载侧监测点信息");
//1.更新电网侧、负载侧监测点相关信息
devInfo.forEach(item->{

View File

@@ -1,16 +1,12 @@
package com.njcn.access.listener;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
import com.njcn.access.service.ICsDeviceOnlineLogsService;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLogsFeignClient;
import com.njcn.web.utils.RequestUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message;
@@ -19,10 +15,10 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@@ -53,6 +49,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
super(listenerContainer);
}
// 最大尝试次数
private static final int MAX_ATTEMPTS = 3;
// 当前尝试次数
private static int attemptCount = 0;
/**
* 针对redis数据失效事件进行数据处理
* 注意message.toString()可以获取失效的key
@@ -62,70 +64,74 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
if (StringUtils.isBlank(message.toString())) {
return;
}
//日志实体
DeviceLogDTO logDto = new DeviceLogDTO();
try{
logDto.setUserName(RequestUtil.getUsername());
logDto.setLoginName(RequestUtil.getLoginName());
} catch (Exception e) {
logDto.setUserName("redis失效存储");
logDto.setLoginName(null);
}
logDto.setResult(1);
//判断失效的key是否为MQTT消费端存入的
String expiredKey = message.toString();
if(expiredKey.startsWith("MQTT:")){
String nDid = expiredKey.split(":")[1];
//装置下线
csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode());
logDto.setOperate("装置掉线,装置为:" + nDid);
csLogsFeignClient.addUserLog(logDto);
//记录装置下线日志
CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid);
record.setOfflineTime(LocalDateTime.now());
onlineLogsService.updateById(record);
try {
String version = csTopicService.getVersion(nDid);
//装置没有心跳,则立马发起接入请求
csDeviceService.devAccessAskTemplate(nDid,version,1);
logDto.setOperate("装置掉线3分钟发送接入请求");
csLogsFeignClient.addUserLog(logDto);
Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
throw new BusinessException(CommonResponseEnum.SUCCESS);
}
//心跳断连立马发起接入失败后1分钟再次发起请求请求3次
for (int i = 2; i < 5; i++) {
//接入再次失败,则定时发起接入请求
Thread.sleep(1000 * 60);
csDeviceService.devAccessAskTemplate(nDid,version,i);
status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
break;
}
logDto.setOperate("装置定时1分钟发送接入请求" + i + "次尝试");
csLogsFeignClient.addUserLog(logDto);
}
if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.OFFLINE.getCode())){
final int[] mid = {5};
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
ScheduledFuture<?> runnableFuture = executor.scheduleAtFixedRate(() -> {
csDeviceService.devAccessAskTemplate(nDid,version, mid[0]);
Integer status2 = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
if (Objects.equals(status2,AccessEnum.ONLINE.getCode())){
throw new BusinessException(CommonResponseEnum.SUCCESS);
} else {
mid[0] = mid[0] + 1;
}
//记录日志
logDto.setOperate("装置掉线,定时10分钟发送接入请求装置为:" + nDid + ",请求的时间戳为:" + System.currentTimeMillis());
csLogsFeignClient.addUserLog(logDto);
}, 1, 600, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
String version = csTopicService.getVersion(nDid);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
executeMainTask(scheduler,nDid,version);
}
}
//主任务
private void executeMainTask(ScheduledExecutorService scheduler, String nDid, String version) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("装置掉线触发");
logDto.setOperate(nDid + "重连");
//装置下线
csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode());
//装置没有心跳,则立马发起接入请求
csDeviceService.devAccessAskTemplate(nDid,version,1);
Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
scheduler.shutdown();
logDto.setResult(1);
} else {
logDto.setResult(0);
logDto.setFailReason("心跳结束重连失败,启动第一个定时任务");
startScheduledTask(scheduler,nDid,version);
}
csLogsFeignClient.addUserLog(logDto);
}
//启动第一次定时任务
private void startScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) {
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("装置掉线第一次定时任务");
if (attemptCount < MAX_ATTEMPTS) {
attemptCount++;
logDto.setOperate(nDid + "执行第一次定时任务,第 " + attemptCount + " 次尝试...");
csDeviceService.devAccessAskTemplate(nDid,version,attemptCount);
int status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
logDto.setResult(1);
scheduler.shutdown();
} else {
logDto.setResult(0);
startSecondScheduledTask(scheduler,nDid,version);
}
}
csLogsFeignClient.addUserLog(logDto);
}, 0, 5, TimeUnit.SECONDS);
}
//启动第二个定时任务
private void startSecondScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) {
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("装置掉线第二次定时任务");
logDto.setOperate(nDid + "执行第二次定时任务");
csDeviceService.devAccessAskTemplate(nDid,version,attemptCount++);
int status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
if (Objects.equals(status,AccessEnum.ONLINE.getCode())) {
logDto.setResult(1);
scheduler.shutdown();
} else {
logDto.setResult(0);
}
csLogsFeignClient.addUserLog(logDto);
}, 0, 10, TimeUnit.SECONDS);
}
}

View File

@@ -32,5 +32,7 @@ public interface ICsDevModelRelationService extends IService<CsDevModelRelationP
*/
void deleteByDeviceId(String deviceId);
void addRelation(CsDevModelRelationPO po);
}

View File

@@ -1,7 +1,9 @@
package com.njcn.access.service.impl;
import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.CsDevModelRelationMapper;
import com.njcn.access.service.ICsDevModelRelationService;
@@ -13,10 +15,13 @@ import com.njcn.csdevice.pojo.po.CsDevModelRelationPO;
import com.njcn.csdevice.pojo.vo.CsDevModelRelationVO;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.cloud.commons.util.IdUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
/**
@@ -39,7 +44,7 @@ public class CsDevModelRelationServiceImpl extends ServiceImpl<CsDevModelRelatio
queryParm.setModelId (addParm.getModelId ());
queryParm.setStatus ("1");
List<CsDevModelRelationVO> csDevModelRelationVOS = this.queryDevModelRelation (queryParm);
if(csDevModelRelationVOS.size ()>0){
if(!csDevModelRelationVOS.isEmpty()){
throw new BusinessException (AlgorithmResponseEnum.DATA_ERROR);
}
CsDevModelRelationPO csDevModelRelationPO = new CsDevModelRelationPO();
@@ -57,6 +62,17 @@ public class CsDevModelRelationServiceImpl extends ServiceImpl<CsDevModelRelatio
this.baseMapper.delete(wrapper);
}
@Override
public void addRelation(CsDevModelRelationPO po) {
LambdaUpdateWrapper<CsDevModelRelationPO> wrapper = new LambdaUpdateWrapper<>();
wrapper.eq(CsDevModelRelationPO::getDevId, po.getDevId())
.eq(CsDevModelRelationPO::getDid, po.getDid())
.eq(CsDevModelRelationPO::getStatus, 1)
.set(CsDevModelRelationPO::getModelId,po.getModelId())
.set(CsDevModelRelationPO::getUpdateTime,po.getUpdateTime());
this.update(wrapper);
}
public List<CsDevModelRelationVO> queryDevModelRelation(CsDevModelRelationQueryParm queryParm) {
QueryWrapper<CsDevModelRelationPO> queryWrapper = new QueryWrapper<> ();
queryWrapper.eq (StringUtils.isNotBlank (queryParm.getId ()),"id",queryParm.getId ()).

View File

@@ -2,6 +2,7 @@ package com.njcn.access.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -35,6 +36,7 @@ import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.SysDicTreePO;
import com.njcn.web.utils.RequestUtil;
import lombok.AllArgsConstructor;
import net.sf.cglib.core.Local;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -531,15 +533,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
List<CsModelDto> modelId = objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid));
if (CollUtil.isNotEmpty(modelId)) {
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
//删除之前的模板信息
csDevModelRelationService.deleteByDeviceId(vo.getId());
//重新录入装置和模板关系信息
for (CsModelDto item : modelId) {
CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm();
csDevModelRelationAddParm.setDevId(vo.getId());
csDevModelRelationAddParm.setModelId(item.getModelId());
csDevModelRelationAddParm.setDid(item.getDid());
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
CsDevModelRelationPO po = new CsDevModelRelationPO();
po.setDevId(vo.getId());
po.setModelId(item.getModelId());
po.setDid(item.getDid());
po.setUpdateTime(LocalDateTime.now());
csDevModelRelationService.addRelation(po);
}
//发起接入
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode()));
@@ -552,28 +553,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
return result;
}
// public void devAccess(String nDid,String version) {
// ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
// reqAndResParam.setMid(1);
// reqAndResParam.setDid(0);
// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
// reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode()));
// reqAndResParam.setExpire(-1);
// logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam));
// publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
// }
//
// public void devAccessMid(String nDid,String version,Integer mid) {
// ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
// reqAndResParam.setMid(mid);
// reqAndResParam.setDid(0);
// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
// reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode()));
// reqAndResParam.setExpire(-1);
// logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam));
// publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
// }
/**
* 平台对设备发起主题询问命令
*/