From 1d378e0c03a71dbb8e793b15d18dfda719fbdd48 Mon Sep 17 00:00:00 2001 From: xuyang <748613696@qq.com> Date: Mon, 28 Aug 2023 16:19:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95=E8=B0=83?= =?UTF-8?q?=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/access/AccessBootApplication.java | 2 + .../controller/CsDevModelController.java | 14 +--- .../access/controller/CsDeviceController.java | 3 - .../access/handler/MqttMessageHandler.java | 74 ++++++++++++++++++- .../listener/RedisKeyExpirationListener.java | 30 +++++--- .../runner/AccessApplicationRunner.java | 47 ++++++++++++ .../access/runner/AccessScheduledTask.java | 50 +++++++++++++ .../service/ICsEquipmentDeliveryService.java | 8 ++ .../service/impl/CsDevModelServiceImpl.java | 41 ++++++++++ .../service/impl/CsDeviceServiceImpl.java | 67 ++++++++++++++++- .../impl/CsEquipmentDeliveryServiceImpl.java | 6 ++ .../service/impl/CsTopicServiceImpl.java | 8 +- 12 files changed, 321 insertions(+), 29 deletions(-) create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessScheduledTask.java diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/AccessBootApplication.java b/iot-access/access-boot/src/main/java/com/njcn/access/AccessBootApplication.java index aea9d0b..5ce8fed 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/AccessBootApplication.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/AccessBootApplication.java @@ -5,6 +5,7 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; +import org.springframework.scheduling.annotation.EnableScheduling; /** @@ -16,6 +17,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients; @MapperScan("com.njcn.**.mapper") @EnableFeignClients(basePackages = "com.njcn") @SpringBootApplication(scanBasePackages = "com.njcn") +@EnableScheduling public class AccessBootApplication { public static void main(String[] args) { diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDevModelController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDevModelController.java index 0f55b40..241ea45 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDevModelController.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDevModelController.java @@ -1,29 +1,24 @@ package com.njcn.access.controller; -import cn.hutool.core.collection.CollectionUtil; -import com.njcn.access.pojo.param.DevModelParam; import com.njcn.access.service.ICsDevModelService; import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; -import com.njcn.csdevice.api.CsGroupFeignClient; -import com.njcn.csdevice.enums.DeviceOperate; -import com.njcn.web.advice.DeviceLog; import com.njcn.web.controller.BaseController; import io.swagger.annotations.Api; -import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestPart; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartFile; -import java.util.List; - /** * 类的介绍: * @@ -44,7 +39,6 @@ public class CsDevModelController extends BaseController { @PostMapping("/addModel") @ApiOperation("新增设备模板") @Transactional(rollbackFor = {Exception.class}) - @DeviceLog(operateType = DeviceOperate.ADD_MODEL) public HttpResult addModel(@RequestPart("file") @Validated MultipartFile file){ String methodDescribe = getMethodDescribe("addModel"); //1.录入通用字典 diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceController.java index 213806a..f558cc4 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceController.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceController.java @@ -40,7 +40,6 @@ public class CsDeviceController extends BaseController { @PostMapping("/register") @ApiOperation("直连设备状态判断") @ApiImplicitParam(name = "nDid", value = "设备识别码", required = true) - @DeviceLog(operateType = DeviceOperate.JUDGE_ONLINE) @ReturnMsg public HttpResult devRegister(@RequestParam String nDid){ csDeviceService.devRegister(nDid); @@ -51,7 +50,6 @@ public class CsDeviceController extends BaseController { @PostMapping("/model") @ApiOperation("获取直连设备模板信息") @ApiImplicitParam(name = "nDid", value = "设备识别码", required = true) - @DeviceLog(operateType = DeviceOperate.MODEL) @ReturnMsg public HttpResult getModel(@RequestParam String nDid){ String methodDescribe = getMethodDescribe("getModel"); @@ -63,7 +61,6 @@ public class CsDeviceController extends BaseController { @PostMapping("/access") @ApiOperation("直连设备注册") @ApiImplicitParam(name = "devAccessParam", value = "接入参数", required = true) - @DeviceLog(operateType = DeviceOperate.DEVICE_REGISTER) public HttpResult devAccess(@RequestBody @Validated DevAccessParam devAccessParam){ String methodDescribe = getMethodDescribe("getModel"); csDeviceService.devAccess(devAccessParam); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 002bc35..057a270 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -20,18 +20,20 @@ import com.njcn.access.pojo.po.CsTopic; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsLineModelService; import com.njcn.access.service.ICsTopicService; +import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.csdevice.api.CsLogsFeignClient; import com.njcn.csdevice.api.DataSetFeignClient; import com.njcn.csdevice.api.DevModelFeignClient; import com.njcn.csdevice.pojo.po.CsDataSet; import com.njcn.csdevice.pojo.po.CsDevModelPO; -import com.njcn.mq.constant.BusinessTopic; import com.njcn.mq.message.AppAutoDataMessage; import com.njcn.mq.message.AppEventMessage; import com.njcn.mq.template.AppAutoDataMessageTemplate; import com.njcn.mq.template.AppEventMessageTemplate; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; +import com.njcn.web.utils.RequestUtil; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -74,12 +76,22 @@ public class MqttMessageHandler { private final AppEventMessageTemplate appEventMessageTemplate; + private final CsLogsFeignClient csLogsFeignClient; + @Autowired Validator validator; @MqttSubscribe(value = "/Dev/DevTopic/{edgeId}",qos = 1) @Transactional(rollbackFor = Exception.class) public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){ + //日志记录 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setOperate("装置主题录入"); + logDto.setResult(1); + //业务流程开始 Gson gson = new Gson(); ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class); //检验传递的参数是否准确 @@ -103,10 +115,17 @@ public class MqttMessageHandler { list.add(csTopic); }); csTopicService.addTopic(nDid,list); + csLogsFeignClient.addUserLog(logDto); } else { + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); } } else { + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.RESPONSE_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage()); } } @@ -123,6 +142,14 @@ public class MqttMessageHandler { @Transactional(rollbackFor = Exception.class) public void devOperation(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){ log.info("收到注册应答响应--->" + nDid); + //日志记录 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setOperate("收到设备注册应答响应"); + logDto.setResult(1); + //业务处理 Gson gson = new Gson(); ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ @@ -136,10 +163,17 @@ public class MqttMessageHandler { reqAndResParam.setExpire(-1); String version = csTopicService.getVersion(nDid); publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); + csLogsFeignClient.addUserLog(logDto); } else { + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); } } else { + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.REGISTER_RESPONSE_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); log.info(AccessResponseEnum.REGISTER_RESPONSE_ERROR.getMessage()); } } @@ -157,6 +191,14 @@ public class MqttMessageHandler { @MqttSubscribe(value = "/Pfm/DevRsp/{version}/{edgeId}",qos = 1) @Transactional(rollbackFor = Exception.class) public void devModelOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){ + //日志记录 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setOperate("装置类型模板应答"); + logDto.setResult(1); + //业务处理 Gson gson = new Gson(); ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class); if (Objects.equals(modelDto.getType(),Integer.parseInt(TypeEnum.TYPE_18.getCode()))){ @@ -164,6 +206,9 @@ public class MqttMessageHandler { List list2 = modelDto.getMsg().getDevCfg(); if (CollectionUtils.isEmpty(list)){ log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR); } //校验前置传递的装置模板库中是否存在 @@ -179,11 +224,17 @@ public class MqttMessageHandler { CsDevModelPO po = devModelFeignClient.findModel(item.getDevType(),item.getVersionNo(),item.getVersionDate()).getData(); if (Objects.isNull(po)){ log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage()); + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MODEL_NO_FIND.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND); } if (Objects.equals(po.getType(),0)){ List dataSetList = dataSetFeignClient.getModuleDataSet(po.getId()).getData(); if (CollectionUtils.isEmpty(dataSetList)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MODULE_NUMBER_IS_NULL.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL); } csModelDto.setModuleNumber(dataSetList.size()); @@ -202,6 +253,7 @@ public class MqttMessageHandler { List lineList = csLineModelService.getMonitorNumByModelId(modelId); String key = AppRedisKey.LINE + nDid; redisUtil.saveByKeyWithExpire(key,lineList,600L); + csLogsFeignClient.addUserLog(logDto); } } @@ -216,20 +268,33 @@ public class MqttMessageHandler { @MqttSubscribe(value = "/Pfm/DevRsp/{version}/{edgeId}",qos = 1) @Transactional(rollbackFor = Exception.class) public void devAccessOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){ + //日志实体 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setResult(1); + //业务处理 Gson gson = new Gson(); ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class); switch (res.getType()){ case 4613: + logDto.setOperate("设备接入"); log.info("收到接入应答响应--->" + nDid); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ + log.info("接入应答成功--->" + nDid); //修改装置状态 - csEquipmentDeliveryService.updateStatusBynDid(nDid, AccessEnum.ACCESS.getCode()); csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode()); } else { log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage()); + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); + throw new BusinessException(AccessResponseEnum.ACCESS_RESPONSE_ERROR); } break; case 4614: + logDto.setOperate("设备数据应答"); log.info("设备数据应答--->" + nDid); RspDataDto rspDataDto = JSON.parseObject(JSON.toJSONString(res.getMsg()), RspDataDto.class); switch (rspDataDto.getDataType()){ @@ -249,6 +314,7 @@ public class MqttMessageHandler { default: break; } + csLogsFeignClient.addUserLog(logDto); } @@ -271,6 +337,8 @@ public class MqttMessageHandler { case 4865: //设置心跳时间,超时改为掉线 redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L); + //装置改成在线 + csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode()); //处理心跳 ReqAndResDto.Res reqAndResParam = new ReqAndResDto.Res(); HeartBeatDto heartBeatDto = new HeartBeatDto(); @@ -282,8 +350,6 @@ public class MqttMessageHandler { reqAndResParam.setCode(200); reqAndResParam.setMsg(heartBeatDto); publisher.send("/Dev/DataRsp/"+version+"/"+nDid,gson.toJson(reqAndResParam),1,false); - //装置改成在线 - csEquipmentDeliveryService.updateStatusBynDid(nDid, AccessEnum.ACCESS.getCode()); //处理业务逻辑 Object object = res.getMsg(); if (!Objects.isNull(object)){ diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 0aeed04..a0c30c5 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -1,35 +1,31 @@ package com.njcn.access.listener; import com.njcn.access.enums.AccessEnum; -import com.njcn.access.enums.AccessResponseEnum; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.impl.CsDeviceServiceImpl; -import com.njcn.access.utils.MqttUtil; +import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; -import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.api.CsLogsFeignClient; import com.njcn.csdevice.enums.DeviceOperate; -import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO; -import com.njcn.redis.utils.RedisUtil; import com.njcn.web.advice.DeviceLog; +import com.njcn.web.utils.RequestUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.util.Objects; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; /** - * @author hongawen + * @author xy * @version 1.0.0 * @date 2022年04月02日 14:31 */ @@ -46,29 +42,39 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene @Resource private CsDeviceServiceImpl csDeviceService; + @Resource + private CsLogsFeignClient csLogsFeignClient; + public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } - /** * 针对redis数据失效事件,进行数据处理 * 注意message.toString()可以获取失效的key */ @Override - @DeviceLog(operateType = DeviceOperate.DEVICE_OFFLINE) public void onMessage(Message message, byte[] pattern) { if (StringUtils.isBlank(message.toString())) { return; } + //日志实体 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setResult(1); //判断失效的key是否为MQTT消费端存入的 String expiredKey = message.toString(); if(expiredKey.startsWith("MQTT:")){ String nDid = expiredKey.split(":")[1]; //装置下线 csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); + logDto.setOperate("装置掉线,装置为:" + nDid); + csLogsFeignClient.addUserLog(logDto); //立马发起接入请求 String version = csTopicService.getVersion(nDid); + log.info("装置掉线立马发送接入请求,接入失败则进入定时接入任务"); csDeviceService.devAccess(nDid,version); //接入再次失败,则定时发起接入请求 try { @@ -77,11 +83,15 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.OFFLINE.getCode())){ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); ScheduledFuture runnableFuture = executor.scheduleAtFixedRate(() -> { + log.info("定时发送接入请求..."); csDeviceService.devAccess(nDid, version); Integer status2 = csEquipmentDeliveryService.queryEquipmentByndid(nDid).getRunStatus(); if (Objects.equals(status2,AccessEnum.ONLINE.getCode())){ throw new BusinessException(CommonResponseEnum.SUCCESS); } + //记录日志 + logDto.setOperate("装置掉线,定时发送接入请求,装置为:" + nDid + ",请求的时间戳为:" + System.currentTimeMillis()); + csLogsFeignClient.addUserLog(logDto); }, 1, 3600, TimeUnit.SECONDS); } } catch (InterruptedException e) { diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java new file mode 100644 index 0000000..fc726b4 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java @@ -0,0 +1,47 @@ +package com.njcn.access.runner; + +import com.njcn.access.service.ICsEquipmentDeliveryService; +import com.njcn.access.service.ICsTopicService; +import com.njcn.access.service.impl.CsDeviceServiceImpl; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Objects; + +/** + * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入 + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/28 13:57 + */ +@Component +@Slf4j +public class AccessApplicationRunner implements ApplicationRunner { + + @Resource + private CsDeviceServiceImpl csDeviceService; + + @Resource + private ICsTopicService csTopicService; + + @Resource + private ICsEquipmentDeliveryService csEquipmentDeliveryService; + + @Override + public void run(ApplicationArguments args){ + List list = csEquipmentDeliveryService.getAll(); + list.forEach(item->{ + String version = csTopicService.getVersion(item.getNdid()); + if (!Objects.isNull(version)){ + csDeviceService.devAccess(item.getNdid(),version); + } + }); + } + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessScheduledTask.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessScheduledTask.java new file mode 100644 index 0000000..854d7e0 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessScheduledTask.java @@ -0,0 +1,50 @@ +package com.njcn.access.runner; + +import com.njcn.access.service.ICsEquipmentDeliveryService; +import com.njcn.access.service.ICsTopicService; +import com.njcn.access.service.impl.CsDeviceServiceImpl; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Objects; + +/** + * 类的介绍:防止设备掉线 系统未能调整,做一个定时任务,每天凌晨将所有设备重新接入 + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/28 14:21 + */ +@Component +@Slf4j +public class AccessScheduledTask { + + @Resource + private CsDeviceServiceImpl csDeviceService; + + @Resource + private ICsTopicService csTopicService; + + @Resource + private ICsEquipmentDeliveryService csEquipmentDeliveryService; + + /** + * {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)} + */ + @Scheduled(cron = "0 0 0 * * ?") + public void executeTask() { + log.info("每日凌晨定时任务执行"); + List list = csEquipmentDeliveryService.getAll(); + list.forEach(item->{ + String version = csTopicService.getVersion(item.getNdid()); + if (!Objects.isNull(version)){ + csDeviceService.devAccess(item.getNdid(),version); + } + }); + } + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java index e106450..2b58745 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java @@ -12,6 +12,8 @@ import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO; import com.njcn.csdevice.pojo.vo.DeviceManagerVO; import com.njcn.csdevice.pojo.vo.ProjectEquipmentVO; +import java.util.List; + /** * * Description: @@ -47,4 +49,10 @@ public interface ICsEquipmentDeliveryService extends IService getAll(); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index 99eff06..9d32921 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -11,7 +11,9 @@ import com.njcn.access.pojo.dto.devModel.*; import com.njcn.access.pojo.po.CsLineModel; import com.njcn.access.service.*; import com.njcn.access.utils.JsonUtil; +import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.csdevice.api.CsLogsFeignClient; import com.njcn.csdevice.api.DevModelFeignClient; import com.njcn.csdevice.pojo.po.*; import com.njcn.oss.constant.OssPath; @@ -23,6 +25,7 @@ import com.njcn.system.enums.DicDataEnum; import com.njcn.system.pojo.param.EleEpdPqdParam; import com.njcn.system.pojo.param.EleEvtParam; import com.njcn.system.pojo.po.EleEpdPqd; +import com.njcn.web.utils.RequestUtil; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -69,14 +72,23 @@ public class CsDevModelServiceImpl implements ICsDevModelService { private final ICsGroArrService csGroArrService; + private final CsLogsFeignClient csLogsFeignClient; + @Override @Transactional(rollbackFor = {Exception.class}) public void addModel(MultipartFile file) { + //日志实体 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setResult(1); String json = null; try { json = JsonUtil.convertStreamToString(file.getInputStream()); Gson gson = new Gson(); TemplateDto templateDto = gson.fromJson(json, TemplateDto.class); + logDto.setOperate("新增设备模板,模板名称:" + templateDto.getDevType()); //模板文件存入文件服务器 String filePath = fileStorageUtil.uploadMultipart(file, OssPath.DEV_MODEL + templateDto.getDevType() + "_"); //1.录入cs_dev_model表,记录装置型号和模板记录 @@ -85,7 +97,11 @@ public class CsDevModelServiceImpl implements ICsDevModelService { analysisDataSet(templateDto,csDevModelPo.getId()); //3.录入监测点模板表(记录当前模板有几个监测点,治理类型的模板目前规定1个监测点,电能质量模板根据逻辑子设备来) addCsLineModel(templateDto,csDevModelPo.getId()); + csLogsFeignClient.addUserLog(logDto); } catch (Exception e) { + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MODEL_ANALYSIS_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.MODEL_ANALYSIS_ERROR); } } @@ -93,13 +109,24 @@ public class CsDevModelServiceImpl implements ICsDevModelService { @Override @Transactional(rollbackFor = {Exception.class}) public void addDict(MultipartFile file) { + //日志实体 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setOperate("录入通用字典"); + logDto.setResult(1); String json = null; try { json = JsonUtil.convertStreamToString(file.getInputStream()); Gson gson = new Gson(); TemplateDto templateDto = gson.fromJson(json, TemplateDto.class); analysisDict(templateDto); + csLogsFeignClient.addUserLog(logDto); } catch (Exception e) { + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.DICT_ANALYSIS_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.DICT_ANALYSIS_ERROR); } } @@ -108,8 +135,18 @@ public class CsDevModelServiceImpl implements ICsDevModelService { * 新增cs_dev_model数据 */ private CsDevModelPO addCsDevModel(TemplateDto templateDto, String filePath){ + //日志实体 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setOperate("新增cs_dev_model数据"); + logDto.setResult(1); CsDevModelPO po = devModelFeignClient.findModel(templateDto.getDevType(),templateDto.getVersion(),templateDto.getTime()).getData(); if (!Objects.isNull(po)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MODEL_REPEAT.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.MODEL_REPEAT); } CsDevModelPO model = new CsDevModelPO(); @@ -126,6 +163,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService { model.setType(1); } csDevModelMapper.insert(model); + csLogsFeignClient.addUserLog(logDto); return model; } @@ -197,6 +235,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService { EleEpdPqd po = epdFeignClient.add(eleEpdPqdParam).getData(); if (CollectionUtil.isNotEmpty(evt.getParam())){ evt.getParam().forEach(param->{ + //录入关系表 EleEvtParam eleEvtParam = new EleEvtParam(); eleEvtParam.setPid(po.getId()); eleEvtParam.setData(param.getData()); @@ -205,6 +244,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService { eleEvtParam.setType(param.getType()); eleEvtParam.setUnit(param.getUnit()); eleEvtFeignClient.add(eleEvtParam); + //录入字典表 + }); } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 411a79a..d432847 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -18,6 +18,7 @@ import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.po.CsSoftInfoPO; import com.njcn.access.service.*; import com.njcn.access.utils.MqttUtil; +import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.utils.PubUtils; @@ -27,6 +28,7 @@ import com.njcn.csdevice.pojo.param.CsLedgerParam; import com.njcn.csdevice.pojo.po.AppLineTopologyDiagramPO; import com.njcn.csdevice.pojo.po.CsDeviceUserPO; import com.njcn.csdevice.pojo.po.CsLinePO; +import com.njcn.csdevice.pojo.po.CsLogsPO; import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; @@ -90,27 +92,48 @@ public class CsDeviceServiceImpl implements ICsDeviceService { private final ICsSoftInfoService csSoftInfoService; + private final CsLogsFeignClient csLogsFeignClient; + @Override @Transactional(rollbackFor = {Exception.class}) public void devRegister(String nDid) { + //日志实体 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setOperate("直连设备状态判断"); + logDto.setResult(1); //1.判断nDid是否存在 CsEquipmentDeliveryVO csEquipmentDeliveryVO = equipmentFeignClient.queryEquipmentByndid(nDid).getData(); if (Objects.isNull(csEquipmentDeliveryVO.getNdid())){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.NDID_NO_FIND.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.NDID_NO_FIND); } //2.判断设备是否是直连设备 SysDicTreePO sysDicTreePo = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevType()).getData(); if (Objects.isNull(sysDicTreePo)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.DEV_NOT_FIND.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.DEV_NOT_FIND); } String code = sysDicTreePo.getCode(); if (!Objects.equals(code, DicDataEnum.CONNECT_DEV.getCode())){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.DEV_IS_NOT_ZL.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.DEV_IS_NOT_ZL); } //3.判断客户端是否在线 String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); boolean mqttClient = mqttUtil.judgeClientOnline(clientName); if (!mqttClient){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MISSING_CLIENT.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.MISSING_CLIENT); } //4.询问设备支持的主题信息 @@ -120,32 +143,55 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //存在则建立关系;不存在则告警出来 SysDicTreePO dictData = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevModel()).getData(); if (Objects.isNull(dictData)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.DEV_MODEL_NOT_FIND.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.DEV_MODEL_NOT_FIND); } String devModel = dictData.getCode(); zhiLianRegister(nDid,devModel); + csLogsFeignClient.addUserLog(logDto); } @Override public Object getModel(String nDid) { + //日志实体 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setOperate("获取直连设备模板信息"); + logDto.setResult(1); Object model = null; try { Thread.sleep(1500); String key = AppRedisKey.LINE + nDid; model = redisUtil.getObjectByKey(key); if (Objects.isNull(model)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MODEL_MISS.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.MODEL_MISS); } } catch (InterruptedException e) { e.printStackTrace(); } + csLogsFeignClient.addUserLog(logDto); return model; } @Override @Transactional(rollbackFor = {Exception.class}) public void devAccess(DevAccessParam devAccessParam) { + //日志实体 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserIndex(RequestUtil.getUserIndex()); + logDto.setUserName(RequestUtil.getUsername()); + logDto.setLoginName(RequestUtil.getLoginName()); + logDto.setOperate("直连设备注册"); + logDto.setResult(1); try { + //获取版本 String version = csTopicService.getVersion(devAccessParam.getNDid()); CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(devAccessParam.getNDid()).getData(); List csLinePoList = new ArrayList<>(); @@ -175,9 +221,15 @@ public class CsDeviceServiceImpl implements ICsDeviceService { } } if (Objects.isNull(clDid)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.CLDID_IS_NULL.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.CLDID_IS_NULL); } if (Objects.isNull(moduleNumber)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MODULE_NUMBER_IS_NULL.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL); } askDevData(devAccessParam.getNDid(),AccessEnum.L_DEV_INFO.getCode(),version,clDid); @@ -187,6 +239,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService { String key = AppRedisKey.LINE_DATA + devAccessParam.getNDid(); list = objectToList2(redisUtil.getObjectByKey(key)); if (CollectionUtils.isEmpty(list)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.LDEVINFO_IS_NULL.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.LDEVINFO_IS_NULL); } //3.监测点表录入关系 @@ -233,6 +288,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService { List position = csLinePoList.stream().map(CsLinePO::getPosition).collect(Collectors.toList()); List lineList = position.stream().filter(e-> Collections.frequency(position,e) > 1).distinct().collect(Collectors.toList()); if (CollectionUtil.isNotEmpty(lineList)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.LINE_POSITION_REPEAT.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.LINE_POSITION_REPEAT); } csLineService.saveBatch(csLinePoList); @@ -252,6 +310,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService { String key2 = AppRedisKey.SOFTINFO + devAccessParam.getNDid(); RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(redisUtil.getObjectByKey(key2)), RspDataDto.SoftInfo.class); if (Objects.isNull(softInfo)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.SOFTINFO_IS_NULL.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.SOFTINFO_IS_NULL); } //记录设备软件信息 @@ -270,12 +331,16 @@ public class CsDeviceServiceImpl implements ICsDeviceService { redisUtil.delete(AppRedisKey.LINE + devAccessParam.getNDid()); redisUtil.delete(AppRedisKey.LINE_DATA + devAccessParam.getNDid()); redisUtil.delete(AppRedisKey.SOFTINFO + devAccessParam.getNDid()); + //存储日志 + csLogsFeignClient.addUserLog(logDto); } catch (Exception e) { + logDto.setResult(0); + logDto.setFailReason(CommonResponseEnum.FAIL.getMessage()); + csLogsFeignClient.addUserLog(logDto); throw new BusinessException(CommonResponseEnum.FAIL); } } - public void devAccess(String nDid,String version) { ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); reqAndResParam.setMid(1); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java index b22ec5d..c5f8dc1 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java @@ -11,6 +11,7 @@ import lombok.RequiredArgsConstructor; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import java.util.List; import java.util.Objects; /** @@ -58,4 +59,9 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl getAll() { + return this.lambdaQuery().ne(CsEquipmentDeliveryPO::getRunStatus,0).list(); + } + } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java index 15a3785..5178b32 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java @@ -7,6 +7,7 @@ import com.njcn.access.mapper.CsTopicMapper; import com.njcn.access.pojo.po.CsTopic; import com.njcn.access.service.ICsTopicService; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import java.util.List; @@ -34,6 +35,11 @@ public class CsTopicServiceImpl extends ServiceImpl impl @Override public String getVersion(String nDid) { - return this.lambdaQuery().eq(CsTopic::getNDid,nDid).isNotNull(CsTopic::getVersion).list().get(0).getVersion(); + String version = null; + List list = this.lambdaQuery().eq(CsTopic::getNDid,nDid).isNotNull(CsTopic::getVersion).list(); + if (CollectionUtil.isNotEmpty(list)){ + version = list.get(0).getVersion(); + } + return version; } }