diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 3615f3d..3caa753 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -60,39 +60,22 @@ public class CsDeviceServiceImpl implements ICsDeviceService { private static final Logger logger = LoggerFactory.getLogger(CsDeviceServiceImpl.class); private final EquipmentFeignClient equipmentFeignClient; - private final ICsEquipmentDeliveryService csEquipmentDeliveryService; - private final DictTreeFeignClient dictTreeFeignClient; - private final ICsLedgerService csLedgerService; - private final ICsDevModelRelationService csDevModelRelationService; - private final ICsLineService csLineService; - private final IAppLineTopologyDiagramService appLineTopologyDiagramService; - private final ICsDeviceUserService csDeviceUserService; - private final MqttPublisher publisher; - private final RedisUtil redisUtil; - private final MqttUtil mqttUtil; - private final ICsTopicService csTopicService; - private final DicDataFeignClient dicDataFeignClient; - private final CsLogsFeignClient csLogsFeignClient; - private final ProcessFeignClient processFeignClient; - private final CsLinePOService csLinePOService; - private final CsDeviceUserPOService csDeviceUserPOService; - private final ICsDataSetService csDataSetService; @Override diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/init/InitEventFiles.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/init/InitEventFiles.java index e787585..a46522b 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/init/InitEventFiles.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/init/InitEventFiles.java @@ -1,41 +1,26 @@ package com.njcn.zlevent.init; -import cn.hutool.core.collection.CollectionUtil; -import com.njcn.csdevice.api.EquipmentFeignClient; -import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.redis.utils.RedisUtil; -import com.njcn.zlevent.service.ICsWaveAnalysisService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; -import java.util.List; - /** * @author xy * - * 程序重启清除事件标识,开始消费事件 + * 程序重启设置任务,消费历史录波文件 */ @Slf4j @Component @AllArgsConstructor public class InitEventFiles implements CommandLineRunner { - private final ICsWaveAnalysisService csWaveAnalysisService; private final RedisUtil redisUtil; - private final EquipmentFeignClient equipmentFeignClient; @Override public void run(String... args) { - List list = equipmentFeignClient.getAll().getData(); - if (CollectionUtil.isNotEmpty(list)) { - list.forEach(item->{ - redisUtil.delete("handleEvent:" + item.getNdid()); - //处理缓存数据 - csWaveAnalysisService.channelWave(item.getNdid()); - }); - } + redisUtil.saveByKeyWithExpire("startFile",null,30L); } } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java index 2759a95..e1c5c23 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java @@ -11,7 +11,9 @@ import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.file.FileRedisDto; import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.enums.AlgorithmResponseEnum; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.stat.enums.StatResponseEnum; @@ -56,6 +58,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene private ICsWaveAnalysisService iCsWaveAnalysisService; @Resource private RemoveInfoUtils removeInfoUtils; + @Resource + private ICsWaveAnalysisService csWaveAnalysisService; + @Resource + private EquipmentFeignClient equipmentFeignClient; private static Integer mid = 1; private static Integer range = 51200; @@ -101,6 +107,17 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene downloadFile(missingList,dto.getNDid(),fileName); } } + //项目重启之后,经过10s开始处理历史录波文件 + else if (expiredKey.startsWith("startFile")) { + List list = equipmentFeignClient.getAll().getData(); + if (CollectionUtil.isNotEmpty(list)) { + list.forEach(item->{ + redisUtil.delete("handleEvent:" + item.getNdid()); + //处理缓存数据 + csWaveAnalysisService.channelWave(item.getNdid()); + }); + } + } } //请求缺失的数据 @@ -156,7 +173,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene */ public void sendNextStep(String fileName, String id, int mid,int step) { try { - for (int i = 1; i <= 10; i++) { + for (int i = 1; i <= 11; i++) { if (step == 0 ){ Thread.sleep(5000 * i); } else { @@ -175,7 +192,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene //尝试失败则设置code为400,如果装置响应了,则会将code置为200 FileRedisDto failDto = new FileRedisDto(); failDto.setCode(400); - redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,2L*i); + redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,4L*i); ReqAndResDto.Req req = getPojo(mid,fileName,step); publisher.send("/Pfm/DevFileCmd/V1/" + id, new Gson().toJson(req), 1, false); } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java index d073716..795ec77 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java @@ -8,6 +8,7 @@ import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.TypeEnum; import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.utils.ChannelObjectUtil; +import com.njcn.access.utils.MqttUtil; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient; @@ -46,18 +47,13 @@ import java.util.stream.Collectors; public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { private final EquipmentFeignClient equipmentFeignClient; - private final MqttPublisher publisher; - private final CsTopicFeignClient csTopicFeignClient; - private final RedisUtil redisUtil; - private final CsLineFeignClient csLineFeignClient; - private final DicDataFeignClient dicDataFeignClient; - private final ChannelObjectUtil channelObjectUtil; + private final MqttUtil mqttUtil; @Override public void analysis(AppEventMessage appEventMessage) { @@ -107,16 +103,21 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { @Override @Async("asyncExecutor") public void channelWave(String nDid) { - //有相同文件处理时,则不进行数据处理 - Object obj = redisUtil.getObjectByKey("handleEvent:" + nDid); - if (Objects.isNull(obj)) { - List list = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class); - if (CollectionUtil.isNotEmpty(list)) { - WaveTimeDto dto = list.get(0); - askFileInfo(nDid,1,dto.getFileName()); + //判断客户端是否在线,在线再处理文件 + String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); + boolean mqttClient = mqttUtil.judgeClientOnline(clientName); + if (mqttClient){ + //有相同文件处理时,则不进行数据处理 + Object obj = redisUtil.getObjectByKey("handleEvent:" + nDid); + if (Objects.isNull(obj)) { + List list = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class); + if (CollectionUtil.isNotEmpty(list)) { + WaveTimeDto dto = list.get(0); + askFileInfo(nDid,1,dto.getFileName()); + } + } else { + throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING); } - } else { - throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING); } } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java index b940de3..2033128 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java @@ -50,15 +50,15 @@ public class RemoveInfoUtils { // 增加重试次数并保存 redisUtil.saveByKey(("retryEvent:" + nDid + fileName), retryTimes + 1); // 重排文件列表 - updateFileList(nDid, fileName); + //updateFileList(nDid, fileName); } else { - // 仅从列表中移除文件 + // 从列表中移除文件 removeFileFromList(nDid, fileName); - // 检查是否还有其他文件需要处理 - List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class); - if (CollectionUtil.isNotEmpty(fileDto)) { - iCsWaveAnalysisService.channelWave(nDid); - } + } + // 检查是否还有其他文件需要处理 + List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class); + if (CollectionUtil.isNotEmpty(fileDto)) { + iCsWaveAnalysisService.channelWave(nDid); } } @@ -84,6 +84,7 @@ public class RemoveInfoUtils { List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class); fileDto.removeIf(item -> item.getFileName().equals(fileName)); redisUtil.saveByKey("eventFile:" + nDid, fileDto); + redisUtil.delete(AppRedisKey.FILE_PART.concat(fileName)); } }