From 848cc9c7de39d36035565758a142b6b688ae7013 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Sat, 14 Sep 2024 11:44:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BD=95=E6=B3=A2=E6=96=87=E4=BB=B6=E4=B8=8B?= =?UTF-8?q?=E8=BD=BD=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/handler/MqttMessageHandler.java | 2 +- .../service/ICsEquipmentDeliveryService.java | 8 -- .../impl/AskDeviceDataServiceImpl.java | 2 +- .../njcn/zlevent/pojo/dto/WaveTimeDto.java | 8 ++ .../com/njcn/zlevent/init/InitEventFiles.java | 41 +++++++++ .../listener/RedisKeyExpirationListener.java | 35 +++----- .../impl/CsWaveAnalysisServiceImpl.java | 80 +++++++++-------- .../zlevent/service/impl/FileServiceImpl.java | 29 ++---- .../njcn/zlevent/utils/RemoveInfoUtils.java | 90 +++++++++++++++++++ 9 files changed, 205 insertions(+), 90 deletions(-) create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/init/InitEventFiles.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 6e6366a..1dd60e4 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -426,7 +426,7 @@ public class MqttMessageHandler { CsDevCapacityPO csDevCapacity = new CsDevCapacityPO(); csDevCapacity.setLineId(nDid.concat("0")); csDevCapacity.setCldid(item.getClDid()); - csDevCapacity.setCapacity(item.getCapacityA()); + csDevCapacity.setCapacity(Objects.isNull(item.getCapacityA())?0.0:item.getCapacityA()); list.add(csDevCapacity); }); devCapacityFeignClient.addList(list); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java index 28b6ea8..cb07553 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java @@ -1,17 +1,9 @@ package com.njcn.access.service; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.IService; import com.njcn.access.pojo.param.DeviceStatusParam; -import com.njcn.csdevice.pojo.param.CsEquipmentDeliveryAddParm; -import com.njcn.csdevice.pojo.param.CsEquipmentDeliveryAuditParm; -import com.njcn.csdevice.pojo.param.CsEquipmentDeliveryQueryParm; -import com.njcn.csdevice.pojo.param.ProjectEquipmentQueryParm; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO; -import com.njcn.csdevice.pojo.vo.DeviceManagerVO; -import com.njcn.csdevice.pojo.vo.ProjectEquipmentVO; import java.util.List; diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java index 76d37c7..1fd47ed 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java @@ -239,7 +239,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { failDto.setCode(400); redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,10L); ReqAndResDto.Req req = getPojo(mid,fileName,step); - publisher.send("/Pfm/DevFileCmd/V1" + id, new Gson().toJson(req), 1, false); + publisher.send("/Pfm/DevFileCmd/V1/" + id, new Gson().toJson(req), 1, false); } } } diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java index 5abaa5a..9b5fa62 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java @@ -12,6 +12,9 @@ import lombok.Data; @Data public class WaveTimeDto { + /** + * 文件名 + */ private String fileName; private String deviceId; @@ -26,4 +29,9 @@ public class WaveTimeDto { private String location; + /** + * 等级 + */ + private String level; + } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/init/InitEventFiles.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/init/InitEventFiles.java new file mode 100644 index 0000000..e787585 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/init/InitEventFiles.java @@ -0,0 +1,41 @@ +package com.njcn.zlevent.init; + +import cn.hutool.core.collection.CollectionUtil; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.zlevent.service.ICsWaveAnalysisService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import java.util.List; + + +/** + * @author xy + * + * 程序重启清除事件标识,开始消费事件 + */ +@Slf4j +@Component +@AllArgsConstructor +public class InitEventFiles implements CommandLineRunner { + + private final ICsWaveAnalysisService csWaveAnalysisService; + private final RedisUtil redisUtil; + private final EquipmentFeignClient equipmentFeignClient; + + @Override + public void run(String... args) { + List list = equipmentFeignClient.getAll().getData(); + if (CollectionUtil.isNotEmpty(list)) { + list.forEach(item->{ + redisUtil.delete("handleEvent:" + item.getNdid()); + //处理缓存数据 + csWaveAnalysisService.channelWave(item.getNdid()); + }); + } + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java index f85f94f..2759a95 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java @@ -18,8 +18,8 @@ import com.njcn.stat.enums.StatResponseEnum; import com.njcn.system.api.EpdFeignClient; import com.njcn.system.pojo.dto.EpdDTO; import com.njcn.zlevent.pojo.dto.FileStreamDto; -import com.njcn.zlevent.pojo.dto.WaveTimeDto; import com.njcn.zlevent.service.ICsWaveAnalysisService; +import com.njcn.zlevent.utils.RemoveInfoUtils; import lombok.extern.slf4j.Slf4j; import net.sf.json.JSONObject; import org.apache.commons.lang3.StringUtils; @@ -54,6 +54,8 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene private ChannelObjectUtil channelObjectUtil; @Resource private ICsWaveAnalysisService iCsWaveAnalysisService; + @Resource + private RemoveInfoUtils removeInfoUtils; private static Integer mid = 1; private static Integer range = 51200; @@ -95,8 +97,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene IntStream.rangeClosed(start, end) .filter(i -> !dto.getList().contains(i)) .forEach(missingList::add); - Object object = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName)); - if (CollectionUtil.isNotEmpty(missingList) && Objects.isNull(object)) { + if (CollectionUtil.isNotEmpty(missingList)) { downloadFile(missingList,dto.getNDid(),fileName); } } @@ -113,10 +114,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene ReqAndResDto.Req reqAndResParam = getPojo(mid,name,i); publisher.send("/Pfm/DevFileCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false); //判断是否重发 - sendNextStep(name,nDid,mid,i,nDid); + sendNextStep(name,nDid,mid,i); FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + name + mid); //重发之后判断继续循环还是跳出循环 if (!Objects.isNull(fileRedisDto) && !Objects.equals(fileRedisDto.getCode(),200)) { + //判断重读还是丢弃 + removeInfoUtils.retryEventInfo(nDid,name); break; } mid = mid + 1; @@ -151,13 +154,13 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene /** * 根据装置响应来判断是否询问下一帧数据 */ - public void sendNextStep(String fileName, String id, int mid,int step, String nDid) { + public void sendNextStep(String fileName, String id, int mid,int step) { try { - for (int i = 1; i < 31; i++) { + for (int i = 1; i <= 10; i++) { if (step == 0 ){ - Thread.sleep(5000); + Thread.sleep(5000 * i); } else { - Thread.sleep(2000); + Thread.sleep(2000 * i); } FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + fileName + mid); if (Objects.isNull(fileRedisDto)) { @@ -166,30 +169,20 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,10L); } else { if (Objects.equals(fileRedisDto.getCode(),200)) { - redisUtil.delete("handleEvent:" + nDid); - redisUtil.delete(AppRedisKey.FILE_PART.concat(fileName)); - redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); - //删除已经处理完的文件,之后再判断还有是否需要下载的文件 - List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class); - fileDto.removeIf(item2 -> item2.getFileName().equals(fileName)); - redisUtil.saveByKey("eventFile:" + nDid, fileDto); - if (CollectionUtil.isNotEmpty(fileDto)) { - iCsWaveAnalysisService.channelWave(nDid); - } break; } else { log.info("第" +i+"次尝试"); //尝试失败则设置code为400,如果装置响应了,则会将code置为200 FileRedisDto failDto = new FileRedisDto(); failDto.setCode(400); - redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,10L); + redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,2L*i); ReqAndResDto.Req req = getPojo(mid,fileName,step); - publisher.send("/Pfm/DevFileCmd/V1" + id, new Gson().toJson(req), 1, false); + publisher.send("/Pfm/DevFileCmd/V1/" + id, new Gson().toJson(req), 1, false); } } } } catch (Exception e) { - throw new BusinessException(AlgorithmResponseEnum.ASK_DEVICE_DIR_ERROR); + throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOAD_ERROR); } } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java index 8b6f830..d073716 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java @@ -61,41 +61,46 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { @Override public void analysis(AppEventMessage appEventMessage) { - List list = new ArrayList<>(); - Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId()); - if (Objects.isNull(object1)){ - lineInfo(appEventMessage.getId()); - } - //获取装置id - String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId(); - //获取波形文件名称 - List dataArrayList = appEventMessage.getMsg().getDataArray(); - if (CollectionUtil.isNotEmpty(dataArrayList)){ - for (AppEventMessage.DataArray item : dataArrayList) { - List paramList = item.getParam(); - Object object = paramList.stream().filter(item2 -> ZlConstant.WAVE_NAME.equals(item2.getName())).findFirst().get().getData(); - Object object2 = paramList.stream().filter(item2 -> ZlConstant.WAVE_PARAM_RCDKEEPTIME.equals(item2.getName())).findFirst().get().getData(); - Object object3 = paramList.stream().filter(item2 -> ZlConstant.WAVE_POSITION.equals(item2.getName())).findFirst().get().getData(); - String lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid(); - String fileName = object.toString().replaceAll("\\[","").replaceAll("]",""); - List fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList()); - //获取到录波文件,将文件信息存储起来 - for (String file : fileList) { - file = file.trim(); - WaveTimeDto dto = channelTimeRange(file,appEventMessage.getId(),item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId,object3.toString()); - list.add(dto); - } - Object obj = redisUtil.getObjectByKey("eventFile:" + appEventMessage.getId()); - if (!Objects.isNull(obj)) { - List oldList = channelObjectUtil.objectToList(obj,WaveTimeDto.class); - oldList.addAll(list); - redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), oldList); - } else { - redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), list); - } + try { + List list = new ArrayList<>(); + Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId()); + if (Objects.isNull(object1)){ + lineInfo(appEventMessage.getId()); } - //处理缓存数据 - channelWave(appEventMessage.getId()); + //获取装置id + String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId(); + //获取波形文件名称 + List dataArrayList = appEventMessage.getMsg().getDataArray(); + if (CollectionUtil.isNotEmpty(dataArrayList)){ + for (AppEventMessage.DataArray item : dataArrayList) { + List paramList = item.getParam(); + Object object = paramList.stream().filter(item2 -> ZlConstant.WAVE_NAME.equals(item2.getName())).findFirst().get().getData(); + Object object2 = paramList.stream().filter(item2 -> ZlConstant.WAVE_PARAM_RCDKEEPTIME.equals(item2.getName())).findFirst().get().getData(); + Object object3 = paramList.stream().filter(item2 -> ZlConstant.WAVE_POSITION.equals(item2.getName())).findFirst().get().getData(); + String lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid(); + String fileName = object.toString().replaceAll("\\[","").replaceAll("]",""); + List fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList()); + //获取到录波文件,将文件信息存储起来 + for (String file : fileList) { + file = file.trim(); + WaveTimeDto dto = channelTimeRange(file,appEventMessage.getId(),item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId,object3.toString()); + list.add(dto); + } + Object obj = redisUtil.getObjectByKey("eventFile:" + appEventMessage.getId()); + if (!Objects.isNull(obj)) { + List oldList = channelObjectUtil.objectToList(obj,WaveTimeDto.class); + oldList.addAll(list); + redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), oldList); + } else { + redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), list); + } + } + //处理缓存数据 + channelWave(appEventMessage.getId()); + } + } catch (Exception e) { + //发生异常,清理标识 + redisUtil.delete("handleEvent:" + appEventMessage.getId()); } } @@ -105,8 +110,11 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { //有相同文件处理时,则不进行数据处理 Object obj = redisUtil.getObjectByKey("handleEvent:" + nDid); if (Objects.isNull(obj)) { - WaveTimeDto dto = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class).get(0); - askFileInfo(nDid,1,dto.getFileName()); + List list = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class); + if (CollectionUtil.isNotEmpty(list)) { + WaveTimeDto dto = list.get(0); + askFileInfo(nDid,1,dto.getFileName()); + } } else { throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING); } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java index 64d98b2..cc4bf04 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -29,6 +29,7 @@ import com.njcn.zlevent.pojo.dto.WaveTimeDto; import com.njcn.zlevent.pojo.po.CsEventFileLogs; import com.njcn.zlevent.pojo.po.CsWave; import com.njcn.zlevent.service.*; +import com.njcn.zlevent.utils.RemoveInfoUtils; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.sf.json.JSONObject; @@ -62,6 +63,7 @@ public class FileServiceImpl implements IFileService { private final GeneralInfo generalInfo; private final ICsWaveAnalysisService iCsWaveAnalysisService; private final ChannelObjectUtil channelObjectUtil; + private final RemoveInfoUtils removeInfoUtils; @Override public void analysisFileInfo(AppFileMessage appFileMessage) { @@ -185,15 +187,8 @@ public class FileServiceImpl implements IFileService { if (CollectionUtil.isNotEmpty(eventList)){ eventList.forEach(wavePicFeignClient::getWavePics); } - redisUtil.delete("handleEvent:" + appFileMessage.getId()); - redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName())); - //删除已经处理完的文件,之后再判断还有是否需要下载的文件 - List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class); - fileDto.removeIf(item -> item.getFileName().equals(fileName)); - redisUtil.saveByKey("eventFile:" + appFileMessage.getId(), fileDto); - if (CollectionUtil.isNotEmpty(fileDto)) { - iCsWaveAnalysisService.channelWave(appFileMessage.getId()); - } + //解析完删除、处理缓存 + removeInfoUtils.deleteEventInfo(appFileMessage.getId(),fileName); } else { //收到数据就刷新缓存值 redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 10L); @@ -238,23 +233,15 @@ public class FileServiceImpl implements IFileService { if (CollectionUtil.isNotEmpty(eventList)) { eventList.forEach(wavePicFeignClient::getWavePics); } - redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName())); redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); - redisUtil.delete(AppRedisKey.FILE_PART_MISSING.concat(appFileMessage.getMsg().getName())); - redisUtil.delete("handleEvent:" + appFileMessage.getId()); + //解析完删除、处理缓存 + removeInfoUtils.deleteEventInfo(appFileMessage.getId(),fileName); //删除临时文件 File file = new File(lsFileName); if (file.exists()) { file.delete(); } - //删除已经处理完的文件,之后再判断还有是否需要下载的文件 - List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class); - fileDto.removeIf(item -> item.getFileName().equals(fileName)); - redisUtil.saveByKey("eventFile:" + appFileMessage.getId(), fileDto); - if (CollectionUtil.isNotEmpty(fileDto)) { - iCsWaveAnalysisService.channelWave(appFileMessage.getId()); - } } else { csEventLogs.setStatus(1); csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!"); @@ -265,10 +252,6 @@ public class FileServiceImpl implements IFileService { //将数据写入临时文件 appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData()); log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr()); - Object object2 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName)); - if (!Objects.isNull(object2)) { - redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 1L); - } } } else { csEventLogs.setStatus(1); diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java new file mode 100644 index 0000000..5b6a304 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java @@ -0,0 +1,90 @@ +package com.njcn.zlevent.utils; + +import cn.hutool.core.collection.CollectionUtil; +import com.njcn.access.utils.ChannelObjectUtil; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.zlevent.pojo.dto.WaveTimeDto; +import com.njcn.zlevent.service.ICsWaveAnalysisService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Objects; + +/** + * 类的介绍: + * @author xuyang + */ +@Slf4j +@Component +public class RemoveInfoUtils { + + @Resource + private RedisUtil redisUtil; + @Resource + private ChannelObjectUtil channelObjectUtil; + @Resource + private ICsWaveAnalysisService iCsWaveAnalysisService; + + public void deleteEventInfo(String nDid, String fileName) { + redisUtil.delete("handleEvent:" + nDid); + redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); + //删除已经处理完的文件,之后再判断还有是否需要下载的文件 + List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class); + fileDto.removeIf(item -> item.getFileName().equals(fileName)); + redisUtil.saveByKey("eventFile:" + nDid, fileDto); + if (CollectionUtil.isNotEmpty(fileDto)) { + iCsWaveAnalysisService.channelWave(nDid); + } + } + + public void retryEventInfo(String nDid, String fileName) { + // 尝试获取重试次数 + Object retryObject = redisUtil.getObjectByKey("retryEvent:" + nDid); + int retryTimes = retryObject != null ? Integer.parseInt(retryObject.toString()) : 0; + // 删除相关的 Redis 键 + deleteRelatedKeys(nDid, fileName); + // 处理重试逻辑 + if (retryTimes < 3) { + // 增加重试次数并保存 + redisUtil.saveByKey("retryEvent:" + nDid, retryTimes + 1); + // 重排文件列表 + updateFileList(nDid, fileName); + } else { + // 仅从列表中移除文件 + removeFileFromList(nDid, fileName); + // 检查是否还有其他文件需要处理 + List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class); + if (CollectionUtil.isNotEmpty(fileDto)) { + iCsWaveAnalysisService.channelWave(nDid); + } + } + } + + private void deleteRelatedKeys(String nDid, String fileName) { + redisUtil.delete("handleEvent:" + nDid); + redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); + } + + private void updateFileList(String nDid, String fileName) { + List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class); + WaveTimeDto dto = fileDto.stream() + .filter(item -> item.getFileName().equals(fileName)) + .findFirst() + .orElse(null); + if (dto != null) { + fileDto.removeIf(item -> item.getFileName().equals(fileName)); + fileDto.add(dto); + } + redisUtil.saveByKey("eventFile:" + nDid, fileDto); + } + + private void removeFileFromList(String nDid, String fileName) { + List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class); + fileDto.removeIf(item -> item.getFileName().equals(fileName)); + redisUtil.saveByKey("eventFile:" + nDid, fileDto); + } + +}