数据补召优化

This commit is contained in:
xy
2024-11-04 20:45:58 +08:00
parent e2f46ebcde
commit 851404f62d
9 changed files with 279 additions and 67 deletions

View File

@@ -6,7 +6,7 @@ import com.njcn.mq.message.AppFileMessage;
import com.njcn.zlevent.api.fallback.FileClientFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
/**
* @author xy
@@ -19,4 +19,7 @@ public interface FileFeignClient {
@PostMapping("/fileStream")
HttpResult<String> fileStream(AppFileMessage appFileMessage);
@PostMapping("/downloadMakeUpFile")
HttpResult<String> downloadMakeUpFile(@RequestParam("nDid") String nDid);
}

View File

@@ -36,6 +36,12 @@ public class FileClientFallbackFactory implements FallbackFactory<FileFeignClien
log.error("{}异常,降级处理,异常为:{}","解析文件流",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<String> downloadMakeUpFile(String nDid) {
log.error("{}异常,降级处理,异常为:{}","下载补召文件",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -13,10 +13,7 @@ import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
/**
* 类的介绍:
@@ -54,4 +51,14 @@ public class FileController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/downloadMakeUpFile")
@ApiOperation("下载补召文件")
@ApiImplicitParam(name = "nDid", value = "nDid", required = true)
public HttpResult<String> downloadMakeUpFile(@RequestParam("nDid") String nDid){
String methodDescribe = getMethodDescribe("downloadMakeUpFile");
fileService.downloadMakeUpFile(nDid);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -1,26 +0,0 @@
//package com.njcn.zlevent.init;
//
//import com.njcn.redis.utils.RedisUtil;
//import lombok.AllArgsConstructor;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.stereotype.Component;
//
//
///**
// * @author xy
// *
// * 程序重启设置任务,消费历史录波文件
// */
//@Slf4j
//@Component
//@AllArgsConstructor
//public class InitEventFiles implements CommandLineRunner {
//
// private final RedisUtil redisUtil;
//
// @Override
// public void run(String... args) {
// redisUtil.saveByKeyWithExpire("startFile",null,120L);
// }
//}

View File

@@ -26,4 +26,8 @@ public interface IFileService {
*/
void analysisFileStream(AppFileMessage appFileMessage);
/**
* 下载补召文件
*/
void downloadMakeUpFile(String nDid);
}

View File

@@ -1,13 +1,10 @@
package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.access.utils.FileCommonUtils;
import com.njcn.access.utils.MqttUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient;
@@ -26,7 +23,6 @@ import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.service.ICsWaveAnalysisService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@@ -54,6 +50,7 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
private final DicDataFeignClient dicDataFeignClient;
private final ChannelObjectUtil channelObjectUtil;
private final MqttUtil mqttUtil;
private final FileCommonUtils fileCommonUtils;
private static Integer mid = 1;
@Override
@@ -119,7 +116,7 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
}
WaveTimeDto dto = list.get(0);
redisUtil.saveByKeyWithExpire("fileMid:" + nDid,mid + "concat" +dto.getFileName(),60L);
askFileInfo(nDid,mid,dto.getFileName());
fileCommonUtils.askFileInfo(nDid,mid,dto.getFileName());
mid = mid + 1;
if (mid > 10000) {
mid = 1;
@@ -134,25 +131,6 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
}
}
/**
* 询问文件信息
*/
public void askFileInfo(String nDid, Integer mid, String fileName) {
String version = csTopicFeignClient.find(nDid).getData();
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
reqAndResParam.setMid(mid);
reqAndResParam.setDid(0);
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_8.getCode()));
reqAndResParam.setExpire(-1);
String json = "{Name:\""+fileName+"\"}";
JSONObject jsonObject = JSONObject.fromObject(json);
reqAndResParam.setMsg(jsonObject);
log.info("请求文件信息报文:" + new Gson().toJson(reqAndResParam));
publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
}
/**
* 时间处理
*/

View File

@@ -11,12 +11,20 @@ import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.AccessResponseEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.pojo.dto.file.FileDto;
import com.njcn.access.utils.CRC32Utils;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.access.utils.FileCommonUtils;
import com.njcn.access.utils.MqttUtil;
import com.njcn.common.config.GeneralInfo;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.DeviceFtpFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.api.PortableOffLogFeignClient;
import com.njcn.csdevice.enums.AlgorithmResponseEnum;
import com.njcn.csharmonic.api.WavePicFeignClient;
import com.njcn.csharmonic.enums.CsHarmonicResponseEnum;
import com.njcn.csharmonic.pojo.dto.DownloadMakeUpDto;
import com.njcn.mq.message.AppFileMessage;
import com.njcn.oss.constant.GeneralConstant;
import com.njcn.oss.constant.OssPath;
@@ -40,6 +48,7 @@ import net.sf.json.JSONObject;
import org.springframework.stereotype.Service;
import java.io.*;
import java.nio.file.Files;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@@ -69,6 +78,10 @@ public class FileServiceImpl implements IFileService {
private final RemoveInfoUtils removeInfoUtils;
private static Integer mid = 1;
private final EquipmentFeignClient equipmentFeignClient;
private final MqttUtil mqttUtil;
private final FileCommonUtils fileCommonUtils;
private final DeviceFtpFeignClient deviceFtpFeignClient;
private final PortableOffLogFeignClient portableOffLogFeignClient;
@Override
public void analysisFileInfo(AppFileMessage appFileMessage) {
@@ -163,8 +176,8 @@ public class FileServiceImpl implements IFileService {
csEventLogs.setNowStep(1);
csEventLogs.setAllStep(1);
csEventLogs.setIsAll(1);
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
//redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
//redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
//存储文件信息
fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal());
fileStreamDto.setNDid(appFileMessage.getId());
@@ -172,6 +185,16 @@ public class FileServiceImpl implements IFileService {
list.add(appFileMessage.getMsg().getFrameCurr());
fileStreamDto.setList(list);
redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto);
log.info("当前文件1帧,全部收到,解析成功!");
//针对补召文件
String key = AppRedisKey.MAKE_UP_FILES + appFileMessage.getId();
Object object = redisUtil.getObjectByKey(key);
//清空redis缓存
fileCommonUtils.cleanRedisData(appFileMessage.getId(),fileName);
if (Objects.nonNull(object)) {
DownloadMakeUpDto dto = channelObjectUtil.objectToSingleObject(object, DownloadMakeUpDto.class);
channelMakeUpFile(dto,appFileMessage.getId(),fileName,filePath,lsFileName);
}
} else {
//收到数据就刷新缓存值
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_DOWN_TIME.concat(appFileMessage.getMsg().getName()), null, 60L);
@@ -212,9 +235,18 @@ public class FileServiceImpl implements IFileService {
if (file.exists()) {
file.delete();
}
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
redisUtil.delete(AppRedisKey.FILE_DOWN_TIME.concat(appFileMessage.getMsg().getName()));
//redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
//redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
//redisUtil.delete(AppRedisKey.FILE_DOWN_TIME.concat(appFileMessage.getMsg().getName()));
//针对补召文件
String key = AppRedisKey.MAKE_UP_FILES + appFileMessage.getId();
Object object = redisUtil.getObjectByKey(key);
//清空redis缓存
fileCommonUtils.cleanRedisData(appFileMessage.getId(),fileName);
if (Objects.nonNull(object)) {
DownloadMakeUpDto dto2 = channelObjectUtil.objectToSingleObject(object, DownloadMakeUpDto.class);
channelMakeUpFile(dto2,appFileMessage.getId(),fileName,filePath,lsFileName);
}
} else {
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!");
@@ -239,7 +271,7 @@ public class FileServiceImpl implements IFileService {
String json = "{fileName:"+appFileMessage.getMsg().getName()+",allStep:"+appFileMessage.getMsg().getFrameTotal()+",nowStep:"+ appFileMessage.getMsg().getFrameCurr() +"}";
publisher.send("/Web/Progress/" + appFileMessage.getId(), new Gson().toJson(json), 1, false);
if (!Objects.isNull(filePath)){
redisUtil.saveByKey("downloadFilePath:" + appFileMessage.getId() + appFileMessage.getMsg().getName(),filePath);
redisUtil.saveByKeyWithExpire("downloadFilePath:" + appFileMessage.getId() + appFileMessage.getMsg().getName(),filePath,60L);
}
}
//录波文件下载
@@ -353,9 +385,6 @@ public class FileServiceImpl implements IFileService {
csEventLogs.setCompleteTime(LocalDateTime.now());
//记录日志
csEventLogsService.save(csEventLogs);
//清空缓存
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()));
//删除临时文件
String fileName = appFileMessage.getMsg().getName();
String lsFileName = generalInfo.getBusinessTempPath() + File.separator + fileName.split(StrUtil.SLASH)[fileName.split(StrUtil.SLASH).length - 1];
@@ -365,6 +394,96 @@ public class FileServiceImpl implements IFileService {
}
//继续消费
removeInfoUtils.deleteEventInfo(appFileMessage.getId(),appFileMessage.getMsg().getName());
//清空redis缓存
fileCommonUtils.cleanRedisData(appFileMessage.getId(),fileName);
}
}
@Override
public void downloadMakeUpFile(String nDid) {
try {
//判断客户端是否在线,在线再处理文件
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (mqttClient){
String key = AppRedisKey.MAKE_UP_FILES + nDid;
Object object = redisUtil.getObjectByKey(key);
if (Objects.nonNull(object)) {
DownloadMakeUpDto dto = channelObjectUtil.objectToSingleObject(object, DownloadMakeUpDto.class);
if (CollectionUtil.isNotEmpty(dto.getFileList())){
Object object1 = channelObjectUtil.getDeviceMid(nDid);
if (!Objects.isNull(object1)) {
mid = (Integer) object1;
}
String file = dto.getFileList().get(0);
fileCommonUtils.askFileInfo(nDid,mid,file);
mid = mid + 1;
if (mid > 10000) {
mid = 1;
}
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid);
Thread.sleep(10000);
String infoKey = AppRedisKey.PROJECT_INFO + nDid;
FileDto.FileInfo info = channelObjectUtil.objectToSingleObject(redisUtil.getObjectByKey(infoKey), FileDto.FileInfo.class);
deviceFtpFeignClient.downloadFile(nDid,file,info.getFileSize(),info.getFileCheck()).getData();
}
}
} else {
throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE);
}
} catch (Exception e) {
String key = AppRedisKey.MAKE_UP_FILES + nDid;
redisUtil.delete(key);
Object object = redisUtil.getObjectByKey(AppRedisKey.MAKE_UP_FILES + nDid);
if (Objects.nonNull(object)) {
DownloadMakeUpDto dto = channelObjectUtil.objectToSingleObject(object, DownloadMakeUpDto.class);
if (CollectionUtil.isNotEmpty(dto.getFileList())){
String file = dto.getFileList().get(0);
fileCommonUtils.cleanRedisData(nDid,file);
}
}
}
}
/**
* 处理补召文件
*/
public void channelMakeUpFile(DownloadMakeUpDto dto, String nDid, String fileName, String oldPath, String lsFileName){
try {
//如果是补召文件,则将文件复制到补召目录下
moveFile(oldPath,getFilePath(fileName,nDid),lsFileName);
//删除临时文件
File file = new File(lsFileName);
if (file.exists()) {
file.delete();
}
//删除下载文件
fileStorageUtil.deleteFile(oldPath);
List<String> list = dto.getFileList();
list.removeIf(item -> item.equals(fileName));
dto.setFileList(list);
redisUtil.saveByKey(AppRedisKey.MAKE_UP_FILES + nDid,dto);
//判断是否还有缓存的文件
if (CollectionUtil.isNotEmpty(list)){
//推送进度条
String json = "{allStep:" + dto.getAllStep() * 2 + ",nowStep:" + (dto.getAllStep() - list.size()) + "}";
publisher.send("/dataOnlineRecruitment/Progress/" + dto.getLineId(), new Gson().toJson(json), 1, false);
//下载文件
downloadMakeUpFile(nDid);
} else {
//清空缓存文件
String key = AppRedisKey.MAKE_UP_FILES + nDid;
redisUtil.delete(key);
//推送进度条
String json = "{allStep:" + dto.getAllStep() * 2 + ",nowStep:" + dto.getAllStep() + "}";
publisher.send("/dataOnlineRecruitment/Progress/" + dto.getLineId(), new Gson().toJson(json), 1, false);
//调用方法
portableOffLogFeignClient.dataOnlineRecruitment(dto.getDevId(),dto.getLineId(),dto.getEngineeringName());
}
} catch (Exception e) {
String key = AppRedisKey.MAKE_UP_FILES + nDid;
redisUtil.delete(key);
fileCommonUtils.cleanRedisData(nDid,fileName);
}
}
@@ -570,5 +689,47 @@ public class FileServiceImpl implements IFileService {
* -----------------------------------------------
*/
/**
* 迁移文件
*/
public void moveFile(String oldPath, String newPath, String lsPath) {
try {
InputStream inputStream = fileStorageUtil.getFileStream(oldPath);
FileOutputStream fileOutputStream = new FileOutputStream(lsPath);
// 创建一个缓冲区
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
fileOutputStream.write(buffer, 0, bytesRead);
}
File src = new File(lsPath);
src.getParentFile().mkdirs();
InputStream is = Files.newInputStream(src.toPath());
fileStorageUtil.uploadStreamSpecifyName(is, OssPath.DEV_MAKE_UP_PATH,newPath);
inputStream.close();
fileOutputStream.close();
is.close();
} catch (Exception e) {
throw new BusinessException(CsHarmonicResponseEnum.MAKE_UP_ERROR);
}
}
/**
* 调整文件路径
*/
private String getFilePath(String path, String nDid) {
String[] parts = path.split("/");
StringBuilder sb = new StringBuilder();
boolean first = true;
for (int i = 3; i < parts.length; i++) {
if (!first) {
sb.append("/");
}
sb.append(parts[i]);
first = false;
}
return nDid + "/" + sb.toString();
}
}