From a0e5394f11f7c04c5cd467ec2a032724e9dbfa96 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Tue, 20 Aug 2024 16:30:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=A3=85=E7=BD=AE=E6=8E=89?= =?UTF-8?q?=E7=BA=BF=E9=87=8D=E8=BF=9E=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/handler/MqttMessageHandler.java | 5 - .../listener/RedisKeyExpirationListener.java | 138 +++++++++--------- .../service/ICsDevModelRelationService.java | 2 + .../impl/CsDevModelRelationServiceImpl.java | 18 ++- .../service/impl/CsDeviceServiceImpl.java | 37 +---- 5 files changed, 99 insertions(+), 101 deletions(-) diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 8c91519..29b9db4 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -352,8 +352,6 @@ public class MqttMessageHandler { } //询问设备软件信息 askDevData(nDid,version,1,mid); - //更新监测点信息 - askDevData(nDid,version,3,(res.getMid()+1)); } else { log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage()); logDto.setResult(0); @@ -367,7 +365,6 @@ public class MqttMessageHandler { RspDataDto rspDataDto = JSON.parseObject(JSON.toJSONString(res.getMsg()), RspDataDto.class); switch (rspDataDto.getDataType()){ case 1: - log.info("{} 更新设备软件信息",nDid); logDto.setOperate(nDid + "更新设备软件信息"); RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.SoftInfo.class); //记录设备软件信息 @@ -395,7 +392,6 @@ public class MqttMessageHandler { List devInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class); if (CollectionUtil.isNotEmpty(devInfo)){ if (Objects.equals(res.getDid(),1)){ - log.info("{} 更新治理监测点信息和设备容量",nDid); logDto.setOperate(nDid + "更新治理监测点信息和设备容量"); List list = new ArrayList<>(); devInfo.forEach(item->{ @@ -422,7 +418,6 @@ public class MqttMessageHandler { //4.询问监测点pt/ct信息 askDevData(nDid,version,3,(res.getMid()+1)); } else if (Objects.equals(res.getDid(),2)) { - log.info("{} 更新电网侧、负载侧监测点信息",nDid); logDto.setOperate(nDid + "更新电网侧、负载侧监测点信息"); //1.更新电网侧、负载侧监测点相关信息 devInfo.forEach(item->{ diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 1c2fad5..420a3ba 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -1,16 +1,12 @@ package com.njcn.access.listener; import com.njcn.access.enums.AccessEnum; -import com.njcn.access.pojo.po.CsDeviceOnlineLogs; import com.njcn.access.service.ICsDeviceOnlineLogsService; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.impl.CsDeviceServiceImpl; import com.njcn.common.pojo.dto.DeviceLogDTO; -import com.njcn.common.pojo.enums.response.CommonResponseEnum; -import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.CsLogsFeignClient; -import com.njcn.web.utils.RequestUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.Message; @@ -19,10 +15,10 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.time.LocalDateTime; import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -53,6 +49,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene super(listenerContainer); } + // 最大尝试次数 + private static final int MAX_ATTEMPTS = 3; + + // 当前尝试次数 + private static int attemptCount = 0; + /** * 针对redis数据失效事件,进行数据处理 * 注意message.toString()可以获取失效的key @@ -62,70 +64,74 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene if (StringUtils.isBlank(message.toString())) { return; } - //日志实体 - DeviceLogDTO logDto = new DeviceLogDTO(); - try{ - logDto.setUserName(RequestUtil.getUsername()); - logDto.setLoginName(RequestUtil.getLoginName()); - } catch (Exception e) { - logDto.setUserName("redis失效存储"); - logDto.setLoginName(null); - } - logDto.setResult(1); //判断失效的key是否为MQTT消费端存入的 String expiredKey = message.toString(); if(expiredKey.startsWith("MQTT:")){ - String nDid = expiredKey.split(":")[1]; - //装置下线 - csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); - logDto.setOperate("装置掉线,装置为:" + nDid); - csLogsFeignClient.addUserLog(logDto); - //记录装置下线日志 - CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); - record.setOfflineTime(LocalDateTime.now()); - onlineLogsService.updateById(record); - try { - String version = csTopicService.getVersion(nDid); - //装置没有心跳,则立马发起接入请求 - csDeviceService.devAccessAskTemplate(nDid,version,1); - logDto.setOperate("装置掉线3分钟发送接入请求"); - csLogsFeignClient.addUserLog(logDto); - Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); - if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ - throw new BusinessException(CommonResponseEnum.SUCCESS); - } - //心跳断连立马发起接入失败后,1分钟再次发起请求,请求3次 - for (int i = 2; i < 5; i++) { - //接入再次失败,则定时发起接入请求 - Thread.sleep(1000 * 60); - csDeviceService.devAccessAskTemplate(nDid,version,i); - status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); - if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ - break; - } - logDto.setOperate("装置定时1分钟发送接入请求,第" + i + "次尝试"); - csLogsFeignClient.addUserLog(logDto); - } - if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.OFFLINE.getCode())){ - final int[] mid = {5}; - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); - ScheduledFuture runnableFuture = executor.scheduleAtFixedRate(() -> { - csDeviceService.devAccessAskTemplate(nDid,version, mid[0]); - Integer status2 = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); - if (Objects.equals(status2,AccessEnum.ONLINE.getCode())){ - throw new BusinessException(CommonResponseEnum.SUCCESS); - } else { - mid[0] = mid[0] + 1; - } - //记录日志 - logDto.setOperate("装置掉线,定时10分钟发送接入请求,装置为:" + nDid + ",请求的时间戳为:" + System.currentTimeMillis()); - csLogsFeignClient.addUserLog(logDto); - }, 1, 600, TimeUnit.SECONDS); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } + String version = csTopicService.getVersion(nDid); + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + executeMainTask(scheduler,nDid,version); } } + + //主任务 + private void executeMainTask(ScheduledExecutorService scheduler, String nDid, String version) { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("装置掉线触发"); + logDto.setOperate(nDid + "重连"); + //装置下线 + csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); + //装置没有心跳,则立马发起接入请求 + csDeviceService.devAccessAskTemplate(nDid,version,1); + Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); + if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ + scheduler.shutdown(); + logDto.setResult(1); + } else { + logDto.setResult(0); + logDto.setFailReason("心跳结束重连失败,启动第一个定时任务"); + startScheduledTask(scheduler,nDid,version); + } + csLogsFeignClient.addUserLog(logDto); + } + + //启动第一次定时任务 + private void startScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) { + ScheduledFuture future = scheduler.scheduleAtFixedRate(() -> { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("装置掉线第一次定时任务"); + if (attemptCount < MAX_ATTEMPTS) { + attemptCount++; + logDto.setOperate(nDid + "执行第一次定时任务,第 " + attemptCount + " 次尝试..."); + csDeviceService.devAccessAskTemplate(nDid,version,attemptCount); + int status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); + if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ + logDto.setResult(1); + scheduler.shutdown(); + } else { + logDto.setResult(0); + startSecondScheduledTask(scheduler,nDid,version); + } + } + csLogsFeignClient.addUserLog(logDto); + }, 0, 5, TimeUnit.SECONDS); + } + + //启动第二个定时任务 + private void startSecondScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) { + ScheduledFuture future = scheduler.scheduleAtFixedRate(() -> { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("装置掉线第二次定时任务"); + logDto.setOperate(nDid + "执行第二次定时任务"); + csDeviceService.devAccessAskTemplate(nDid,version,attemptCount++); + int status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); + if (Objects.equals(status,AccessEnum.ONLINE.getCode())) { + logDto.setResult(1); + scheduler.shutdown(); + } else { + logDto.setResult(0); + } + csLogsFeignClient.addUserLog(logDto); + }, 0, 10, TimeUnit.SECONDS); + } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelRelationService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelRelationService.java index 2e776ed..c860b88 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelRelationService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelRelationService.java @@ -32,5 +32,7 @@ public interface ICsDevModelRelationService extends IService csDevModelRelationVOS = this.queryDevModelRelation (queryParm); - if(csDevModelRelationVOS.size ()>0){ + if(!csDevModelRelationVOS.isEmpty()){ throw new BusinessException (AlgorithmResponseEnum.DATA_ERROR); } CsDevModelRelationPO csDevModelRelationPO = new CsDevModelRelationPO(); @@ -57,6 +62,17 @@ public class CsDevModelRelationServiceImpl extends ServiceImpl wrapper = new LambdaUpdateWrapper<>(); + wrapper.eq(CsDevModelRelationPO::getDevId, po.getDevId()) + .eq(CsDevModelRelationPO::getDid, po.getDid()) + .eq(CsDevModelRelationPO::getStatus, 1) + .set(CsDevModelRelationPO::getModelId,po.getModelId()) + .set(CsDevModelRelationPO::getUpdateTime,po.getUpdateTime()); + this.update(wrapper); + } + public List queryDevModelRelation(CsDevModelRelationQueryParm queryParm) { QueryWrapper queryWrapper = new QueryWrapper<> (); queryWrapper.eq (StringUtils.isNotBlank (queryParm.getId ()),"id",queryParm.getId ()). diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index eafc145..c45adf0 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -2,6 +2,7 @@ package com.njcn.access.service.impl; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.IdUtil; import com.alibaba.nacos.client.naming.utils.CollectionUtils; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -35,6 +36,7 @@ import com.njcn.system.enums.DicDataEnum; import com.njcn.system.pojo.po.SysDicTreePO; import com.njcn.web.utils.RequestUtil; import lombok.AllArgsConstructor; +import net.sf.cglib.core.Local; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -531,15 +533,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService { List modelId = objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid)); if (CollUtil.isNotEmpty(modelId)) { CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData(); - //删除之前的模板信息 - csDevModelRelationService.deleteByDeviceId(vo.getId()); //重新录入装置和模板关系信息 for (CsModelDto item : modelId) { - CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm(); - csDevModelRelationAddParm.setDevId(vo.getId()); - csDevModelRelationAddParm.setModelId(item.getModelId()); - csDevModelRelationAddParm.setDid(item.getDid()); - csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm); + CsDevModelRelationPO po = new CsDevModelRelationPO(); + po.setDevId(vo.getId()); + po.setModelId(item.getModelId()); + po.setDid(item.getDid()); + po.setUpdateTime(LocalDateTime.now()); + csDevModelRelationService.addRelation(po); } //发起接入 reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); @@ -552,28 +553,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService { return result; } -// public void devAccess(String nDid,String version) { -// ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); -// reqAndResParam.setMid(1); -// reqAndResParam.setDid(0); -// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); -// reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); -// reqAndResParam.setExpire(-1); -// logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam)); -// publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); -// } -// -// public void devAccessMid(String nDid,String version,Integer mid) { -// ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); -// reqAndResParam.setMid(mid); -// reqAndResParam.setDid(0); -// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); -// reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); -// reqAndResParam.setExpire(-1); -// logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam)); -// publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); -// } - /** * 平台对设备发起主题询问命令 */