From 90dc22021f2587495fd343e7ddd04b9878ee6e0a Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Fri, 16 Aug 2024 13:00:22 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A3=85=E7=BD=AE=E6=8E=A5=E5=85=A5=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/controller/CsDeviceController.java | 8 +- .../listener/RedisKeyExpirationListener.java | 10 +- .../runner/AccessApplicationRunner.java | 94 ++++++++-------- .../access/runner/AccessScheduledTask.java | 103 +++++++++--------- .../service/ICsDevModelRelationService.java | 5 + .../njcn/access/service/ICsDeviceService.java | 2 +- .../impl/CsDevModelRelationServiceImpl.java | 9 ++ .../service/impl/CsDeviceServiceImpl.java | 97 ++++++++++++----- 8 files changed, 195 insertions(+), 133 deletions(-) diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceController.java index 3f368af..098d213 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceController.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceController.java @@ -86,8 +86,12 @@ public class CsDeviceController extends BaseController { @ApiImplicitParam(name = "nDid", value = "设备id", required = true) public HttpResult manualAccess(@RequestParam String nDid){ String methodDescribe = getMethodDescribe("manualAccess"); - csDeviceService.manualAccess(nDid); - return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + boolean result = csDeviceService.manualAccess(nDid); + if (result) { + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "成功", methodDescribe); + } else { + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.FAIL, "失败", methodDescribe); + } } @OperateInfo(info = LogEnum.BUSINESS_COMMON) diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index d5831ec..1c2fad5 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -88,7 +88,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene try { String version = csTopicService.getVersion(nDid); //装置没有心跳,则立马发起接入请求 - csDeviceService.devAccess(nDid,version); + csDeviceService.devAccessAskTemplate(nDid,version,1); logDto.setOperate("装置掉线3分钟发送接入请求"); csLogsFeignClient.addUserLog(logDto); Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); @@ -96,10 +96,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene throw new BusinessException(CommonResponseEnum.SUCCESS); } //心跳断连立马发起接入失败后,1分钟再次发起请求,请求3次 - for (int i = 1; i < 4; i++) { + for (int i = 2; i < 5; i++) { //接入再次失败,则定时发起接入请求 Thread.sleep(1000 * 60); - csDeviceService.devAccess(nDid,version); + csDeviceService.devAccessAskTemplate(nDid,version,i); status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ break; @@ -108,10 +108,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene csLogsFeignClient.addUserLog(logDto); } if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.OFFLINE.getCode())){ - final int[] mid = {2}; + final int[] mid = {5}; ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); ScheduledFuture runnableFuture = executor.scheduleAtFixedRate(() -> { - csDeviceService.devAccessMid(nDid,version, mid[0]); + csDeviceService.devAccessAskTemplate(nDid,version, mid[0]); Integer status2 = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); if (Objects.equals(status2,AccessEnum.ONLINE.getCode())){ throw new BusinessException(CommonResponseEnum.SUCCESS); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java index fc726b4..260546c 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java @@ -1,47 +1,47 @@ -package com.njcn.access.runner; - -import com.njcn.access.service.ICsEquipmentDeliveryService; -import com.njcn.access.service.ICsTopicService; -import com.njcn.access.service.impl.CsDeviceServiceImpl; -import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.List; -import java.util.Objects; - -/** - * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入 - * - * @author xuyang - * @version 1.0.0 - * @createTime 2023/8/28 13:57 - */ -@Component -@Slf4j -public class AccessApplicationRunner implements ApplicationRunner { - - @Resource - private CsDeviceServiceImpl csDeviceService; - - @Resource - private ICsTopicService csTopicService; - - @Resource - private ICsEquipmentDeliveryService csEquipmentDeliveryService; - - @Override - public void run(ApplicationArguments args){ - List list = csEquipmentDeliveryService.getAll(); - list.forEach(item->{ - String version = csTopicService.getVersion(item.getNdid()); - if (!Objects.isNull(version)){ - csDeviceService.devAccess(item.getNdid(),version); - } - }); - } - -} +//package com.njcn.access.runner; +// +//import com.njcn.access.service.ICsEquipmentDeliveryService; +//import com.njcn.access.service.ICsTopicService; +//import com.njcn.access.service.impl.CsDeviceServiceImpl; +//import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.boot.ApplicationArguments; +//import org.springframework.boot.ApplicationRunner; +//import org.springframework.stereotype.Component; +// +//import javax.annotation.Resource; +//import java.util.List; +//import java.util.Objects; +// +///** +// * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入 +// * +// * @author xuyang +// * @version 1.0.0 +// * @createTime 2023/8/28 13:57 +// */ +//@Component +//@Slf4j +//public class AccessApplicationRunner implements ApplicationRunner { +// +// @Resource +// private CsDeviceServiceImpl csDeviceService; +// +// @Resource +// private ICsTopicService csTopicService; +// +// @Resource +// private ICsEquipmentDeliveryService csEquipmentDeliveryService; +// +// @Override +// public void run(ApplicationArguments args){ +// List list = csEquipmentDeliveryService.getAll(); +// list.forEach(item->{ +// String version = csTopicService.getVersion(item.getNdid()); +// if (!Objects.isNull(version)){ +// csDeviceService.devAccess(item.getNdid(),version,1); +// } +// }); +// } +// +//} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessScheduledTask.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessScheduledTask.java index 854d7e0..10c62d0 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessScheduledTask.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessScheduledTask.java @@ -1,50 +1,53 @@ -package com.njcn.access.runner; - -import com.njcn.access.service.ICsEquipmentDeliveryService; -import com.njcn.access.service.ICsTopicService; -import com.njcn.access.service.impl.CsDeviceServiceImpl; -import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; -import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.List; -import java.util.Objects; - -/** - * 类的介绍:防止设备掉线 系统未能调整,做一个定时任务,每天凌晨将所有设备重新接入 - * - * @author xuyang - * @version 1.0.0 - * @createTime 2023/8/28 14:21 - */ -@Component -@Slf4j -public class AccessScheduledTask { - - @Resource - private CsDeviceServiceImpl csDeviceService; - - @Resource - private ICsTopicService csTopicService; - - @Resource - private ICsEquipmentDeliveryService csEquipmentDeliveryService; - - /** - * {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)} - */ - @Scheduled(cron = "0 0 0 * * ?") - public void executeTask() { - log.info("每日凌晨定时任务执行"); - List list = csEquipmentDeliveryService.getAll(); - list.forEach(item->{ - String version = csTopicService.getVersion(item.getNdid()); - if (!Objects.isNull(version)){ - csDeviceService.devAccess(item.getNdid(),version); - } - }); - } - -} +//package com.njcn.access.runner; +// +//import cn.hutool.core.collection.CollUtil; +//import cn.hutool.core.collection.CollectionUtil; +//import com.njcn.access.service.ICsEquipmentDeliveryService; +//import com.njcn.access.service.ICsTopicService; +//import com.njcn.access.service.impl.CsDeviceServiceImpl; +//import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.scheduling.annotation.Scheduled; +//import org.springframework.stereotype.Component; +// +//import javax.annotation.Resource; +//import java.util.List; +//import java.util.Objects; +// +///** +// * 类的介绍:防止设备掉线 系统未能调整,做一个定时任务,每天凌晨将所有设备重新接入 +// * +// * @author xuyang +// * @version 1.0.0 +// * @createTime 2023/8/28 14:21 +// */ +//@Component +//@Slf4j +//public class AccessScheduledTask { +// +// @Resource +// private CsDeviceServiceImpl csDeviceService; +// +// @Resource +// private ICsTopicService csTopicService; +// +// @Resource +// private ICsEquipmentDeliveryService csEquipmentDeliveryService; +// +// /** +// * {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)} +// */ +// @Scheduled(cron = "0 0 0 * * ?") +// public void executeTask() { +// log.info("每日凌晨定时任务执行"); +// List list = csEquipmentDeliveryService.getAll(); +// if (CollUtil.isNotEmpty(list)) { +// for (int i = 0; i < list.size(); i++) { +// String version = csTopicService.getVersion(list.get(i).getNdid()); +// if (!Objects.isNull(version)){ +// csDeviceService.devAccess(list.get(i).getNdid(),version,i); +// } +// } +// } +// } +//} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelRelationService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelRelationService.java index 64ccca3..2e776ed 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelRelationService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelRelationService.java @@ -27,5 +27,10 @@ public interface ICsDevModelRelationService extends IService wrapper = new LambdaQueryWrapper<>(); + wrapper.eq(CsDevModelRelationPO::getDevId, deviceId) + .eq (CsDevModelRelationPO::getStatus, 1); + this.baseMapper.delete(wrapper); + } + public List queryDevModelRelation(CsDevModelRelationQueryParm queryParm) { QueryWrapper queryWrapper = new QueryWrapper<> (); queryWrapper.eq (StringUtils.isNotBlank (queryParm.getId ()),"id",queryParm.getId ()). diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 7d857c9..eafc145 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -41,10 +41,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; +import java.util.*; import java.util.stream.Collectors; /** @@ -288,7 +285,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //6.修改装置状态 csEquipmentDeliveryService.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode()); //7.发起自动接入请求 - devAccess(devAccessParam.getNDid(),version); + devAccessAskTemplate(devAccessParam.getNDid(),version,1); //8.删除redis监测点模板信息 redisUtil.delete(AppRedisKey.MODEL + devAccessParam.getNDid()); @@ -360,9 +357,10 @@ public class CsDeviceServiceImpl implements ICsDeviceService { } @Override - public void manualAccess(String nDid) { + @Transactional(rollbackFor = Exception.class) + public boolean manualAccess(String nDid) { String version = csTopicService.getVersion(nDid); - devAccess(nDid,version); + return devAccessAskTemplate(nDid,version,new Random().nextInt(10000)); } @@ -423,7 +421,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //3.修改装置状态为注册状态 csEquipmentDeliveryService.updateStatusBynDid(nDid, AccessEnum.REGISTERED.getCode()); //4.发起自动接入请求 - devAccess(nDid,version); + devAccessAskTemplate(nDid,version,1); //5.存储日志 csLogsFeignClient.addUserLog(logDto); //6.存储设备调试日志表 @@ -457,7 +455,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //获取版本 String version = csTopicService.getVersion(nDid); //发起接入请求 - this.devAccess(nDid,version); + this.devAccessAskTemplate(nDid,version,1); } private void checkDeviceStatus(String nDid) { @@ -511,27 +509,70 @@ public class CsDeviceServiceImpl implements ICsDeviceService { throw new BusinessException(responseEnum); } - public void devAccess(String nDid,String version) { - ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); - reqAndResParam.setMid(1); - reqAndResParam.setDid(0); - reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); - reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); - reqAndResParam.setExpire(-1); - logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam)); - publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); + /** + * 装置重新接入系统,需要校验所用的模板 + * @param nDid + * @param version + */ + @Transactional(rollbackFor = Exception.class) + public boolean devAccessAskTemplate(String nDid,String version,Integer mid) { + boolean result = false; + try { + //询问装置当前所用模板 + ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); + reqAndResParam.setMid(mid); + reqAndResParam.setDid(0); + reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_3.getCode())); + reqAndResParam.setExpire(-1); + publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); + //接收到模板,判断模板是否存在,替换模板,发起接入 + Thread.sleep(2000); + List modelId = objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid)); + if (CollUtil.isNotEmpty(modelId)) { + CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData(); + //删除之前的模板信息 + csDevModelRelationService.deleteByDeviceId(vo.getId()); + //重新录入装置和模板关系信息 + for (CsModelDto item : modelId) { + CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm(); + csDevModelRelationAddParm.setDevId(vo.getId()); + csDevModelRelationAddParm.setModelId(item.getModelId()); + csDevModelRelationAddParm.setDid(item.getDid()); + csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm); + } + //发起接入 + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); + publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); + result = true; + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return result; } - public void devAccessMid(String nDid,String version,Integer mid) { - ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); - reqAndResParam.setMid(mid); - reqAndResParam.setDid(0); - reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); - reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); - reqAndResParam.setExpire(-1); - logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam)); - publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); - } +// public void devAccess(String nDid,String version) { +// ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); +// reqAndResParam.setMid(1); +// reqAndResParam.setDid(0); +// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); +// reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); +// reqAndResParam.setExpire(-1); +// logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam)); +// publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); +// } +// +// public void devAccessMid(String nDid,String version,Integer mid) { +// ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); +// reqAndResParam.setMid(mid); +// reqAndResParam.setDid(0); +// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); +// reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); +// reqAndResParam.setExpire(-1); +// logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam)); +// publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); +// } /** * 平台对设备发起主题询问命令