diff --git a/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java index bd9be09..f905e0a 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java @@ -46,7 +46,7 @@ public class AskDeviceDataClientFallbackFactory implements FallbackFactory downloadFile(String nDid, String name, Integer size, String fileCheck) { log.error("{}异常,降级处理,异常为:{}","文件下载",cause.toString()); redisUtil.delete("fileDowning:" + nDid); - redisUtil.delete("fileCheck"+name); + redisUtil.delete("fileCheck"+nDid+name); throw new BusinessException(finalExceptionEnum); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java index 7900f65..6998b0a 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java @@ -88,30 +88,29 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { public boolean downloadFile(String nDid, String name, Integer size, String fileCheck) { boolean result = false; try { - redisUtil.saveByKeyWithExpire("fileDowning:"+nDid,"fileDowning",600L); - redisUtil.saveByKeyWithExpire("fileCheck"+name,fileCheck,600L); + redisUtil.saveByKeyWithExpire("fileDowning:"+nDid,"fileDowning",120L); + redisUtil.saveByKeyWithExpire("fileCheck"+nDid+name,fileCheck,120L); Object object = getDeviceMid(nDid); if (!Objects.isNull(object)) { mid = (Integer) object; } ReqAndResDto.Req reqAndResParam = getAllPojo(mid,name); publisher.send("/Pfm/DevFileCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false); - //这里使用简单的轮询,但建议考虑更高效的机制 - for (int i = 0; i < 12; i++) { - Thread.sleep(10000); - Object object2 = redisUtil.getObjectByKey("downloadFilePath:"+name); - if (!Objects.isNull(object2)) { - result = true; - break; - } else { - Object object3 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(name)); - if (!Objects.isNull(object3)) { - FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object3), FileStreamDto.class); - String json = "{fileName:"+name+",allStep:"+dto.getTotal()+",nowStep:"+ (CollectionUtil.isEmpty(dto.getList())?0:dto.getList().size())+"}"; - publisher.send("/Web/Progress/" + nDid, new Gson().toJson(json), 1, false); - } - } - } +// //这里使用简单的轮询,但建议考虑更高效的机制 +// for (int i = 0; i < 60; i++) { +// Thread.sleep(5000); +// Object object3 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(name)); +// FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object3), FileStreamDto.class); +// if (dto.getList().size() == dto.getTotal()) { +// result = true; +// String json = "{fileName:"+name+",allStep:"+dto.getTotal()+",nowStep:"+ dto.getTotal() +"}"; +// publisher.send("/Web/Progress/" + nDid, new Gson().toJson(json), 1, false); +// break; +// } else { +// String json = "{fileName:"+name+",allStep:"+dto.getTotal()+",nowStep:"+ (CollectionUtil.isEmpty(dto.getList())?0:dto.getList().size())+"}"; +// publisher.send("/Web/Progress/" + nDid, new Gson().toJson(json), 1, false); +// } +// } mid = mid + 1; if (mid > 10000) { mid = 1; @@ -119,7 +118,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); } catch (Exception e) { redisUtil.delete("fileDowning:"+nDid); - redisUtil.delete("fileCheck"+name); + redisUtil.delete("fileCheck"+nDid+name); throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOAD_ERROR); } return result; diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java index 848883e..8d786a6 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java @@ -1,5 +1,6 @@ package com.njcn.zlevent.pojo.dto; +import io.swagger.annotations.ApiModelProperty; import lombok.Data; import java.io.Serializable; @@ -17,12 +18,16 @@ import java.util.Set; @Data public class FileStreamDto implements Serializable { + @ApiModelProperty("总帧") private Integer total; + @ApiModelProperty("nDid") private String nDid; + @ApiModelProperty("帧大小") private Integer frameLen; + @ApiModelProperty("帧集合") private Set list; } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java index aab0861..2be63fe 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java @@ -98,17 +98,6 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene downloadFile(missingList,dto.getNDid(),fileName); } } -// //项目重启或者接入,经过120s开始处理历史录波文件 -// else if (expiredKey.startsWith("startFile")) { -// List list = equipmentFeignClient.getAll().getData(); -// if (CollectionUtil.isNotEmpty(list)) { -// list.forEach(item->{ -// redisUtil.delete("handleEvent:" + item.getNdid()); -// //处理缓存数据 -// csWaveAnalysisService.channelWave(item.getNdid()); -// }); -// } -// } //手动文件下载 else if (expiredKey.startsWith(AppRedisKey.FILE_DOWN_TIME)) { List missingList = new ArrayList<>(); diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java index d2156ab..e5a86be 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java @@ -20,8 +20,10 @@ import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.stat.enums.StatResponseEnum; import com.njcn.system.api.DicDataFeignClient; +import com.njcn.system.api.DictTreeFeignClient; import com.njcn.system.api.EpdFeignClient; import com.njcn.system.enums.DicDataEnum; +import com.njcn.system.enums.DicTreeEnum; import com.njcn.system.pojo.dto.EpdDTO; import com.njcn.system.pojo.po.DictData; import com.njcn.zlevent.pojo.constant.ZlConstant; @@ -69,6 +71,7 @@ public class EventServiceImpl implements IEventService { private final ICsEventLogsService csEventLogsService; private final SendEventUtils sendEventUtils; private final WlRecordFeignClient wlRecordFeignClient; + private final DictTreeFeignClient dictTreeFeignClient; @Override @Transactional(rollbackFor = Exception.class) @@ -88,6 +91,8 @@ public class EventServiceImpl implements IEventService { } //获取装置id CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData(); + //设备型号 + String code = dictTreeFeignClient.queryById(po.getDevModel()).getData().getCode(); try { //处理事件数据 List dataArray = appEventMessage.getMsg().getDataArray(); @@ -123,6 +128,7 @@ public class EventServiceImpl implements IEventService { Map tags = new HashMap<>(); tags.put(InfluxDBTableConstant.UUID,id); Map fields = new HashMap<>(); + //只有治理型号的设备有监测位置 if (Objects.equals(ZlConstant.EVENT_POSITION,param.getName())){ if (Objects.equals(param.getData(),ZlConstant.GRID)){ fields.put(param.getName(),"电网侧"); @@ -139,8 +145,8 @@ public class EventServiceImpl implements IEventService { csEvent.setPersistTime(Double.parseDouble(param.getData().toString())); } lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid(); - fields.put(param.getName(),appEventMessage.getMsg().getClDid()==1?"电网侧":"负载侧"); - csEvent.setLocation(appEventMessage.getMsg().getClDid()==1?ZlConstant.GRID:ZlConstant.LOAD); + fields.put(param.getName(),null); + csEvent.setLocation(null); csEvent.setClDid(appEventMessage.getMsg().getClDid()); fields.put(param.getName(),param.getData()); } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java index a00d586..809b839 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -148,7 +148,7 @@ public class FileServiceImpl implements IFileService { Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class); if (Objects.isNull(fileInfoDto)) { - String fileCheck = redisUtil.getObjectByKey("fileCheck"+fileName).toString(); + String fileCheck = redisUtil.getObjectByKey("fileCheck"+appFileMessage.getId()+fileName).toString(); if (appFileMessage.getMsg().getFrameTotal() == 1) { //解析文件入库 filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId(),fileCheck,"download"); @@ -158,8 +158,15 @@ public class FileServiceImpl implements IFileService { csEventLogs.setAllStep(1); csEventLogs.setIsAll(1); redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); - redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); +// redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); + //存储文件信息 + fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal()); + fileStreamDto.setNDid(appFileMessage.getId()); + fileStreamDto.setFrameLen(appFileMessage.getMsg().getFrameLen()); + list.add(appFileMessage.getMsg().getFrameCurr()); + fileStreamDto.setList(list); + redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto); } else { //收到数据就刷新缓存值 redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_DOWN_TIME.concat(appFileMessage.getMsg().getName()), null, 60L); @@ -201,7 +208,7 @@ public class FileServiceImpl implements IFileService { file.delete(); } redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); - redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); +// redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); redisUtil.delete(AppRedisKey.FILE_DOWN_TIME.concat(appFileMessage.getMsg().getName())); } else { @@ -224,8 +231,11 @@ public class FileServiceImpl implements IFileService { } } } + //推送mqtt + String json = "{fileName:"+appFileMessage.getMsg().getName()+",allStep:"+appFileMessage.getMsg().getFrameTotal()+",nowStep:"+ appFileMessage.getMsg().getFrameCurr() +"}"; + publisher.send("/Web/Progress/" + appFileMessage.getId(), new Gson().toJson(json), 1, false); if (!Objects.isNull(filePath)){ - redisUtil.saveByKey("downloadFilePath:"+appFileMessage.getMsg().getName(),filePath); + redisUtil.saveByKey("downloadFilePath:" + appFileMessage.getId() + appFileMessage.getMsg().getName(),filePath); } } //录波文件下载