录波文件冗余处理

This commit is contained in:
xy
2024-09-17 09:00:10 +08:00
parent ff188c3928
commit 2d5feb1ef2
5 changed files with 45 additions and 58 deletions

View File

@@ -60,39 +60,22 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
private static final Logger logger = LoggerFactory.getLogger(CsDeviceServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(CsDeviceServiceImpl.class);
private final EquipmentFeignClient equipmentFeignClient; private final EquipmentFeignClient equipmentFeignClient;
private final ICsEquipmentDeliveryService csEquipmentDeliveryService; private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
private final DictTreeFeignClient dictTreeFeignClient; private final DictTreeFeignClient dictTreeFeignClient;
private final ICsLedgerService csLedgerService; private final ICsLedgerService csLedgerService;
private final ICsDevModelRelationService csDevModelRelationService; private final ICsDevModelRelationService csDevModelRelationService;
private final ICsLineService csLineService; private final ICsLineService csLineService;
private final IAppLineTopologyDiagramService appLineTopologyDiagramService; private final IAppLineTopologyDiagramService appLineTopologyDiagramService;
private final ICsDeviceUserService csDeviceUserService; private final ICsDeviceUserService csDeviceUserService;
private final MqttPublisher publisher; private final MqttPublisher publisher;
private final RedisUtil redisUtil; private final RedisUtil redisUtil;
private final MqttUtil mqttUtil; private final MqttUtil mqttUtil;
private final ICsTopicService csTopicService; private final ICsTopicService csTopicService;
private final DicDataFeignClient dicDataFeignClient; private final DicDataFeignClient dicDataFeignClient;
private final CsLogsFeignClient csLogsFeignClient; private final CsLogsFeignClient csLogsFeignClient;
private final ProcessFeignClient processFeignClient; private final ProcessFeignClient processFeignClient;
private final CsLinePOService csLinePOService; private final CsLinePOService csLinePOService;
private final CsDeviceUserPOService csDeviceUserPOService; private final CsDeviceUserPOService csDeviceUserPOService;
private final ICsDataSetService csDataSetService; private final ICsDataSetService csDataSetService;
@Override @Override

View File

@@ -1,41 +1,26 @@
package com.njcn.zlevent.init; package com.njcn.zlevent.init;
import cn.hutool.core.collection.CollectionUtil;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.utils.RedisUtil; import com.njcn.redis.utils.RedisUtil;
import com.njcn.zlevent.service.ICsWaveAnalysisService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List;
/** /**
* @author xy * @author xy
* *
* 程序重启清除事件标识,开始消费事 * 程序重启设置任务,消费历史录波文
*/ */
@Slf4j @Slf4j
@Component @Component
@AllArgsConstructor @AllArgsConstructor
public class InitEventFiles implements CommandLineRunner { public class InitEventFiles implements CommandLineRunner {
private final ICsWaveAnalysisService csWaveAnalysisService;
private final RedisUtil redisUtil; private final RedisUtil redisUtil;
private final EquipmentFeignClient equipmentFeignClient;
@Override @Override
public void run(String... args) { public void run(String... args) {
List<CsEquipmentDeliveryPO> list = equipmentFeignClient.getAll().getData(); redisUtil.saveByKeyWithExpire("startFile",null,30L);
if (CollectionUtil.isNotEmpty(list)) {
list.forEach(item->{
redisUtil.delete("handleEvent:" + item.getNdid());
//处理缓存数据
csWaveAnalysisService.channelWave(item.getNdid());
});
}
} }
} }

View File

@@ -11,7 +11,9 @@ import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.pojo.dto.file.FileRedisDto; import com.njcn.access.pojo.dto.file.FileRedisDto;
import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.enums.AlgorithmResponseEnum; import com.njcn.csdevice.enums.AlgorithmResponseEnum;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil; import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.enums.StatResponseEnum; import com.njcn.stat.enums.StatResponseEnum;
@@ -56,6 +58,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
private ICsWaveAnalysisService iCsWaveAnalysisService; private ICsWaveAnalysisService iCsWaveAnalysisService;
@Resource @Resource
private RemoveInfoUtils removeInfoUtils; private RemoveInfoUtils removeInfoUtils;
@Resource
private ICsWaveAnalysisService csWaveAnalysisService;
@Resource
private EquipmentFeignClient equipmentFeignClient;
private static Integer mid = 1; private static Integer mid = 1;
private static Integer range = 51200; private static Integer range = 51200;
@@ -101,6 +107,17 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
downloadFile(missingList,dto.getNDid(),fileName); downloadFile(missingList,dto.getNDid(),fileName);
} }
} }
//项目重启之后经过10s开始处理历史录波文件
else if (expiredKey.startsWith("startFile")) {
List<CsEquipmentDeliveryPO> list = equipmentFeignClient.getAll().getData();
if (CollectionUtil.isNotEmpty(list)) {
list.forEach(item->{
redisUtil.delete("handleEvent:" + item.getNdid());
//处理缓存数据
csWaveAnalysisService.channelWave(item.getNdid());
});
}
}
} }
//请求缺失的数据 //请求缺失的数据
@@ -156,7 +173,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
*/ */
public void sendNextStep(String fileName, String id, int mid,int step) { public void sendNextStep(String fileName, String id, int mid,int step) {
try { try {
for (int i = 1; i <= 10; i++) { for (int i = 1; i <= 11; i++) {
if (step == 0 ){ if (step == 0 ){
Thread.sleep(5000 * i); Thread.sleep(5000 * i);
} else { } else {
@@ -175,7 +192,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
//尝试失败则设置code为400如果装置响应了则会将code置为200 //尝试失败则设置code为400如果装置响应了则会将code置为200
FileRedisDto failDto = new FileRedisDto(); FileRedisDto failDto = new FileRedisDto();
failDto.setCode(400); failDto.setCode(400);
redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,2L*i); redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,4L*i);
ReqAndResDto.Req req = getPojo(mid,fileName,step); ReqAndResDto.Req req = getPojo(mid,fileName,step);
publisher.send("/Pfm/DevFileCmd/V1/" + id, new Gson().toJson(req), 1, false); publisher.send("/Pfm/DevFileCmd/V1/" + id, new Gson().toJson(req), 1, false);
} }

View File

@@ -8,6 +8,7 @@ import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum; import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.access.utils.MqttUtil;
import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient;
@@ -46,18 +47,13 @@ import java.util.stream.Collectors;
public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
private final EquipmentFeignClient equipmentFeignClient; private final EquipmentFeignClient equipmentFeignClient;
private final MqttPublisher publisher; private final MqttPublisher publisher;
private final CsTopicFeignClient csTopicFeignClient; private final CsTopicFeignClient csTopicFeignClient;
private final RedisUtil redisUtil; private final RedisUtil redisUtil;
private final CsLineFeignClient csLineFeignClient; private final CsLineFeignClient csLineFeignClient;
private final DicDataFeignClient dicDataFeignClient; private final DicDataFeignClient dicDataFeignClient;
private final ChannelObjectUtil channelObjectUtil; private final ChannelObjectUtil channelObjectUtil;
private final MqttUtil mqttUtil;
@Override @Override
public void analysis(AppEventMessage appEventMessage) { public void analysis(AppEventMessage appEventMessage) {
@@ -107,16 +103,21 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
@Override @Override
@Async("asyncExecutor") @Async("asyncExecutor")
public void channelWave(String nDid) { public void channelWave(String nDid) {
//有相同文件处理时,则不进行数据处理 //判断客户端是否在线,在线再处理文件
Object obj = redisUtil.getObjectByKey("handleEvent:" + nDid); String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
if (Objects.isNull(obj)) { boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
List<WaveTimeDto> list = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class); if (mqttClient){
if (CollectionUtil.isNotEmpty(list)) { //有相同文件处理时,则不进行数据处理
WaveTimeDto dto = list.get(0); Object obj = redisUtil.getObjectByKey("handleEvent:" + nDid);
askFileInfo(nDid,1,dto.getFileName()); if (Objects.isNull(obj)) {
List<WaveTimeDto> list = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class);
if (CollectionUtil.isNotEmpty(list)) {
WaveTimeDto dto = list.get(0);
askFileInfo(nDid,1,dto.getFileName());
}
} else {
throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING);
} }
} else {
throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING);
} }
} }

View File

@@ -50,15 +50,15 @@ public class RemoveInfoUtils {
// 增加重试次数并保存 // 增加重试次数并保存
redisUtil.saveByKey(("retryEvent:" + nDid + fileName), retryTimes + 1); redisUtil.saveByKey(("retryEvent:" + nDid + fileName), retryTimes + 1);
// 重排文件列表 // 重排文件列表
updateFileList(nDid, fileName); //updateFileList(nDid, fileName);
} else { } else {
// 从列表中移除文件 // 从列表中移除文件
removeFileFromList(nDid, fileName); removeFileFromList(nDid, fileName);
// 检查是否还有其他文件需要处理 }
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class); // 检查是否还有其他文件需要处理
if (CollectionUtil.isNotEmpty(fileDto)) { List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class);
iCsWaveAnalysisService.channelWave(nDid); if (CollectionUtil.isNotEmpty(fileDto)) {
} iCsWaveAnalysisService.channelWave(nDid);
} }
} }
@@ -84,6 +84,7 @@ public class RemoveInfoUtils {
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class); List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class);
fileDto.removeIf(item -> item.getFileName().equals(fileName)); fileDto.removeIf(item -> item.getFileName().equals(fileName));
redisUtil.saveByKey("eventFile:" + nDid, fileDto); redisUtil.saveByKey("eventFile:" + nDid, fileDto);
redisUtil.delete(AppRedisKey.FILE_PART.concat(fileName));
} }
} }