下载优化

This commit is contained in:
xy
2024-09-18 20:07:29 +08:00
parent 0fed84b8e0
commit 06c8ce2e29
5 changed files with 209 additions and 92 deletions

View File

@@ -4,12 +4,10 @@ import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.pojo.dto.file.FileRedisDto;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.enums.AlgorithmResponseEnum;
@@ -49,14 +47,8 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
@Resource
private RedisUtil redisUtil;
@Resource
private CsTopicFeignClient csTopicFeignClient;
@Resource
private MqttPublisher publisher;
@Resource
private ChannelObjectUtil channelObjectUtil;
@Resource
private ICsWaveAnalysisService iCsWaveAnalysisService;
@Resource
private RemoveInfoUtils removeInfoUtils;
@Resource
private ICsWaveAnalysisService csWaveAnalysisService;
@@ -107,7 +99,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
downloadFile(missingList,dto.getNDid(),fileName);
}
}
//项目重启之后经过10s开始处理历史录波文件
//项目重启或者接入经过120s开始处理历史录波文件
else if (expiredKey.startsWith("startFile")) {
List<CsEquipmentDeliveryPO> list = equipmentFeignClient.getAll().getData();
if (CollectionUtil.isNotEmpty(list)) {
@@ -118,9 +110,49 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
});
}
}
//手动文件下载
else if (expiredKey.startsWith(AppRedisKey.FILE_DOWN_TIME)) {
List<Integer> missingList = new ArrayList<>();
String fileName = expiredKey.split(AppRedisKey.FILE_DOWN_TIME)[1];
Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName));
FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class);
int start = 1;
int end = dto.getTotal();
IntStream.rangeClosed(start, end)
.filter(i -> !dto.getList().contains(i))
.forEach(missingList::add);
if (CollectionUtil.isNotEmpty(missingList)) {
webDownloadFile(missingList,dto.getNDid(),fileName);
}
}
}
//请求缺失的数据
//界面请求缺失的数据
public void webDownloadFile( List<Integer> missingList, String nDid, String name) {
for (Integer missingNumber : missingList) {
int i = missingNumber - 1;
Object object = getDeviceMid(nDid);
if (!Objects.isNull(object)) {
mid = (Integer) object;
}
ReqAndResDto.Req reqAndResParam = getPojo(mid,name,i);
publisher.send("/Pfm/DevFileCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false);
//判断是否重发
sendNextStep(name,nDid,mid,i);
FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + name + mid);
//重发之后判断继续循环还是跳出循环
if (!Objects.isNull(fileRedisDto) && !Objects.equals(fileRedisDto.getCode(),200)) {
break;
}
mid = mid + 1;
if (mid > 10000) {
mid = 1;
}
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid);
}
}
//录波文件请求缺失的数据
public void downloadFile( List<Integer> missingList, String nDid, String name) {
for (Integer missingNumber : missingList) {
int i = missingNumber - 1;

View File

@@ -74,34 +74,37 @@ public class FileServiceImpl implements IFileService {
String fileName = appFileMessage.getMsg().getFileInfo().getName();
//缓存文件信息用于文件流拼接
FileInfoDto fileInfoDto = new FileInfoDto();
WaveTimeDto waveTimeDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class).get(0);
fileInfoDto.setStartTime(waveTimeDto.getStartTime());
fileInfoDto.setEndTime(waveTimeDto.getEndTime());
fileInfoDto.setDeviceId(waveTimeDto.getDeviceId());
fileInfoDto.setLineId(waveTimeDto.getLineId());
fileInfoDto.setName(appFileMessage.getMsg().getFileInfo().getName());
fileInfoDto.setFileTime(appFileMessage.getMsg().getFileInfo().getFileTime());
fileInfoDto.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize());
fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck());
fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType());
fileInfoDto.setLocation(waveTimeDto.getLocation());
//存储波形文件
CsWave csWave = new CsWave();
csWave.setNdid(appFileMessage.getId());
csWave.setCreateTime(LocalDateTime.now());
csWave.setStartTime(LocalDateTime.parse(waveTimeDto.getStartTime(), fmt));
csWave.setEndTime(LocalDateTime.parse(waveTimeDto.getEndTime(), fmt));
csWave.setRcdName(fileName);
csWave.setLocation(waveTimeDto.getLocation());
csWave.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize());
csWave.setCheckType(appFileMessage.getMsg().getFileInfo().getFileChkType());
csWave.setCheckNumber(appFileMessage.getMsg().getFileInfo().getFileCheck());
csWave.setStatus(0);
csWaveService.save(csWave);
//请求当前文件的数据
askFileStream(appFileMessage.getId(),mid,fileName,-1,range);
redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto);
redisUtil.delete(AppRedisKey.TIME+fileName);
List<WaveTimeDto> list = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class);
if (CollectionUtil.isNotEmpty(list)) {
WaveTimeDto waveTimeDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class).get(0);
fileInfoDto.setStartTime(waveTimeDto.getStartTime());
fileInfoDto.setEndTime(waveTimeDto.getEndTime());
fileInfoDto.setDeviceId(waveTimeDto.getDeviceId());
fileInfoDto.setLineId(waveTimeDto.getLineId());
fileInfoDto.setName(appFileMessage.getMsg().getFileInfo().getName());
fileInfoDto.setFileTime(appFileMessage.getMsg().getFileInfo().getFileTime());
fileInfoDto.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize());
fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck());
fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType());
fileInfoDto.setLocation(waveTimeDto.getLocation());
//存储波形文件
CsWave csWave = new CsWave();
csWave.setNdid(appFileMessage.getId());
csWave.setCreateTime(LocalDateTime.now());
csWave.setStartTime(LocalDateTime.parse(waveTimeDto.getStartTime(), fmt));
csWave.setEndTime(LocalDateTime.parse(waveTimeDto.getEndTime(), fmt));
csWave.setRcdName(fileName);
csWave.setLocation(waveTimeDto.getLocation());
csWave.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize());
csWave.setCheckType(appFileMessage.getMsg().getFileInfo().getFileChkType());
csWave.setCheckNumber(appFileMessage.getMsg().getFileInfo().getFileCheck());
csWave.setStatus(0);
csWaveService.save(csWave);
//请求当前文件的数据
askFileStream(appFileMessage.getId(),mid,fileName,-1,range);
redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto);
redisUtil.delete(AppRedisKey.TIME+fileName);
}
} else {
throw new BusinessException(AccessResponseEnum.RESPONSE_ERROR);
}
@@ -136,27 +139,77 @@ public class FileServiceImpl implements IFileService {
FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class);
if (Objects.isNull(fileInfoDto)) {
String fileCheck = redisUtil.getObjectByKey("fileCheck"+fileName).toString();
if (appFileMessage.getMsg().getFrameTotal() == 1){
if (appFileMessage.getMsg().getFrameTotal() == 1) {
//解析文件入库
filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId(),fileCheck,"download");
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!");
csEventLogs.setNowStep(1);
csEventLogs.setAllStep(1);
csEventLogs.setIsAll(1);
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
} else {
//最后一帧
if (Objects.equals(appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr())) {
Map<Integer, String> filePartMap = readFile(lsFileName);
filePartMap.put(appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData());
//解析文件
filePath = fileStream(appFileMessage.getMsg().getFrameTotal(), filePartMap, null, fileName, appFileMessage.getId(),fileCheck,"download");
//删除临时文件
File file = new File(lsFileName);
if (file.exists()) {
file.delete();
}
}
//中间帧
else {
Map<Integer, String> filePartMap = readFile(lsFileName);
if (Objects.isNull(filePartMap.get(appFileMessage.getMsg().getFrameCurr()))) {
appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData());
//收到数据就刷新缓存值
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_DOWN_TIME.concat(appFileMessage.getMsg().getName()), null, 10L);
Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName));
if (Objects.isNull(object1)){
fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal());
fileStreamDto.setNDid(appFileMessage.getId());
fileStreamDto.setFrameLen(appFileMessage.getMsg().getFrameLen());
list.add(appFileMessage.getMsg().getFrameCurr());
fileStreamDto.setList(list);
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件"+appFileMessage.getMsg().getFrameTotal()+"帧,这是第"+appFileMessage.getMsg().getFrameCurr()+"帧,记录成功!");
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
csEventLogs.setIsAll(0);
redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto);
//将数据写入临时文件
appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData());
log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
} else {
FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class);
//防止出现录入重复数据,做个判断
if (!dto.getList().contains(appFileMessage.getMsg().getFrameCurr())) {
dto.getList().add(appFileMessage.getMsg().getFrameCurr());
if (Objects.equals(dto.getTotal(), dto.getList().size())) {
Map<Integer, String> filePartMap = readFile(lsFileName);
filePartMap.put(appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData());
//解析文件入库
filePath = fileStream(dto.getTotal(), filePartMap, null, fileName, appFileMessage.getId(),fileCheck,"download");
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,全部收到,解析成功!");
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
csEventLogs.setIsAll(1);
log.info("当前文件 {} 帧,这是第 {} 帧报文,全部收到,解析成功!", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
//删除临时文件
File file = new File(lsFileName);
if (file.exists()) {
file.delete();
}
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
} else {
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!");
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
csEventLogs.setIsAll(0);
redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), dto);
//将数据写入临时文件
appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData());
log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
}
} else {
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件为重复帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,不做记录!");
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
csEventLogs.setIsAll(0);
}
}
}