From 6d7000ddc2a45c1198dda762c480e608778e0c18 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Mon, 21 Oct 2024 20:00:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/access/utils/ChannelObjectUtil.java | 10 +++++ .../access/handler/MqttMessageHandler.java | 43 ++++++++++++++----- .../listener/RedisKeyExpirationListener.java | 11 ----- .../com/njcn/zlevent/api/WaveFeignClient.java | 4 ++ .../fallback/WaveClientFallbackFactory.java | 7 ++- .../zlevent/controller/WaveController.java | 15 +++++-- .../impl/CsWaveAnalysisServiceImpl.java | 13 +++++- 7 files changed, 76 insertions(+), 27 deletions(-) diff --git a/iot-access/access-api/src/main/java/com/njcn/access/utils/ChannelObjectUtil.java b/iot-access/access-api/src/main/java/com/njcn/access/utils/ChannelObjectUtil.java index c17d7dd..9bcb4dc 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/utils/ChannelObjectUtil.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/utils/ChannelObjectUtil.java @@ -1,7 +1,10 @@ package com.njcn.access.utils; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; @@ -11,6 +14,9 @@ import java.util.List; @Component public class ChannelObjectUtil { + @Resource + private RedisUtil redisUtil; + /** * 将list转成对应实体 * @param object @@ -42,4 +48,8 @@ public class ChannelObjectUtil { // 或者抛出异常,根据您的需求 return null; } + + public Object getDeviceMid(String nDid) { + return redisUtil.getObjectByKey(AppRedisKey.DEVICE_MID + nDid); + } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 44bfe7a..d1f431b 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -27,6 +27,7 @@ import com.njcn.access.service.ICsDeviceOnlineLogsService; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsLineModelService; import com.njcn.access.service.ICsTopicService; +import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.*; @@ -43,7 +44,8 @@ import com.njcn.mq.template.AppFileMessageTemplate; import com.njcn.mq.template.AppFileStreamMessageTemplate; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; -import com.njcn.web.utils.RequestUtil; +import com.njcn.zlevent.api.WaveFeignClient; +import com.njcn.zlevent.pojo.dto.WaveTimeDto; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -93,6 +95,8 @@ public class MqttMessageHandler { private final DevCapacityFeignClient devCapacityFeignClient; private final EquipmentFeignClient equipmentFeignClient; private final OverlimitMapper overlimitMapper; + private final ChannelObjectUtil channelObjectUtil; + private final WaveFeignClient waveFeignClient; @Autowired Validator validator; @@ -667,15 +671,34 @@ public class MqttMessageHandler { switch (fileDto.getType()){ case 4657: log.info("获取文件信息" + fileDto); - String key = AppRedisKey.PROJECT_INFO + nDid; - if (Objects.isNull(fileDto.getMsg().getType())) { - handleDefaultCase(fileDto, nDid); - } else { - if (Objects.equals("dir", fileDto.getMsg().getType())) { - saveDirectoryInfo(fileDto.getMsg().getDirInfo(), key); - } else if (Objects.equals("file", fileDto.getMsg().getType())){ - saveFileInfo(fileDto.getMsg().getFileInfo(), key); - appFileMessageTemplate.sendMember(appFileMessage); + if (Objects.equals(fileDto.getCode(),AccessEnum.SUCCESS.getCode())) { + String key = AppRedisKey.PROJECT_INFO + nDid; + if (Objects.isNull(fileDto.getMsg().getType())) { + handleDefaultCase(fileDto, nDid); + } else { + if (Objects.equals("dir", fileDto.getMsg().getType())) { + saveDirectoryInfo(fileDto.getMsg().getDirInfo(), key); + } else if (Objects.equals("file", fileDto.getMsg().getType())){ + saveFileInfo(fileDto.getMsg().getFileInfo(), key); + appFileMessageTemplate.sendMember(appFileMessage); + } + } + } else if (Objects.equals(fileDto.getCode(),AccessEnum.NOT_FIND.getCode())) { + Object object = redisUtil.getObjectByKey("fileMid:" + nDid); + if (Objects.nonNull(object)) { + String data = redisUtil.getObjectByKey("fileMid:" + nDid).toString(); + String [] arr = data.split("concat"); + Integer mid = Integer.parseInt(arr[0]); + String fileName = arr[1]; + if (Objects.equals(mid,fileDto.getMid())) { + List list = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class); + list.removeIf(item -> item.getFileName().equals(fileName)); + redisUtil.saveByKey("eventFile:" + nDid, list); + if (CollectionUtil.isNotEmpty(list)) { + redisUtil.delete("handleEvent:" + nDid); + waveFeignClient.channelWave(nDid); + } + } } } break; diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 5f141fc..ee02ecf 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -105,17 +105,6 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); executeMainTask(scheduler,nDid,version); } -// //自动接入 -// else if (expiredKey.startsWith("autoAccess")) { -// List list = csEquipmentDeliveryService.getAll(); -// list.forEach(item->{ -// String version = csTopicService.getVersion(item.getNdid()); -// if (!Objects.isNull(version)){ -// csDeviceService.devAccessAskTemplate(item.getNdid(),version,1); -// redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1); -// } -// }); -// } } //主任务 diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java index ac8a313..f59487a 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java @@ -6,6 +6,7 @@ import com.njcn.mq.message.AppEventMessage; import com.njcn.zlevent.api.fallback.WaveClientFallbackFactory; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestParam; /** * @author xy @@ -15,4 +16,7 @@ public interface WaveFeignClient { @PostMapping("/analysis") HttpResult analysis(AppEventMessage appEventMessage); + + @PostMapping("/channelWave") + HttpResult channelWave(@RequestParam("nDid") String nDid); } diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java index dcd6b8a..171f307 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java @@ -4,7 +4,6 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.response.HttpResult; import com.njcn.mq.message.AppEventMessage; -import com.njcn.zlevent.api.EventFeignClient; import com.njcn.zlevent.api.WaveFeignClient; import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; @@ -31,6 +30,12 @@ public class WaveClientFallbackFactory implements FallbackFactory channelWave(String nDid) { + log.error("{}异常,降级处理,异常为:{}","处理录波事件",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } }; } } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java index d6b41ee..d572f57 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java @@ -13,10 +13,7 @@ import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; /** * 类的介绍: @@ -44,4 +41,14 @@ public class WaveController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/channelWave") + @ApiOperation("处理录波事件") + @ApiImplicitParam(name = "nDid", value = "nDid", required = true) + public HttpResult channelWave(@RequestParam("nDid") String nDid){ + String methodDescribe = getMethodDescribe("channelWave"); + csWaveService.channelWave(nDid); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java index 795ec77..87e40bc 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java @@ -54,6 +54,7 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { private final DicDataFeignClient dicDataFeignClient; private final ChannelObjectUtil channelObjectUtil; private final MqttUtil mqttUtil; + private static Integer mid = 1; @Override public void analysis(AppEventMessage appEventMessage) { @@ -112,8 +113,18 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { if (Objects.isNull(obj)) { List list = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class); if (CollectionUtil.isNotEmpty(list)) { + Object object = channelObjectUtil.getDeviceMid(nDid); + if (!Objects.isNull(object)) { + mid = (Integer) object; + } WaveTimeDto dto = list.get(0); - askFileInfo(nDid,1,dto.getFileName()); + redisUtil.saveByKeyWithExpire("fileMid:" + nDid,mid + "concat" +dto.getFileName(),60L); + askFileInfo(nDid,mid,dto.getFileName()); + mid = mid + 1; + if (mid > 10000) { + mid = 1; + } + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); } } else { throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING);