录波文件下载优化

This commit is contained in:
xy
2024-09-14 11:44:38 +08:00
parent dc7afcc240
commit 848cc9c7de
9 changed files with 205 additions and 90 deletions

View File

@@ -426,7 +426,7 @@ public class MqttMessageHandler {
CsDevCapacityPO csDevCapacity = new CsDevCapacityPO();
csDevCapacity.setLineId(nDid.concat("0"));
csDevCapacity.setCldid(item.getClDid());
csDevCapacity.setCapacity(item.getCapacityA());
csDevCapacity.setCapacity(Objects.isNull(item.getCapacityA())?0.0:item.getCapacityA());
list.add(csDevCapacity);
});
devCapacityFeignClient.addList(list);

View File

@@ -1,17 +1,9 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.access.pojo.param.DeviceStatusParam;
import com.njcn.csdevice.pojo.param.CsEquipmentDeliveryAddParm;
import com.njcn.csdevice.pojo.param.CsEquipmentDeliveryAuditParm;
import com.njcn.csdevice.pojo.param.CsEquipmentDeliveryQueryParm;
import com.njcn.csdevice.pojo.param.ProjectEquipmentQueryParm;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.csdevice.pojo.vo.DeviceManagerVO;
import com.njcn.csdevice.pojo.vo.ProjectEquipmentVO;
import java.util.List;

View File

@@ -239,7 +239,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService {
failDto.setCode(400);
redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,10L);
ReqAndResDto.Req req = getPojo(mid,fileName,step);
publisher.send("/Pfm/DevFileCmd/V1" + id, new Gson().toJson(req), 1, false);
publisher.send("/Pfm/DevFileCmd/V1/" + id, new Gson().toJson(req), 1, false);
}
}
}

View File

@@ -12,6 +12,9 @@ import lombok.Data;
@Data
public class WaveTimeDto {
/**
* 文件名
*/
private String fileName;
private String deviceId;
@@ -26,4 +29,9 @@ public class WaveTimeDto {
private String location;
/**
* 等级
*/
private String level;
}

View File

@@ -0,0 +1,41 @@
package com.njcn.zlevent.init;
import cn.hutool.core.collection.CollectionUtil;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.zlevent.service.ICsWaveAnalysisService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author xy
*
* 程序重启清除事件标识,开始消费事件
*/
@Slf4j
@Component
@AllArgsConstructor
public class InitEventFiles implements CommandLineRunner {
private final ICsWaveAnalysisService csWaveAnalysisService;
private final RedisUtil redisUtil;
private final EquipmentFeignClient equipmentFeignClient;
@Override
public void run(String... args) {
List<CsEquipmentDeliveryPO> list = equipmentFeignClient.getAll().getData();
if (CollectionUtil.isNotEmpty(list)) {
list.forEach(item->{
redisUtil.delete("handleEvent:" + item.getNdid());
//处理缓存数据
csWaveAnalysisService.channelWave(item.getNdid());
});
}
}
}

View File

@@ -18,8 +18,8 @@ import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.pojo.dto.EpdDTO;
import com.njcn.zlevent.pojo.dto.FileStreamDto;
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.service.ICsWaveAnalysisService;
import com.njcn.zlevent.utils.RemoveInfoUtils;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.apache.commons.lang3.StringUtils;
@@ -54,6 +54,8 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
private ChannelObjectUtil channelObjectUtil;
@Resource
private ICsWaveAnalysisService iCsWaveAnalysisService;
@Resource
private RemoveInfoUtils removeInfoUtils;
private static Integer mid = 1;
private static Integer range = 51200;
@@ -95,8 +97,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
IntStream.rangeClosed(start, end)
.filter(i -> !dto.getList().contains(i))
.forEach(missingList::add);
Object object = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName));
if (CollectionUtil.isNotEmpty(missingList) && Objects.isNull(object)) {
if (CollectionUtil.isNotEmpty(missingList)) {
downloadFile(missingList,dto.getNDid(),fileName);
}
}
@@ -113,10 +114,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
ReqAndResDto.Req reqAndResParam = getPojo(mid,name,i);
publisher.send("/Pfm/DevFileCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false);
//判断是否重发
sendNextStep(name,nDid,mid,i,nDid);
sendNextStep(name,nDid,mid,i);
FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + name + mid);
//重发之后判断继续循环还是跳出循环
if (!Objects.isNull(fileRedisDto) && !Objects.equals(fileRedisDto.getCode(),200)) {
//判断重读还是丢弃
removeInfoUtils.retryEventInfo(nDid,name);
break;
}
mid = mid + 1;
@@ -151,13 +154,13 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
/**
* 根据装置响应来判断是否询问下一帧数据
*/
public void sendNextStep(String fileName, String id, int mid,int step, String nDid) {
public void sendNextStep(String fileName, String id, int mid,int step) {
try {
for (int i = 1; i < 31; i++) {
for (int i = 1; i <= 10; i++) {
if (step == 0 ){
Thread.sleep(5000);
Thread.sleep(5000 * i);
} else {
Thread.sleep(2000);
Thread.sleep(2000 * i);
}
FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + fileName + mid);
if (Objects.isNull(fileRedisDto)) {
@@ -166,30 +169,20 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,10L);
} else {
if (Objects.equals(fileRedisDto.getCode(),200)) {
redisUtil.delete("handleEvent:" + nDid);
redisUtil.delete(AppRedisKey.FILE_PART.concat(fileName));
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
//删除已经处理完的文件,之后再判断还有是否需要下载的文件
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class);
fileDto.removeIf(item2 -> item2.getFileName().equals(fileName));
redisUtil.saveByKey("eventFile:" + nDid, fileDto);
if (CollectionUtil.isNotEmpty(fileDto)) {
iCsWaveAnalysisService.channelWave(nDid);
}
break;
} else {
log.info("" +i+"次尝试");
//尝试失败则设置code为400如果装置响应了则会将code置为200
FileRedisDto failDto = new FileRedisDto();
failDto.setCode(400);
redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,10L);
redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,2L*i);
ReqAndResDto.Req req = getPojo(mid,fileName,step);
publisher.send("/Pfm/DevFileCmd/V1" + id, new Gson().toJson(req), 1, false);
publisher.send("/Pfm/DevFileCmd/V1/" + id, new Gson().toJson(req), 1, false);
}
}
}
} catch (Exception e) {
throw new BusinessException(AlgorithmResponseEnum.ASK_DEVICE_DIR_ERROR);
throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOAD_ERROR);
}
}

View File

@@ -61,41 +61,46 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
@Override
public void analysis(AppEventMessage appEventMessage) {
List<WaveTimeDto> list = new ArrayList<>();
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId());
}
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
//获取波形文件名称
List<AppEventMessage.DataArray> dataArrayList = appEventMessage.getMsg().getDataArray();
if (CollectionUtil.isNotEmpty(dataArrayList)){
for (AppEventMessage.DataArray item : dataArrayList) {
List<AppEventMessage.Param> paramList = item.getParam();
Object object = paramList.stream().filter(item2 -> ZlConstant.WAVE_NAME.equals(item2.getName())).findFirst().get().getData();
Object object2 = paramList.stream().filter(item2 -> ZlConstant.WAVE_PARAM_RCDKEEPTIME.equals(item2.getName())).findFirst().get().getData();
Object object3 = paramList.stream().filter(item2 -> ZlConstant.WAVE_POSITION.equals(item2.getName())).findFirst().get().getData();
String lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid();
String fileName = object.toString().replaceAll("\\[","").replaceAll("]","");
List<String> fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList());
//获取到录波文件,将文件信息存储起来
for (String file : fileList) {
file = file.trim();
WaveTimeDto dto = channelTimeRange(file,appEventMessage.getId(),item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId,object3.toString());
list.add(dto);
}
Object obj = redisUtil.getObjectByKey("eventFile:" + appEventMessage.getId());
if (!Objects.isNull(obj)) {
List<WaveTimeDto> oldList = channelObjectUtil.objectToList(obj,WaveTimeDto.class);
oldList.addAll(list);
redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), oldList);
} else {
redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), list);
}
try {
List<WaveTimeDto> list = new ArrayList<>();
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId());
}
//处理缓存数据
channelWave(appEventMessage.getId());
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
//获取波形文件名称
List<AppEventMessage.DataArray> dataArrayList = appEventMessage.getMsg().getDataArray();
if (CollectionUtil.isNotEmpty(dataArrayList)){
for (AppEventMessage.DataArray item : dataArrayList) {
List<AppEventMessage.Param> paramList = item.getParam();
Object object = paramList.stream().filter(item2 -> ZlConstant.WAVE_NAME.equals(item2.getName())).findFirst().get().getData();
Object object2 = paramList.stream().filter(item2 -> ZlConstant.WAVE_PARAM_RCDKEEPTIME.equals(item2.getName())).findFirst().get().getData();
Object object3 = paramList.stream().filter(item2 -> ZlConstant.WAVE_POSITION.equals(item2.getName())).findFirst().get().getData();
String lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid();
String fileName = object.toString().replaceAll("\\[","").replaceAll("]","");
List<String> fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList());
//获取到录波文件,将文件信息存储起来
for (String file : fileList) {
file = file.trim();
WaveTimeDto dto = channelTimeRange(file,appEventMessage.getId(),item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId,object3.toString());
list.add(dto);
}
Object obj = redisUtil.getObjectByKey("eventFile:" + appEventMessage.getId());
if (!Objects.isNull(obj)) {
List<WaveTimeDto> oldList = channelObjectUtil.objectToList(obj,WaveTimeDto.class);
oldList.addAll(list);
redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), oldList);
} else {
redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), list);
}
}
//处理缓存数据
channelWave(appEventMessage.getId());
}
} catch (Exception e) {
//发生异常,清理标识
redisUtil.delete("handleEvent:" + appEventMessage.getId());
}
}
@@ -105,8 +110,11 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
//有相同文件处理时,则不进行数据处理
Object obj = redisUtil.getObjectByKey("handleEvent:" + nDid);
if (Objects.isNull(obj)) {
WaveTimeDto dto = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class).get(0);
askFileInfo(nDid,1,dto.getFileName());
List<WaveTimeDto> list = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class);
if (CollectionUtil.isNotEmpty(list)) {
WaveTimeDto dto = list.get(0);
askFileInfo(nDid,1,dto.getFileName());
}
} else {
throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING);
}

View File

@@ -29,6 +29,7 @@ import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.pojo.po.CsEventFileLogs;
import com.njcn.zlevent.pojo.po.CsWave;
import com.njcn.zlevent.service.*;
import com.njcn.zlevent.utils.RemoveInfoUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
@@ -62,6 +63,7 @@ public class FileServiceImpl implements IFileService {
private final GeneralInfo generalInfo;
private final ICsWaveAnalysisService iCsWaveAnalysisService;
private final ChannelObjectUtil channelObjectUtil;
private final RemoveInfoUtils removeInfoUtils;
@Override
public void analysisFileInfo(AppFileMessage appFileMessage) {
@@ -185,15 +187,8 @@ public class FileServiceImpl implements IFileService {
if (CollectionUtil.isNotEmpty(eventList)){
eventList.forEach(wavePicFeignClient::getWavePics);
}
redisUtil.delete("handleEvent:" + appFileMessage.getId());
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
//删除已经处理完的文件,之后再判断还有是否需要下载的文件
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class);
fileDto.removeIf(item -> item.getFileName().equals(fileName));
redisUtil.saveByKey("eventFile:" + appFileMessage.getId(), fileDto);
if (CollectionUtil.isNotEmpty(fileDto)) {
iCsWaveAnalysisService.channelWave(appFileMessage.getId());
}
//解析完删除、处理缓存
removeInfoUtils.deleteEventInfo(appFileMessage.getId(),fileName);
} else {
//收到数据就刷新缓存值
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 10L);
@@ -238,23 +233,15 @@ public class FileServiceImpl implements IFileService {
if (CollectionUtil.isNotEmpty(eventList)) {
eventList.forEach(wavePicFeignClient::getWavePics);
}
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART_MISSING.concat(appFileMessage.getMsg().getName()));
redisUtil.delete("handleEvent:" + appFileMessage.getId());
//解析完删除、处理缓存
removeInfoUtils.deleteEventInfo(appFileMessage.getId(),fileName);
//删除临时文件
File file = new File(lsFileName);
if (file.exists()) {
file.delete();
}
//删除已经处理完的文件,之后再判断还有是否需要下载的文件
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class);
fileDto.removeIf(item -> item.getFileName().equals(fileName));
redisUtil.saveByKey("eventFile:" + appFileMessage.getId(), fileDto);
if (CollectionUtil.isNotEmpty(fileDto)) {
iCsWaveAnalysisService.channelWave(appFileMessage.getId());
}
} else {
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!");
@@ -265,10 +252,6 @@ public class FileServiceImpl implements IFileService {
//将数据写入临时文件
appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData());
log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
Object object2 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName));
if (!Objects.isNull(object2)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 1L);
}
}
} else {
csEventLogs.setStatus(1);

View File

@@ -0,0 +1,90 @@
package com.njcn.zlevent.utils;
import cn.hutool.core.collection.CollectionUtil;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.service.ICsWaveAnalysisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
/**
* 类的介绍:
* @author xuyang
*/
@Slf4j
@Component
public class RemoveInfoUtils {
@Resource
private RedisUtil redisUtil;
@Resource
private ChannelObjectUtil channelObjectUtil;
@Resource
private ICsWaveAnalysisService iCsWaveAnalysisService;
public void deleteEventInfo(String nDid, String fileName) {
redisUtil.delete("handleEvent:" + nDid);
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
//删除已经处理完的文件,之后再判断还有是否需要下载的文件
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class);
fileDto.removeIf(item -> item.getFileName().equals(fileName));
redisUtil.saveByKey("eventFile:" + nDid, fileDto);
if (CollectionUtil.isNotEmpty(fileDto)) {
iCsWaveAnalysisService.channelWave(nDid);
}
}
public void retryEventInfo(String nDid, String fileName) {
// 尝试获取重试次数
Object retryObject = redisUtil.getObjectByKey("retryEvent:" + nDid);
int retryTimes = retryObject != null ? Integer.parseInt(retryObject.toString()) : 0;
// 删除相关的 Redis 键
deleteRelatedKeys(nDid, fileName);
// 处理重试逻辑
if (retryTimes < 3) {
// 增加重试次数并保存
redisUtil.saveByKey("retryEvent:" + nDid, retryTimes + 1);
// 重排文件列表
updateFileList(nDid, fileName);
} else {
// 仅从列表中移除文件
removeFileFromList(nDid, fileName);
// 检查是否还有其他文件需要处理
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class);
if (CollectionUtil.isNotEmpty(fileDto)) {
iCsWaveAnalysisService.channelWave(nDid);
}
}
}
private void deleteRelatedKeys(String nDid, String fileName) {
redisUtil.delete("handleEvent:" + nDid);
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
}
private void updateFileList(String nDid, String fileName) {
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class);
WaveTimeDto dto = fileDto.stream()
.filter(item -> item.getFileName().equals(fileName))
.findFirst()
.orElse(null);
if (dto != null) {
fileDto.removeIf(item -> item.getFileName().equals(fileName));
fileDto.add(dto);
}
redisUtil.saveByKey("eventFile:" + nDid, fileDto);
}
private void removeFileFromList(String nDid, String fileName) {
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class);
fileDto.removeIf(item -> item.getFileName().equals(fileName));
redisUtil.saveByKey("eventFile:" + nDid, fileDto);
}
}