diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 1dd60e4..437d40a 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -494,7 +494,7 @@ public class MqttMessageHandler { switch (res.getType()){ case 4865: //设置心跳时间,超时改为掉线 - redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L); + redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),300L); //有心跳,则将装置改成在线 //csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode()); //处理心跳 diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index c100461..057be14 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -77,7 +77,6 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene private UserFeignClient userFeignClient; @Resource private RedisUtil redisUtil; - private final Object lock = new Object(); public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { @@ -151,6 +150,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene synchronized (lock) { NoticeUserDto dto = sendOffLine(nDid); sendEventToUser(dto); + addLogs(dto); String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); ScheduledFuture future = scheduler.scheduleAtFixedRate(() -> { System.out.println(nDid + "执行重连定时任务..."); @@ -172,6 +172,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene if (MAX_WARNING_TIMES == 30) { NoticeUserDto dto2 = sendConnectFail(nDid); sendEventToUser(dto2); + addLogs(dto2); } } } else { @@ -180,6 +181,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene if (MAX_WARNING_TIMES == 30) { NoticeUserDto dto2 = sendConnectFail(nDid); sendEventToUser(dto2); + addLogs(dto2); } logDto.setResult(0); } @@ -191,7 +193,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene //掉线通知 private NoticeUserDto sendOffLine(String nDid) { NoticeUserDto dto = new NoticeUserDto(); - NoticeUserDto.Payload payload = new NoticeUserDto.Payload(); +// NoticeUserDto.Payload payload = new NoticeUserDto.Payload(); dto.setTitle("设备离线"); CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData(); DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(po.getId()).getData(); @@ -201,16 +203,16 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "于" + dateStr + "离线"); dto.setContent(content); dto.setPushClientId(getEventUser(po.getId(),true)); - payload.setType(3); - payload.setPath("/pages/message/message?type="+payload.getType()); - dto.setPayload(payload); +// payload.setType(3); +// payload.setPath("/pages/message/message?type="+payload.getType()); +// dto.setPayload(payload); return dto; } //重连失败通知 private NoticeUserDto sendConnectFail(String nDid) { NoticeUserDto dto = new NoticeUserDto(); - NoticeUserDto.Payload payload = new NoticeUserDto.Payload(); +// NoticeUserDto.Payload payload = new NoticeUserDto.Payload(); dto.setTitle("设备接入失败"); CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData(); DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(po.getId()).getData(); @@ -219,14 +221,20 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene String dateStr = localDateTime.format(fmt); String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "于" + dateStr + "多次接入失败"); dto.setContent(content); - dto.setPushClientId(getEventUser(po.getId(),true)); - payload.setType(3); - payload.setPath("/pages/message/message?type="+payload.getType()); - dto.setPayload(payload); - + dto.setPushClientId(getEventUser(po.getId(),false)); +// payload.setType(3); +// payload.setPath("/pages/message/message?type="+payload.getType()); +// dto.setPayload(payload); return dto; } + //日志记录 + private void addLogs(NoticeUserDto noticeUserDto) { + DeviceLogDTO dto = new DeviceLogDTO(); + dto.setUserName(noticeUserDto.getTitle()); + dto.setOperate(noticeUserDto.getContent()); + csLogsFeignClient.addUserLog(dto); + } /** * 获取所有需要推送的用户id diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java index 1fd47ed..78481f4 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java @@ -9,10 +9,12 @@ import com.njcn.access.pojo.dto.ControlDto; import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.file.FileRedisDto; import com.njcn.access.service.AskDeviceDataService; +import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.enums.AlgorithmResponseEnum; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; +import com.njcn.zlevent.pojo.dto.NoticeUserDto; import lombok.RequiredArgsConstructor; import net.sf.json.JSONObject; import org.slf4j.Logger; @@ -20,6 +22,11 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * @author xy @@ -80,35 +87,33 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); } - @Override public boolean downloadFile(String nDid, String name, Integer size, String fileCheck) { boolean result = true; - redisUtil.saveByKeyWithExpire("fileDowning:"+nDid,"fileDowning",300L); - redisUtil.saveByKeyWithExpire("fileCheck"+name,fileCheck,300L); - int length = size/51200 + 1; try { - for (int i = 0; i < length; i++) { - Object object = getDeviceMid(nDid); - if (!Objects.isNull(object)) { - mid = (Integer) object; - } - ReqAndResDto.Req reqAndResParam = getPojo(mid,name,i); - publisher.send("/Pfm/DevFileCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false); - //判断是否重发 - sendNextStep(name,nDid,mid,i); - FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + name + mid); - //重发之后判断继续循环还是跳出循环 - if (!Objects.isNull(fileRedisDto) && !Objects.equals(fileRedisDto.getCode(),200)) { - result = false; - break; - } - mid = mid + 1; - if (mid > 10000) { - mid = 1; - } - redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); + redisUtil.saveByKeyWithExpire("fileDowning:"+nDid,"fileDowning",300L); + redisUtil.saveByKeyWithExpire("fileCheck"+name,fileCheck,300L); + Object object = getDeviceMid(nDid); + if (!Objects.isNull(object)) { + mid = (Integer) object; } + ReqAndResDto.Req reqAndResParam = getAllPojo(mid,name); + publisher.send("/Pfm/DevFileCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false); + //这里使用简单的轮询,但建议考虑更高效的机制 + for (int i = 0; i < 120; i++) { + Thread.sleep(2000); + Object object2 = redisUtil.getObjectByKey("downloadFilePath:"+name); + if (!Objects.isNull(object2)) { + break; + } else { + result = false; + } + } + mid = mid + 1; + if (mid > 10000) { + mid = 1; + } + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); } catch (Exception e) { redisUtil.delete("fileDowning:"+nDid); redisUtil.delete("fileCheck"+name); @@ -196,6 +201,25 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { return csTopicFeignClient.find(nDid).getData(); } + + /** + * 全文件下载请求报文 + */ + public ReqAndResDto.Req getAllPojo(Integer mid, String fileName) { + String json; + ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); + reqAndResParam.setMid(mid); + reqAndResParam.setDid(0); + reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_9.getCode())); + reqAndResParam.setExpire(-1); + json = "{Name:\""+fileName+"\",TransferMode:"+(-1)+",Offset:"+(-1)+",Len:"+range+"}"; + JSONObject jsonObject = JSONObject.fromObject(json); + reqAndResParam.setMsg(jsonObject); + return reqAndResParam; + } + + /** * 文件下载请求报文 */ diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java index e1c5c23..b38905b 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java @@ -4,12 +4,10 @@ import cn.hutool.core.collection.CollectionUtil; import com.alibaba.fastjson.JSON; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.tocrhz.mqtt.publisher.MqttPublisher; -import com.njcn.access.api.CsTopicFeignClient; import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.TypeEnum; import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.file.FileRedisDto; -import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.enums.AlgorithmResponseEnum; @@ -49,14 +47,8 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene @Resource private RedisUtil redisUtil; @Resource - private CsTopicFeignClient csTopicFeignClient; - @Resource private MqttPublisher publisher; @Resource - private ChannelObjectUtil channelObjectUtil; - @Resource - private ICsWaveAnalysisService iCsWaveAnalysisService; - @Resource private RemoveInfoUtils removeInfoUtils; @Resource private ICsWaveAnalysisService csWaveAnalysisService; @@ -107,7 +99,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene downloadFile(missingList,dto.getNDid(),fileName); } } - //项目重启之后,经过10s开始处理历史录波文件 + //项目重启或者接入,经过120s开始处理历史录波文件 else if (expiredKey.startsWith("startFile")) { List list = equipmentFeignClient.getAll().getData(); if (CollectionUtil.isNotEmpty(list)) { @@ -118,9 +110,49 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene }); } } + //手动文件下载 + else if (expiredKey.startsWith(AppRedisKey.FILE_DOWN_TIME)) { + List missingList = new ArrayList<>(); + String fileName = expiredKey.split(AppRedisKey.FILE_DOWN_TIME)[1]; + Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName)); + FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class); + int start = 1; + int end = dto.getTotal(); + IntStream.rangeClosed(start, end) + .filter(i -> !dto.getList().contains(i)) + .forEach(missingList::add); + if (CollectionUtil.isNotEmpty(missingList)) { + webDownloadFile(missingList,dto.getNDid(),fileName); + } + } } - //请求缺失的数据 + //界面请求缺失的数据 + public void webDownloadFile( List missingList, String nDid, String name) { + for (Integer missingNumber : missingList) { + int i = missingNumber - 1; + Object object = getDeviceMid(nDid); + if (!Objects.isNull(object)) { + mid = (Integer) object; + } + ReqAndResDto.Req reqAndResParam = getPojo(mid,name,i); + publisher.send("/Pfm/DevFileCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false); + //判断是否重发 + sendNextStep(name,nDid,mid,i); + FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + name + mid); + //重发之后判断继续循环还是跳出循环 + if (!Objects.isNull(fileRedisDto) && !Objects.equals(fileRedisDto.getCode(),200)) { + break; + } + mid = mid + 1; + if (mid > 10000) { + mid = 1; + } + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); + } + } + + //录波文件请求缺失的数据 public void downloadFile( List missingList, String nDid, String name) { for (Integer missingNumber : missingList) { int i = missingNumber - 1; diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java index 27d7538..6a113f1 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -74,34 +74,37 @@ public class FileServiceImpl implements IFileService { String fileName = appFileMessage.getMsg().getFileInfo().getName(); //缓存文件信息用于文件流拼接 FileInfoDto fileInfoDto = new FileInfoDto(); - WaveTimeDto waveTimeDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class).get(0); - fileInfoDto.setStartTime(waveTimeDto.getStartTime()); - fileInfoDto.setEndTime(waveTimeDto.getEndTime()); - fileInfoDto.setDeviceId(waveTimeDto.getDeviceId()); - fileInfoDto.setLineId(waveTimeDto.getLineId()); - fileInfoDto.setName(appFileMessage.getMsg().getFileInfo().getName()); - fileInfoDto.setFileTime(appFileMessage.getMsg().getFileInfo().getFileTime()); - fileInfoDto.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize()); - fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck()); - fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType()); - fileInfoDto.setLocation(waveTimeDto.getLocation()); - //存储波形文件 - CsWave csWave = new CsWave(); - csWave.setNdid(appFileMessage.getId()); - csWave.setCreateTime(LocalDateTime.now()); - csWave.setStartTime(LocalDateTime.parse(waveTimeDto.getStartTime(), fmt)); - csWave.setEndTime(LocalDateTime.parse(waveTimeDto.getEndTime(), fmt)); - csWave.setRcdName(fileName); - csWave.setLocation(waveTimeDto.getLocation()); - csWave.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize()); - csWave.setCheckType(appFileMessage.getMsg().getFileInfo().getFileChkType()); - csWave.setCheckNumber(appFileMessage.getMsg().getFileInfo().getFileCheck()); - csWave.setStatus(0); - csWaveService.save(csWave); - //请求当前文件的数据 - askFileStream(appFileMessage.getId(),mid,fileName,-1,range); - redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto); - redisUtil.delete(AppRedisKey.TIME+fileName); + List list = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class); + if (CollectionUtil.isNotEmpty(list)) { + WaveTimeDto waveTimeDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class).get(0); + fileInfoDto.setStartTime(waveTimeDto.getStartTime()); + fileInfoDto.setEndTime(waveTimeDto.getEndTime()); + fileInfoDto.setDeviceId(waveTimeDto.getDeviceId()); + fileInfoDto.setLineId(waveTimeDto.getLineId()); + fileInfoDto.setName(appFileMessage.getMsg().getFileInfo().getName()); + fileInfoDto.setFileTime(appFileMessage.getMsg().getFileInfo().getFileTime()); + fileInfoDto.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize()); + fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck()); + fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType()); + fileInfoDto.setLocation(waveTimeDto.getLocation()); + //存储波形文件 + CsWave csWave = new CsWave(); + csWave.setNdid(appFileMessage.getId()); + csWave.setCreateTime(LocalDateTime.now()); + csWave.setStartTime(LocalDateTime.parse(waveTimeDto.getStartTime(), fmt)); + csWave.setEndTime(LocalDateTime.parse(waveTimeDto.getEndTime(), fmt)); + csWave.setRcdName(fileName); + csWave.setLocation(waveTimeDto.getLocation()); + csWave.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize()); + csWave.setCheckType(appFileMessage.getMsg().getFileInfo().getFileChkType()); + csWave.setCheckNumber(appFileMessage.getMsg().getFileInfo().getFileCheck()); + csWave.setStatus(0); + csWaveService.save(csWave); + //请求当前文件的数据 + askFileStream(appFileMessage.getId(),mid,fileName,-1,range); + redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto); + redisUtil.delete(AppRedisKey.TIME+fileName); + } } else { throw new BusinessException(AccessResponseEnum.RESPONSE_ERROR); } @@ -136,27 +139,77 @@ public class FileServiceImpl implements IFileService { FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class); if (Objects.isNull(fileInfoDto)) { String fileCheck = redisUtil.getObjectByKey("fileCheck"+fileName).toString(); - if (appFileMessage.getMsg().getFrameTotal() == 1){ + if (appFileMessage.getMsg().getFrameTotal() == 1) { //解析文件入库 filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId(),fileCheck,"download"); + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!"); + csEventLogs.setNowStep(1); + csEventLogs.setAllStep(1); + csEventLogs.setIsAll(1); + redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); + redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); + redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); } else { - //最后一帧 - if (Objects.equals(appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr())) { - Map filePartMap = readFile(lsFileName); - filePartMap.put(appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData()); - //解析文件 - filePath = fileStream(appFileMessage.getMsg().getFrameTotal(), filePartMap, null, fileName, appFileMessage.getId(),fileCheck,"download"); - //删除临时文件 - File file = new File(lsFileName); - if (file.exists()) { - file.delete(); - } - } - //中间帧 - else { - Map filePartMap = readFile(lsFileName); - if (Objects.isNull(filePartMap.get(appFileMessage.getMsg().getFrameCurr()))) { - appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData()); + //收到数据就刷新缓存值 + redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_DOWN_TIME.concat(appFileMessage.getMsg().getName()), null, 10L); + Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName)); + if (Objects.isNull(object1)){ + fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal()); + fileStreamDto.setNDid(appFileMessage.getId()); + fileStreamDto.setFrameLen(appFileMessage.getMsg().getFrameLen()); + list.add(appFileMessage.getMsg().getFrameCurr()); + fileStreamDto.setList(list); + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件"+appFileMessage.getMsg().getFrameTotal()+"帧,这是第"+appFileMessage.getMsg().getFrameCurr()+"帧,记录成功!"); + csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); + csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal()); + csEventLogs.setIsAll(0); + redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto); + //将数据写入临时文件 + appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData()); + log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr()); + } else { + FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class); + //防止出现录入重复数据,做个判断 + if (!dto.getList().contains(appFileMessage.getMsg().getFrameCurr())) { + dto.getList().add(appFileMessage.getMsg().getFrameCurr()); + if (Objects.equals(dto.getTotal(), dto.getList().size())) { + Map filePartMap = readFile(lsFileName); + filePartMap.put(appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData()); + //解析文件入库 + filePath = fileStream(dto.getTotal(), filePartMap, null, fileName, appFileMessage.getId(),fileCheck,"download"); + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,全部收到,解析成功!"); + csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); + csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal()); + csEventLogs.setIsAll(1); + log.info("当前文件 {} 帧,这是第 {} 帧报文,全部收到,解析成功!", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr()); + //删除临时文件 + File file = new File(lsFileName); + if (file.exists()) { + file.delete(); + } + redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); + redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); + redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); + } else { + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!"); + csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); + csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal()); + csEventLogs.setIsAll(0); + redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), dto); + //将数据写入临时文件 + appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData()); + log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr()); + } + } else { + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件为重复帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,不做记录!"); + csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); + csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal()); + csEventLogs.setIsAll(0); } } }