From 34fc978543b9a882b8f1d1aba78e3cdb4e0ed366 Mon Sep 17 00:00:00 2001 From: xuyang <748613696@qq.com> Date: Sun, 8 Oct 2023 10:46:58 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E6=80=81=E6=B3=A2=E5=BD=A2=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E6=8E=A5=E6=94=B6=E8=A7=A3=E6=9E=90=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/handler/MqttMessageHandler.java | 32 +++-- .../runner/AccessApplicationRunner.java | 94 ++++++------- .../njcn/zlevent/pojo/po/CsEventFileLogs.java | 6 + .../java/com/njcn/zlevent/pojo/po/CsWave.java | 3 + .../com/njcn/zlevent/mapper/CsWaveMapper.java | 16 +++ .../njcn/zlevent/service/ICsWaveService.java | 23 ++++ .../impl/CsWaveAnalysisServiceImpl.java | 5 +- .../service/impl/CsWaveServiceImpl.java | 27 ++++ .../zlevent/service/impl/FileServiceImpl.java | 126 +++++++++++++----- 9 files changed, 236 insertions(+), 96 deletions(-) create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsWaveMapper.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 065f3c5..ff265d3 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -43,6 +43,7 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -52,6 +53,8 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDateTime; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; /** @@ -361,6 +364,7 @@ public class MqttMessageHandler { /** * 装置心跳 && 主动数据上送 + * fixme 这边由于接收文件数据时间跨度会很长,途中有其他请求进来会中断之前的程序,目前是记录中断的位置,等处理完成再继续请求接收文件 * @param topic * @param message * @param version @@ -388,7 +392,7 @@ public class MqttMessageHandler { reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_29.getCode())); reqAndResParam.setCode(200); //fixme 前置处理的时间应该是UTC时间,所以需要加8小时。 - String json = "{Time:\""+(System.currentTimeMillis()/1000+8*3600)+"\"}"; + String json = "{Time:"+(System.currentTimeMillis()/1000+8*3600)+"}"; net.sf.json.JSONObject jsonObject = net.sf.json.JSONObject.fromObject(json); reqAndResParam.setMsg(jsonObject); publisher.send("/Dev/DataRsp/"+version+"/"+nDid,gson.toJson(reqAndResParam),1,false); @@ -407,6 +411,18 @@ public class MqttMessageHandler { break; case 4866: AutoDataDto dataDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), AutoDataDto.class); + //mid大于0,则需要应答设备侧 + if (dataDto.getMid() > 0){ + ReqAndResDto.Res response = new ReqAndResDto.Res(); + response.setMid(dataDto.getMid()); + response.setDid(dataDto.getDid()); + response.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + response.setType(Integer.parseInt(TypeEnum.TYPE_15.getCode())); + response.setCode(200); + log.info("应答事件:" + new Gson().toJson(response)); + publisher.send("/Dev/DataRsp/"+version+"/"+nDid,new Gson().toJson(response),1,false); + } + //判断事件类型 switch (dataDto.getMsg().getDataAttr()) { //暂态事件、录波处理 case 0: @@ -433,17 +449,6 @@ public class MqttMessageHandler { default: break; } - //mid大于0,则需要应答设备侧 - if (dataDto.getMid() > 0){ - ReqAndResDto.Res response = new ReqAndResDto.Res(); - response.setMid(dataDto.getMid()); - response.setDid(dataDto.getDid()); - response.setPri(AccessEnum.FIRST_CHANNEL.getCode()); - response.setType(Integer.parseInt(TypeEnum.TYPE_15.getCode())); - response.setCode(200); - log.info("应答事件:" + new Gson().toJson(response)); - publisher.send("/Dev/DataRsp/"+version+"/"+nDid,new Gson().toJson(response),1,false); - } break; default: break; @@ -471,15 +476,16 @@ public class MqttMessageHandler { switch (fileDto.getType()){ case 4657: log.info("获取文件信息"); + log.info("文件信息响应:" + fileDto); appFileMessageTemplate.sendMember(appFileMessage); break; case 4658: log.info("获取文件流信息"); + redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName() + appFileMessage.getMid()), appFileMessage.getMid(),3600L); appFileStreamMessageTemplate.sendMember(appFileMessage); break; default: break; } } - } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java index 716417d..fc726b4 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java @@ -1,47 +1,47 @@ -//package com.njcn.access.runner; -// -//import com.njcn.access.service.ICsEquipmentDeliveryService; -//import com.njcn.access.service.ICsTopicService; -//import com.njcn.access.service.impl.CsDeviceServiceImpl; -//import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; -//import lombok.extern.slf4j.Slf4j; -//import org.springframework.boot.ApplicationArguments; -//import org.springframework.boot.ApplicationRunner; -//import org.springframework.stereotype.Component; -// -//import javax.annotation.Resource; -//import java.util.List; -//import java.util.Objects; -// -///** -// * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入 -// * -// * @author xuyang -// * @version 1.0.0 -// * @createTime 2023/8/28 13:57 -// */ -//@Component -//@Slf4j -//public class AccessApplicationRunner implements ApplicationRunner { -// -// @Resource -// private CsDeviceServiceImpl csDeviceService; -// -// @Resource -// private ICsTopicService csTopicService; -// -// @Resource -// private ICsEquipmentDeliveryService csEquipmentDeliveryService; -// -// @Override -// public void run(ApplicationArguments args){ -// List list = csEquipmentDeliveryService.getAll(); -// list.forEach(item->{ -// String version = csTopicService.getVersion(item.getNdid()); -// if (!Objects.isNull(version)){ -// csDeviceService.devAccess(item.getNdid(),version); -// } -// }); -// } -// -//} +package com.njcn.access.runner; + +import com.njcn.access.service.ICsEquipmentDeliveryService; +import com.njcn.access.service.ICsTopicService; +import com.njcn.access.service.impl.CsDeviceServiceImpl; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Objects; + +/** + * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入 + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/28 13:57 + */ +@Component +@Slf4j +public class AccessApplicationRunner implements ApplicationRunner { + + @Resource + private CsDeviceServiceImpl csDeviceService; + + @Resource + private ICsTopicService csTopicService; + + @Resource + private ICsEquipmentDeliveryService csEquipmentDeliveryService; + + @Override + public void run(ApplicationArguments args){ + List list = csEquipmentDeliveryService.getAll(); + list.forEach(item->{ + String version = csTopicService.getVersion(item.getNdid()); + if (!Objects.isNull(version)){ + csDeviceService.devAccess(item.getNdid(),version); + } + }); + } + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventFileLogs.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventFileLogs.java index 3ec04a9..0e01487 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventFileLogs.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventFileLogs.java @@ -68,4 +68,10 @@ public class CsEventFileLogs { */ private String location; + private Integer nowStep; + + private Integer allStep; + + private Integer isAll; + } diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsWave.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsWave.java index 6a2c398..5667c29 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsWave.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsWave.java @@ -4,6 +4,8 @@ import com.baomidou.mybatisplus.annotation.TableName; import com.njcn.db.bo.BaseEntity; import java.io.Serializable; import java.time.LocalDateTime; + +import lombok.Data; import lombok.Getter; import lombok.Setter; @@ -15,6 +17,7 @@ import lombok.Setter; * @author xuyang * @since 2023-09-27 */ +@Data @TableName("cs_wave") public class CsWave { diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsWaveMapper.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsWaveMapper.java new file mode 100644 index 0000000..fecbf54 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsWaveMapper.java @@ -0,0 +1,16 @@ +package com.njcn.zlevent.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.zlevent.pojo.po.CsWave; + +/** + *

+ * 治理波形文件记录表 Mapper 接口 + *

+ * + * @author xuyang + * @since 2023-09-27 + */ +public interface CsWaveMapper extends BaseMapper { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java new file mode 100644 index 0000000..60023eb --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java @@ -0,0 +1,23 @@ +package com.njcn.zlevent.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.zlevent.pojo.po.CsWave; + +/** + *

+ * 治理波形文件记录表 服务类 + *

+ * + * @author xuyang + * @since 2023-09-27 + */ +public interface ICsWaveService extends IService { + + /** + * 根据文件名称模糊匹配文件个数 + * @param fileName 文件名称 + * @return + */ + int findCountByName(String fileName); + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java index 46ec786..07699e9 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java @@ -22,6 +22,7 @@ import com.njcn.zlevent.pojo.constant.ZlConstant; import com.njcn.zlevent.pojo.dto.WaveTimeDto; import com.njcn.zlevent.service.ICsWaveAnalysisService; import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; import net.sf.json.JSONObject; import org.springframework.stereotype.Service; @@ -36,6 +37,7 @@ import java.util.stream.Collectors; * @version 1.0.0 * @createTime 2023/9/5 14:52 */ +@Slf4j @Service @AllArgsConstructor public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { @@ -108,6 +110,7 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { String json = "{Name:\""+fileName+"\"}"; JSONObject jsonObject = JSONObject.fromObject(json); reqAndResParam.setMsg(jsonObject); + log.info("请求文件信息报文:" + new Gson().toJson(reqAndResParam)); publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); } @@ -138,7 +141,7 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { waveTimeDto.setDeviceId(deviceId); waveTimeDto.setLineId(lineId); waveTimeDto.setLocation(location); - redisUtil.saveByKeyWithExpire(AppRedisKey.TIME+fileName,waveTimeDto,60L); + redisUtil.saveByKeyWithExpire(AppRedisKey.TIME+fileName,waveTimeDto,600L); } /** diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java new file mode 100644 index 0000000..cf54967 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java @@ -0,0 +1,27 @@ +package com.njcn.zlevent.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.zlevent.mapper.CsWaveMapper; +import com.njcn.zlevent.pojo.po.CsWave; +import com.njcn.zlevent.service.ICsWaveService; +import org.springframework.stereotype.Service; + +/** + *

+ * 治理波形文件记录表 服务实现类 + *

+ * + * @author xuyang + * @since 2023-09-27 + */ +@Service +public class CsWaveServiceImpl extends ServiceImpl implements ICsWaveService { + + @Override + public int findCountByName(String fileName) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.like(CsWave::getRcdName,fileName); + return this.baseMapper.selectCount(lambdaQueryWrapper); + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java index 2e5e302..b731281 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -24,8 +24,10 @@ import com.njcn.zlevent.pojo.dto.FileInfoDto; import com.njcn.zlevent.pojo.dto.FileStreamDto; import com.njcn.zlevent.pojo.dto.WaveTimeDto; import com.njcn.zlevent.pojo.po.CsEventFileLogs; +import com.njcn.zlevent.pojo.po.CsWave; import com.njcn.zlevent.service.ICsEventFileLogsService; import com.njcn.zlevent.service.ICsEventService; +import com.njcn.zlevent.service.ICsWaveService; import com.njcn.zlevent.service.IFileService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -38,6 +40,7 @@ import java.io.InputStream; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.concurrent.TimeUnit; /** * 类的介绍: @@ -65,6 +68,8 @@ public class FileServiceImpl implements IFileService { private final WavePicFeignClient wavePicFeignClient; + private final ICsWaveService csWaveService; + @Override public void analysisFileInfo(AppFileMessage appFileMessage) { if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){ @@ -74,18 +79,6 @@ public class FileServiceImpl implements IFileService { String fileName = appFileMessage.getMsg().getFileInfo().getName(); //缓存文件信息用于文件流拼接 FileInfoDto fileInfoDto = new FileInfoDto(); - //文件流请求 判断文件大小是否需要分片请求,单次文件大小为50k - if (fileSize <= range){ - askFileStream(appFileMessage.getId(),mid,fileName,0,fileSize); - fileInfoDto.setNumber(1); - } else { - int total = (int)Math.ceil(fileSize*1.0/range) ; - for (int i = 0; i < total; i++) { - askFileStream(appFileMessage.getId(),mid,fileName,i*range,range); - mid++; - } - fileInfoDto.setNumber(mid-1); - } //获取波形文件起始结束时间 Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.TIME+fileName); if (Objects.isNull(fileInfo)){ @@ -102,7 +95,18 @@ public class FileServiceImpl implements IFileService { fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck()); fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType()); fileInfoDto.setLocation(waveTimeDto.getLocation()); - redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto, 3600L); + //文件流请求 判断文件大小是否需要分片请求,单次文件大小为50k + if (fileSize <= range){ + askFileStream(appFileMessage.getId(),mid,fileName,0,fileSize); + fileInfoDto.setNumber(1); + redisUtil.delete(AppRedisKey.FILE_PART.concat(fileInfoDto.getName() + mid)); + redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto); + } else { + int total = (int)Math.ceil(fileSize*1.0/range); + fileInfoDto.setNumber(total); + redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto); + askFileStream(appFileMessage.getId(), 1, fileName, 0, range); + } redisUtil.delete(AppRedisKey.TIME+fileName); } else { throw new BusinessException(AccessResponseEnum.RESPONSE_ERROR); @@ -111,6 +115,11 @@ public class FileServiceImpl implements IFileService { @Override public void analysisFileStream(AppFileMessage appFileMessage) { + int range = 51200; + //波形文件上传成功后,将文件信息存储一下,方便后期查看 + CsWave csWave = new CsWave(); + csWave.setNdid(appFileMessage.getId()); + csWave.setCreateTime(LocalDateTime.now()); //日志记录 CsEventFileLogs csEventLogs = new CsEventFileLogs(); csEventLogs.setNdid(appFileMessage.getId()); @@ -138,24 +147,36 @@ public class FileServiceImpl implements IFileService { if(fileInfoDto.getNumber() == 1) { //直接解析文件 filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId()); - redisUtil.delete(fileName); + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!"); + csEventLogs.setNowStep(1); + csEventLogs.setAllStep(1); + csEventLogs.setIsAll(1); + //记录文件信息 + createCsWave(csWave,fileInfoDto,fmt,fileName); //波形文件关联事件 filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,""); - List eventList = correlateEvents(fileInfoDto,filePath); + List eventList = correlateEvents(fileInfoDto,filePath,fileName); //波形文件解析成图片 if (CollectionUtil.isNotEmpty(eventList)){ eventList.forEach(wavePicFeignClient::getWavePics); } - csEventLogs.setStatus(1); - csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!"); - //todo 记录文件信息 + redisUtil.delete(fileName); + redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName())); } else { //缓存文件 map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData()); fileStreamDto.setMap(map); - redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L); + redisUtil.saveByKey(fileName, fileStreamDto); csEventLogs.setStatus(1); csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!"); + csEventLogs.setNowStep(appFileMessage.getMid()); + csEventLogs.setAllStep(fileInfoDto.getNumber()); + csEventLogs.setIsAll(0); + log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", fileInfoDto.getNumber(), appFileMessage.getMid()); + //收到文件后,继续请求下一帧报文 + askFileStream(appFileMessage.getId(), appFileMessage.getMid()+1, fileName, appFileMessage.getMid() * range, range); + redisUtil.delete(AppRedisKey.FILE_PART.concat(fileInfoDto.getName() + appFileMessage.getMid())); } } else { //分帧传递数据,需要校验收到的文件个数 @@ -165,25 +186,37 @@ public class FileServiceImpl implements IFileService { if (l1.size() == fileInfoDto.getNumber()){ //解析文件 filePath = fileStream(fileInfoDto.getNumber(),l1,null,fileName,appFileMessage.getId()); - redisUtil.delete(fileName); + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件"+l1.size()+"帧,这是第"+l1.size()+"帧,全部收到,解析成功!"); + csEventLogs.setNowStep(l1.size()); + csEventLogs.setAllStep(l1.size()); + csEventLogs.setIsAll(1); + log.info("当前文件 {} 帧,这是第 {} 帧报文,全部收到,解析成功!", l1.size(), l1.size()); + //记录文件信息 + createCsWave(csWave,fileInfoDto,fmt,fileName); //波形文件关联事件 filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,""); - List eventList = correlateEvents(fileInfoDto,filePath); + List eventList = correlateEvents(fileInfoDto,filePath,fileName); //波形文件解析成图片 if (CollectionUtil.isNotEmpty(eventList)){ eventList.forEach(wavePicFeignClient::getWavePics); } - csEventLogs.setStatus(1); - csEventLogs.setRemark("当前文件"+l1.size()+"帧,这是第"+l1.size()+"帧,全部收到,解析成功!"); - //todo 记录文件信息 - + redisUtil.delete(fileName); + redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName())); } else { //缓存 fileStreamDto = new FileStreamDto(); fileStreamDto.setMap(l1); - redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L); + redisUtil.saveByKey(fileName, fileStreamDto); csEventLogs.setStatus(1); csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!"); + csEventLogs.setNowStep(appFileMessage.getMid()); + csEventLogs.setAllStep(fileInfoDto.getNumber()); + csEventLogs.setIsAll(0); + log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", fileInfoDto.getNumber(), appFileMessage.getMid()); + //收到文件后,继续请求下一帧报文 + askFileStream(appFileMessage.getId(), appFileMessage.getMid()+1, fileName, appFileMessage.getMid() * range, range); + redisUtil.delete(AppRedisKey.FILE_PART.concat(fileInfoDto.getName() + appFileMessage.getMid())); } } csEventLogs.setCompleteTime(LocalDateTime.now()); @@ -302,14 +335,37 @@ public class FileServiceImpl implements IFileService { /** * 波形文件关联事件 */ - public List correlateEvents(FileInfoDto fileInfoDto, String path) { - CsEventParam csEventParam = new CsEventParam(); - csEventParam.setLineId(fileInfoDto.getLineId()); - csEventParam.setDeviceId(fileInfoDto.getDeviceId()); - csEventParam.setStartTime(fileInfoDto.getStartTime()); - csEventParam.setEndTime(fileInfoDto.getEndTime()); - csEventParam.setPath(path); - csEventParam.setLocation(fileInfoDto.getLocation()); - return csEventService.updateCsEvent(csEventParam); + public List correlateEvents(FileInfoDto fileInfoDto, String path, String fileName) { + List list = new ArrayList<>(); + String[] parts = fileName.split(StrUtil.SLASH); + fileName = parts[parts.length - 1].split("\\.")[0]; + int fileCounts = csWaveService.findCountByName(fileName); + if (fileCounts >= 2){ + CsEventParam csEventParam = new CsEventParam(); + csEventParam.setLineId(fileInfoDto.getLineId()); + csEventParam.setDeviceId(fileInfoDto.getDeviceId()); + csEventParam.setStartTime(fileInfoDto.getStartTime()); + csEventParam.setEndTime(fileInfoDto.getEndTime()); + csEventParam.setPath(path); + csEventParam.setLocation(fileInfoDto.getLocation()); + list = csEventService.updateCsEvent(csEventParam); + } + return list; } + + /** + * 生成波形记录 + */ + public void createCsWave(CsWave csWave, FileInfoDto fileInfoDto, DateTimeFormatter fmt, String fileName) { + csWave.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt)); + csWave.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt)); + csWave.setRcdName(fileName); + csWave.setLocation(fileInfoDto.getLocation()); + csWave.setFileSize(fileInfoDto.getFileSize()); + csWave.setCheckType(fileInfoDto.getFileChkType()); + csWave.setCheckNumber(fileInfoDto.getFileCheck()); + csWaveService.save(csWave); + } + + }