1.设备注册接入优化

2.波形文件接收、解析功能调整
This commit is contained in:
2023-10-13 10:49:43 +08:00
parent 340e7dc75f
commit 6e12027e69
27 changed files with 894 additions and 662 deletions

View File

@@ -6,9 +6,13 @@ import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.param.DataArrayParam;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.cswarn.api.CsEquipmentAlarmFeignClient;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influx.utils.InfluxDbUtils;
import com.njcn.mq.message.AppAutoDataMessage;
@@ -56,6 +60,8 @@ public class StatServiceImpl implements IStatService {
private final RedisUtil redisUtil;
private final EquipmentFeignClient equipmentFeignClient;
@Override
@Transactional(rollbackFor = Exception.class)
public void analysis(AppAutoDataMessage appAutoDataMessage) {
@@ -86,6 +92,8 @@ public class StatServiceImpl implements IStatService {
if(Objects.isNull(object2)) {
saveData();
}
//获取当前设备信息
CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appAutoDataMessage.getId()).getData();
if (CollectionUtil.isNotEmpty(list)){
List<String> recordList = new ArrayList<>();
for (AppAutoDataMessage.DataArray item : list) {
@@ -116,7 +124,7 @@ public class StatServiceImpl implements IStatService {
} else {
dataArrayList = objectToList(object);
}
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod());
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess());
recordList.addAll(result);
}
if (CollectionUtil.isNotEmpty(recordList)){
@@ -183,7 +191,7 @@ public class StatServiceImpl implements IStatService {
/**
* influxDB数据组装
*/
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod) {
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process) {
List<String> records = new ArrayList<String>();
//解码
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
@@ -206,6 +214,7 @@ public class StatServiceImpl implements IStatService {
tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod);
tags.put(InfluxDBTableConstant.CL_DID,clDid.toString());
tags.put(InfluxDBTableConstant.PROCESS,process.toString());
Map<String,Object> fields = new HashMap<>();
fields.put(dataArrayList.get(i).getName(),floats.get(i));
fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());

View File

@@ -1,29 +1,33 @@
package com.njcn;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.influx.utils.InfluxDbUtils;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.StatBootApplication;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.junit.Test;
import org.springframework.web.bind.annotation.ResponseBody;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
/**
* Unit test for simple App.
*/
@RunWith(SpringRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = StatBootApplication.class)
public class AppTest
{
@@ -33,6 +37,7 @@ public class AppTest
@Resource
private InfluxDbUtils influxDbUtils;
/**
* Rigorous Test :-)
*/
@@ -42,4 +47,39 @@ public class AppTest
assertTrue( true );
}
@Test
public void addRedis() {
// Demo: build two InfluxDB points with different LineID tags, serialize each
// into its own BatchPoints, and batch-insert both via line protocol.
Map<String, String> tags1 = new HashMap<>();
tags1.put("LineID", "4");
tags1.put("Phasic_Type", "A");
Map<String, Object> fields1 = new HashMap<>();
fields1.put("RMS", 4.1111);
fields1.put("RMS_AB", 4.1111);
fields1.put("RMS_BC", 4.1111);
fields1.put("RMS_CA", 4.1111);
Map<String, String> tags2 = new HashMap<>();
tags2.put("LineID", "5");
tags2.put("Phasic_Type", "A");
Map<String, Object> fields2 = new HashMap<>();
fields2.put("RMS", 5.1111);
fields2.put("RMS_AB", 5.1111);
fields2.put("RMS_BC", 5.1111);
// One record per point. NOTE: do not use System.currentTimeMillis() in
// production -- at high volume duplicate timestamps occur and data is lost;
// use the data's own timestamp. Demo-only here.
Point point1 = influxDbUtils.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags1, fields1);
Point point2 = influxDbUtils.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags2, fields2);
// BatchPoints batchPoints1 = BatchPoints.database("Data_v").tag("LineID", "4").tag("Phasic_Type","A").retentionPolicy("").consistency(ConsistencyLevel.ALL).precision(TimeUnit.MILLISECONDS).build();
BatchPoints batchPoints1 = BatchPoints.database("test").tag("LineID", "4").tag("Phasic_Type", "A").retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints1.point(point1);
BatchPoints batchPoints2 = BatchPoints.database("test").tag("LineID", "5").tag("Phasic_Type", "A").retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
// Add the second record to its own batch.
batchPoints2.point(point2);
// Serialize the separate BatchPoints and write them in one call for speed.
List<String> records = new ArrayList<String>();
records.add(batchPoints1.lineProtocol());
records.add(batchPoints2.lineProtocol());
// Batch-insert both records into the "test" database.
influxDbUtils.batchInsert("test", "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
}

View File

@@ -23,9 +23,6 @@ public class FileInfoDto {
private String fileChkType;
@ApiModelProperty("报文数量")
private Integer number;
private String startTime;
private String endTime;

View File

@@ -3,7 +3,8 @@ package com.njcn.zlevent.pojo.dto;
import lombok.Data;
import java.io.Serializable;
import java.util.Map;
import java.util.HashSet;
import java.util.List;
/**
* 类的介绍:
@@ -15,6 +16,12 @@ import java.util.Map;
@Data
public class FileStreamDto implements Serializable {
// Frame number -> base64-encoded frame payload. Presumably keyed by the
// message id / frameCurr of each received frame -- TODO confirm with callers.
private Map<Integer,String> map;
// Total number of frames the file stream is split into.
private Integer total;
// Device id (nDid) the file stream belongs to.
private String nDid;
// Length in bytes of a single frame; used to compute resend offsets.
private Integer frameLen;
// Frame numbers received so far, used to detect missing frames.
private List<Integer> list;
}

View File

@@ -73,5 +73,10 @@ public class CsWave {
*/
private LocalDateTime createTime;
/**
* 文件获取状态(0:获取失败 1:获取成功)
*/
private Integer status;
}

View File

@@ -1,13 +1,23 @@
package com.njcn.zlevent.listener;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.pojo.dto.EpdDTO;
import com.njcn.zlevent.pojo.dto.FileStreamDto;
import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.connection.Message;
@@ -16,9 +26,11 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
/**
* @author hongawen
@@ -35,6 +47,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
@Resource
private RedisUtil redisUtil;
@Resource
private CsTopicFeignClient csTopicFeignClient;
@Resource
private MqttPublisher publisher;
/**
 * Registers this listener with the given Redis message listener container so
 * it receives key-expiration events.
 *
 * @param listenerContainer container that dispatches expiration events
 */
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@@ -63,5 +81,45 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
});
redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,3600L);
}
//文件流规定时间未收全,请求缺失数据,需要一帧一帧询问
else if (expiredKey.startsWith(AppRedisKey.FILE_PART_TIME)) {
List<Integer> missingList = new ArrayList<>();
String fileName = expiredKey.split(AppRedisKey.FILE_PART_TIME)[1];
Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName));
FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class);
int start = 1;
int end = dto.getTotal();
IntStream.rangeClosed(start, end)
.filter(i -> !dto.getList().contains(i))
.forEach(missingNumber -> {
log.info("缺失的数字:{}",missingNumber);
missingList.add(missingNumber);
});
redisUtil.saveByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName), missingList);
Integer offset = (missingList.get(0) - 1) * dto.getFrameLen();
askMissingFileStream(dto.getNDid(),missingList.get(0),fileName,offset,dto.getFrameLen());
}
}
/**
 * Ask the device to resend one missing frame of a file stream over MQTT.
 *
 * @param nDid     device id, used to resolve the topic version and address the device
 * @param mid      message id (the missing frame number)
 * @param fileName name of the file being transferred
 * @param offset   byte offset of the missing frame within the file
 * @param len      frame length in bytes
 */
public void askMissingFileStream(String nDid, Integer mid, String fileName, Integer offset, Integer len) {
    String version = csTopicFeignClient.find(nDid).getData();
    ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
    reqAndResParam.setMid(mid);
    reqAndResParam.setDid(0);
    reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
    reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_9.getCode()));
    reqAndResParam.setExpire(-1);
    // Device command payload; keys are intentionally unquoted (net.sf.json accepts this form).
    String json = "{Name:\""+fileName+"\",Offset:"+offset+",Len:"+len+"}";
    JSONObject jsonObject = JSONObject.fromObject(json);
    reqAndResParam.setMsg(jsonObject);
    // Serialize once and reuse: the original serialized twice (publish + log).
    String payload = new Gson().toJson(reqAndResParam);
    publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid, payload, 1, false);
    // Parameterized logging instead of string concatenation.
    log.info("请求文件流报文:{}", payload);
}
}

View File

@@ -20,4 +20,10 @@ public interface ICsWaveService extends IService<CsWave> {
*/
int findCountByName(String fileName);
/**
* 修改文件状态
* @param fileName
*/
void updateCsWave(String fileName);
}

View File

@@ -1,6 +1,7 @@
package com.njcn.zlevent.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.zlevent.mapper.CsWaveMapper;
import com.njcn.zlevent.pojo.po.CsWave;
@@ -21,7 +22,14 @@ public class CsWaveServiceImpl extends ServiceImpl<CsWaveMapper, CsWave> impleme
/**
 * Count waveform records whose name matches {@code fileName} (LIKE match) and
 * whose file has been fully received (status == 1).
 *
 * @param fileName file-name fragment to match against rcd_name
 * @return number of matching, successfully-received wave records
 */
@Override
public int findCountByName(String fileName) {
    LambdaQueryWrapper<CsWave> lambdaQueryWrapper = new LambdaQueryWrapper<>();
    // Single condition chain: the duplicated like(...) line (stale pre-change
    // statement left alongside the new one) applied the LIKE predicate twice.
    lambdaQueryWrapper.like(CsWave::getRcdName, fileName).eq(CsWave::getStatus, 1);
    return this.baseMapper.selectCount(lambdaQueryWrapper);
}
/**
 * Mark the waveform record with the given file name as successfully received
 * (status = 1).
 *
 * @param fileName exact rcd_name of the wave record to update
 */
@Override
public void updateCsWave(String fileName) {
    // Update wrapper (renamed from the misleading "lambdaQueryWrapper").
    LambdaUpdateWrapper<CsWave> updateWrapper = new LambdaUpdateWrapper<>();
    updateWrapper.eq(CsWave::getRcdName, fileName);
    updateWrapper.set(CsWave::getStatus, 1);
    this.update(updateWrapper);
}
}

View File

@@ -6,6 +6,7 @@ import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
@@ -86,7 +87,7 @@ public class EventServiceImpl implements IEventService {
lineInfo(appEventMessage.getId());
}
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData();
try {
//处理事件数据
List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray();
@@ -95,7 +96,9 @@ public class EventServiceImpl implements IEventService {
//事件入库
CsEventPO csEvent = new CsEventPO();
csEvent.setId(id);
csEvent.setDeviceId(deviceId);
csEvent.setDeviceId(po.getId());
csEvent.setProcess(po.getProcess());
csEvent.setCode(item.getCode());
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
csEvent.setStartTime(eventTime);
tag = item.getName();
@@ -145,7 +148,7 @@ public class EventServiceImpl implements IEventService {
//事件处理日志库
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(deviceId);
csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
csEventLogs.setTag(item.getName());
csEventLogs.setStatus(1);
@@ -162,12 +165,12 @@ public class EventServiceImpl implements IEventService {
}
//推送事件逻辑处理 && cs_event_user入库
for (AppEventMessage.DataArray item : dataArray) {
sendEventUtils.sendUser(1,item.getType(),deviceId,item.getName(),eventTime,appEventMessage.getId(),id);
sendEventUtils.sendUser(1,item.getType(),po.getId(),item.getName(),eventTime,appEventMessage.getId(),id);
}
} catch (Exception e) {
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(deviceId);
csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(eventTime);
csEventLogs.setTag(tag);
csEventLogs.setStatus(0);

View File

@@ -4,7 +4,9 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.alibaba.nacos.shaded.com.google.gson.GsonBuilder;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.enums.AccessEnum;
@@ -34,9 +36,7 @@ import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.stereotype.Service;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@@ -73,9 +73,9 @@ public class FileServiceImpl implements IFileService {
@Override
public void analysisFileInfo(AppFileMessage appFileMessage) {
if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
int mid = 1;
int range = 51200;
Integer fileSize = appFileMessage.getMsg().getFileInfo().getFileSize();
String fileName = appFileMessage.getMsg().getFileInfo().getName();
//缓存文件信息用于文件流拼接
FileInfoDto fileInfoDto = new FileInfoDto();
@@ -95,18 +95,22 @@ public class FileServiceImpl implements IFileService {
fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck());
fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType());
fileInfoDto.setLocation(waveTimeDto.getLocation());
//文件流请求 判断文件大小是否需要分片请求,单次文件大小为50k
if (fileSize <= range){
askFileStream(appFileMessage.getId(),mid,fileName,0,fileSize);
fileInfoDto.setNumber(1);
redisUtil.delete(AppRedisKey.FILE_PART.concat(fileInfoDto.getName() + mid));
redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto);
} else {
int total = (int)Math.ceil(fileSize*1.0/range);
fileInfoDto.setNumber(total);
redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto);
askFileStream(appFileMessage.getId(), 1, fileName, 0, range);
}
//存储波形文件
CsWave csWave = new CsWave();
csWave.setNdid(appFileMessage.getId());
csWave.setCreateTime(LocalDateTime.now());
csWave.setStartTime(LocalDateTime.parse(waveTimeDto.getStartTime(), fmt));
csWave.setEndTime(LocalDateTime.parse(waveTimeDto.getEndTime(), fmt));
csWave.setRcdName(fileName);
csWave.setLocation(waveTimeDto.getLocation());
csWave.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize());
csWave.setCheckType(appFileMessage.getMsg().getFileInfo().getFileChkType());
csWave.setCheckNumber(appFileMessage.getMsg().getFileInfo().getFileCheck());
csWave.setStatus(0);
csWaveService.save(csWave);
//请求当前文件的数据
askFileStream(appFileMessage.getId(),mid,fileName,-1,range);
redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto);
redisUtil.delete(AppRedisKey.TIME+fileName);
} else {
throw new BusinessException(AccessResponseEnum.RESPONSE_ERROR);
@@ -115,7 +119,10 @@ public class FileServiceImpl implements IFileService {
@Override
public void analysisFileStream(AppFileMessage appFileMessage) {
int range = 51200;
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
String filePath;
List<Integer> list = new ArrayList<>();
FileStreamDto fileStreamDto = new FileStreamDto();
//波形文件上传成功后,将文件信息存储一下,方便后期查看
CsWave csWave = new CsWave();
csWave.setNdid(appFileMessage.getId());
@@ -124,99 +131,112 @@ public class FileServiceImpl implements IFileService {
CsEventFileLogs csEventLogs = new CsEventFileLogs();
csEventLogs.setNdid(appFileMessage.getId());
csEventLogs.setFileName(appFileMessage.getMsg().getName());
csEventLogs.setStatus(0);
try {
//todo 目前文件先只处理暂态事件的,后续有其他文件再做处理
//todo 目前文件先只处理波形事件的,后续有其他文件再做处理
String fileName = appFileMessage.getMsg().getName();
String lsFileName =fileName.split(StrUtil.SLASH)[fileName.split(StrUtil.SLASH).length - 1];
//获取缓存的文件信息
Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class);
//1.判断当前文件是否之前缓存过,没缓存,则先缓存(这边缓存两条记录,一条是用来判断超时的,还有一条记录文件数据,文件数据目前没有过期时间,文件数据收完才会删除)
//2.缓存了判断收到的报文个数是否和总个数一致,一致则解析文件;不一致则更新缓存
//3.超时判断: 30s分钟未收到相关文件信息核查文件个数看丢失哪些帧重新请求
if(fileName.contains(".cfg") || fileName.contains(".dat")) {
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
FileStreamDto fileStreamDto = new FileStreamDto();
String filePath;
Map<Integer,String> map = new HashMap<>();
//获取缓存的文件信息
Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class);
//文件流
Object object = redisUtil.getObjectByKey(fileName);
/*
* 文件解析存储逻辑
* 1.如果文件只有1帧那就直接解析文件流
* 2.如果文件有多帧,判断当前帧是否是最后一帧,是则直接解析文件,不是则先缓存起来,等收完全再开始解析文件
*/
if (Objects.isNull(object)){
//第一次录入
if(fileInfoDto.getNumber() == 1) {
//直接解析文件
filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId());
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!");
csEventLogs.setNowStep(1);
csEventLogs.setAllStep(1);
csEventLogs.setIsAll(1);
//记录文件信息
createCsWave(csWave,fileInfoDto,fmt,fileName);
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
List<String> eventList = correlateEvents(fileInfoDto,filePath,fileName);
//波形文件解析成图片
if (CollectionUtil.isNotEmpty(eventList)){
eventList.forEach(wavePicFeignClient::getWavePics);
}
redisUtil.delete(fileName);
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
} else {
//缓存文件
map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
fileStreamDto.setMap(map);
redisUtil.saveByKey(fileName, fileStreamDto);
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!");
csEventLogs.setNowStep(appFileMessage.getMid());
csEventLogs.setAllStep(fileInfoDto.getNumber());
csEventLogs.setIsAll(0);
log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", fileInfoDto.getNumber(), appFileMessage.getMid());
//收到文件后,继续请求下一帧报文
askFileStream(appFileMessage.getId(), appFileMessage.getMid()+1, fileName, appFileMessage.getMid() * range, range);
redisUtil.delete(AppRedisKey.FILE_PART.concat(fileInfoDto.getName() + appFileMessage.getMid()));
if (appFileMessage.getMsg().getFrameTotal() == 1){
//解析文件入库
filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId());
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!");
csEventLogs.setNowStep(1);
csEventLogs.setAllStep(1);
csEventLogs.setIsAll(1);
//更新文件信息
csWaveService.updateCsWave(fileName);
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
List<String> eventList = correlateEvents(fileInfoDto,filePath,fileName);
//波形文件解析成图片
if (CollectionUtil.isNotEmpty(eventList)){
eventList.forEach(wavePicFeignClient::getWavePics);
}
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
} else {
//分帧传递数据,需要校验收到的文件个数
fileStreamDto = JSON.parseObject(JSON.toJSONString(object), FileStreamDto.class);
Map<Integer,String> l1 = fileStreamDto.getMap();
l1.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
if (l1.size() == fileInfoDto.getNumber()){
//解析文件
filePath = fileStream(fileInfoDto.getNumber(),l1,null,fileName,appFileMessage.getId());
Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName));
if (Objects.isNull(object1)){
fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal());
fileStreamDto.setNDid(appFileMessage.getId());
fileStreamDto.setFrameLen(appFileMessage.getMsg().getFrameLen());
list.add(appFileMessage.getMsg().getFrameCurr());
fileStreamDto.setList(list);
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件"+l1.size()+"帧,这是第"+l1.size()+"帧,全部收到,解析成功!");
csEventLogs.setNowStep(l1.size());
csEventLogs.setAllStep(l1.size());
csEventLogs.setIsAll(1);
log.info("当前文件 {} 帧,这是第 {} 帧报文,全部收到,解析成功!", l1.size(), l1.size());
//记录文件信息
createCsWave(csWave,fileInfoDto,fmt,fileName);
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
List<String> eventList = correlateEvents(fileInfoDto,filePath,fileName);
//波形文件解析成图片
if (CollectionUtil.isNotEmpty(eventList)){
eventList.forEach(wavePicFeignClient::getWavePics);
}
redisUtil.delete(fileName);
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
} else {
//缓存
fileStreamDto = new FileStreamDto();
fileStreamDto.setMap(l1);
redisUtil.saveByKey(fileName, fileStreamDto);
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!");
csEventLogs.setNowStep(appFileMessage.getMid());
csEventLogs.setAllStep(fileInfoDto.getNumber());
csEventLogs.setRemark("当前文件"+appFileMessage.getMsg().getFrameTotal()+"帧,这是第"+appFileMessage.getMsg().getFrameCurr()+"帧,记录成功!");
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
csEventLogs.setIsAll(0);
log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", fileInfoDto.getNumber(), appFileMessage.getMid());
//收到文件后,继续请求下一帧报文
askFileStream(appFileMessage.getId(), appFileMessage.getMid()+1, fileName, appFileMessage.getMid() * range, range);
redisUtil.delete(AppRedisKey.FILE_PART.concat(fileInfoDto.getName() + appFileMessage.getMid()));
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 30L);
redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto);
//将数据写入临时文件
appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData());
log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
} else {
FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class);
//防止出现录入重复数据,做个判断
if (!dto.getList().contains(appFileMessage.getMsg().getFrameCurr())) {
dto.getList().add(appFileMessage.getMsg().getFrameCurr());
if (Objects.equals(dto.getTotal(), dto.getList().size())) {
Map<Integer, String> filePartMap = readFile(lsFileName);
filePartMap.put(appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData());
//解析文件
filePath = fileStream(appFileMessage.getMsg().getFrameTotal(), filePartMap, null, fileName, appFileMessage.getId());
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,全部收到,解析成功!");
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
csEventLogs.setIsAll(1);
log.info("当前文件 {} 帧,这是第 {} 帧报文,全部收到,解析成功!", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
//修改文件信息
csWaveService.updateCsWave(fileName);
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG, "").replaceAll(GeneralConstant.DAT, "");
List<String> eventList = correlateEvents(fileInfoDto, filePath, fileName);
//波形文件解析成图片
if (CollectionUtil.isNotEmpty(eventList)) {
eventList.forEach(wavePicFeignClient::getWavePics);
}
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART_MISSING.concat(appFileMessage.getMsg().getName()));
//删除临时文件
File file = new File(lsFileName);
if (file.exists()) {
file.delete();
}
} else {
Object object2 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName));
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!");
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
csEventLogs.setIsAll(0);
if (Objects.isNull(object2)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 30L);
} else {
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 1L);
}
redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), dto);
//将数据写入临时文件
appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData());
log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
}
} else {
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件为重复帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,不做记录!");
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
csEventLogs.setIsAll(0);
}
}
}
csEventLogs.setCompleteTime(LocalDateTime.now());
@@ -238,7 +258,6 @@ public class FileServiceImpl implements IFileService {
}
}
/**
* 请求文件流信息
*/
@@ -254,6 +273,7 @@ public class FileServiceImpl implements IFileService {
JSONObject jsonObject = JSONObject.fromObject(json);
reqAndResParam.setMsg(jsonObject);
publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
log.info("请求文件流报文:" + new Gson().toJson(reqAndResParam));
}
/**
@@ -354,18 +374,81 @@ public class FileServiceImpl implements IFileService {
}
/**
* 生成波形记录
* 数据写入文件
*/
public void createCsWave(CsWave csWave, FileInfoDto fileInfoDto, DateTimeFormatter fmt, String fileName) {
csWave.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
csWave.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
csWave.setRcdName(fileName);
csWave.setLocation(fileInfoDto.getLocation());
csWave.setFileSize(fileInfoDto.getFileSize());
csWave.setCheckType(fileInfoDto.getFileChkType());
csWave.setCheckNumber(fileInfoDto.getFileCheck());
csWaveService.save(csWave);
/**
 * Append one frame of file data to a local temp file as a serialized
 * single-entry {@code Map<frameNo, base64Data>}.
 *
 * @param filePath path of the temp file to append to
 * @param key      frame number of this piece of data
 * @param value    base64-encoded frame payload
 */
public void appendFile(String filePath, Integer key, String value) {
    Map<Integer, String> map = new HashMap<>();
    map.put(key, value);
    // try-with-resources: the original only closed the streams on the success
    // path, leaking file handles when writeObject threw.
    try (FileOutputStream fileOutputStream = new FileOutputStream(filePath, true);
         MyObjectOutputStream outputStream = new MyObjectOutputStream(fileOutputStream)) {
        outputStream.writeObject(map);
    } catch (IOException e) {
        // NOTE(review): best-effort like the original; consider using the class
        // logger instead of stderr.
        e.printStackTrace();
    }
}
/**
 * Read back every frame map previously written by appendFile and merge them
 * into one map keyed by frame number (later entries overwrite earlier ones,
 * matching the original merge order).
 *
 * @param filePath path of the temp file written by appendFile
 * @return merged frameNo -> base64Data map; empty on read failure
 */
public Map<Integer, String> readFile(String filePath) {
    Map<Integer, String> readMap = new HashMap<>();
    // try-with-resources: the original only closed the stream on the success
    // path, leaking it when an IOException escaped the read loop.
    try (MyObjectInputStream inputStream = new MyObjectInputStream(new FileInputStream(filePath))) {
        while (true) {
            try {
                @SuppressWarnings("unchecked")
                Map<Integer, String> loadedMap = (Map<Integer, String>) inputStream.readObject();
                readMap.putAll(loadedMap);
            } catch (EOFException | ClassNotFoundException e) {
                // EOF marks the end of the appended entries; a missing class is
                // treated the same way (best effort) to preserve existing behavior.
                break;
            }
        }
    } catch (IOException e) {
        // NOTE(review): best-effort like the original; consider using the class
        // logger instead of stderr.
        e.printStackTrace();
    }
    return readMap;
}
/**
 * -----------------------------------------------
 * Overridden stream classes: a plain ObjectOutputStream writes a stream header
 * on every append, which corrupts the file for later sequential reads.
 */
class MyObjectOutputStream extends ObjectOutputStream {
public MyObjectOutputStream(OutputStream out) throws IOException {
super(out);
}
@Override
protected void writeStreamHeader() throws IOException {
// Suppress the standard stream header so appended writes remain readable.
// NOTE(review): super.reset() emits a reset marker rather than writing
// nothing at all -- TODO confirm this matches what readFile expects.
super.reset();
}
}
class MyObjectInputStream extends ObjectInputStream {
public MyObjectInputStream(InputStream in) throws IOException {
super(in);
}
@Override
protected void readStreamHeader() throws IOException {
// Deliberately a no-op: files written by MyObjectOutputStream carry no
// standard stream header, so nothing must be consumed here.
}
}
/**
 * End of overridden stream classes.
 * -----------------------------------------------
 */
}

View File

@@ -1,14 +1,64 @@
package com.njcn;
import static org.junit.Assert.assertTrue;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.csharmonic.api.WavePicFeignClient;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influx.utils.InfluxDbUtils;
import com.njcn.oss.utils.FileStorageUtil;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.user.api.AppUserFeignClient;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
import com.njcn.zlevent.ZlEventBootApplication;
import com.njcn.zlevent.pojo.constant.ZlConstant;
import com.njcn.zlevent.pojo.dto.FileStreamDto;
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import javax.annotation.Resource;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static org.junit.Assert.assertTrue;
/**
* Unit test for simple App.
*/
public class AppTest
{
@RunWith(SpringRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = ZlEventBootApplication.class)
public class AppTest {
@Resource
private RedisUtil redisUtil;
@Resource
private FileStorageUtil fileStorageUtil;
@Resource
private WavePicFeignClient wavePicFeignClient;
@Resource
private AppUserFeignClient appUserFeignClient;
@Resource
private InfluxDbUtils influxDbUtils;
/**
* Rigorous Test :-)
*/
@@ -17,4 +67,160 @@ public class AppTest
{
assertTrue( true );
}
/**
 * Scratch test: writes one tagged point into the "zl_test" InfluxDB measurement
 * and inserts it via the line-protocol batch API.
 */
@Test
public void test3() {
    List<String> records = new ArrayList<>();
    Map<String, String> tags = new HashMap<>();
    tags.put(InfluxDBTableConstant.UUID, "000002");
    Map<String, Object> fields = new HashMap<>();
    fields.put("location", "电网侧");
    // FIXME: upstream should send UTC but claims to send Beijing time; pending
    // clarification, the ingest side stores Beijing time (i.e. minus 8 hours).
    // Timestamp used here is the current wall clock in milliseconds.
    Point point = influxDbUtils.pointBuilder("zl_test", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags, fields);
    BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
    batchPoints.point(point);
    // Convert the batch to line protocol and insert with an empty retention policy.
    records.add(batchPoints.lineProtocol());
    influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
/**
 * Scratch test: appends two serialized HashMaps to a local file.
 * NOTE(review): the FileOutputStream is opened in append mode, so every run
 * writes a fresh ObjectOutputStream header; reading the extra runs back needs a
 * header-skipping stream (a plain ObjectInputStream fails on the 2nd header).
 *
 * @throws IOException if the file cannot be written
 */
@Test
public void test2() throws IOException {
    String localPath = "C:\\Users\\徐扬\\Desktop\\123.dat";
    Map<Integer, String> mapToWrite = new HashMap<>();
    mapToWrite.put(1, "value1");
    mapToWrite.put(2, "value2");
    mapToWrite.put(3, "value3");
    Map<Integer, String> mapToWrite2 = new HashMap<>();
    mapToWrite2.put(11, "value1");
    mapToWrite2.put(22, "value2");
    mapToWrite2.put(33, "value3");
    // try-with-resources: both streams are closed even if writeObject throws.
    try (FileOutputStream fileOut = new FileOutputStream(localPath, true);
         ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
        objectOut.writeObject(mapToWrite);
        objectOut.writeObject(mapToWrite2);
    }
}
/**
 * Scratch test: deserializes every Map written to the local file and collects
 * them into a list, stopping cleanly at end-of-file.
 * NOTE(review): the maps were written as Map&lt;Integer, String&gt; but are read here
 * as Map&lt;String, Integer&gt; — erasure hides this mismatch; confirm the intent.
 *
 * @throws IOException if the file cannot be read or contains an unknown class
 */
@Test
public void test222() throws IOException {
    List<Map<String, Integer>> mapList = new ArrayList<>();
    String localPath = "C:\\Users\\徐扬\\Desktop\\123.dat";
    // try-with-resources: the stream is closed even when an exception escapes.
    try (ObjectInputStream inputStream = new ObjectInputStream(new FileInputStream(localPath))) {
        while (true) {
            try {
                @SuppressWarnings("unchecked") // stream carries erased Map instances
                Map<String, Integer> loadedMap = (Map<String, Integer>) inputStream.readObject();
                mapList.add(loadedMap);
            } catch (EOFException eof) {
                // Normal termination: reached the end of the object stream.
                break;
            } catch (ClassNotFoundException e) {
                // Previously swallowed as if it were EOF; surface it instead.
                throw new IOException("unknown class in object stream", e);
            }
        }
    }
    System.out.println("list集合为" + mapList);
}
/**
 * Scratch test: reads a .dat file line by line, parsing each line as a JSON map
 * with Gson, and prints the size of the last map read.
 * NOTE(review): each iteration overwrites {@code readMap}, so only the final
 * JSON line survives — confirm that is intended.
 */
@Test
public void test22() {
    String filePath = "C:\\Users\\徐扬\\Desktop\\PQS_PQM1_000005_20231007_141638_348.dat";
    Map<Integer, String> readMap = new HashMap<>();
    // try-with-resources: previously the reader leaked if parsing threw before close().
    try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
        Gson gson = new Gson();
        String line;
        while ((line = reader.readLine()) != null) {
            readMap = gson.fromJson(line, Map.class);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    System.out.println("从文件读取的Map: " + readMap.size());
}
/**
 * Scratch test: inspects the partial-reception record of a wave file in Redis
 * and prints how many chunks arrived plus which chunk indices are still missing.
 */
@Test
public void test4() {
String fileName = "/sd0:1/comtrade/PQS_PQM1_000005_20231007_141638_348.dat";
Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName));
// Round-trip through JSON to coerce the raw Redis value into the DTO.
FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class);
System.out.println("数据个数:" + dto.getList().size());
System.out.println("总个数:" + dto.getTotal());
// NOTE(review): 476 is a hard-coded expected chunk count; it presumably should
// be dto.getTotal() — confirm before reusing this check elsewhere.
IntStream.rangeClosed(1, 476)
.filter(i -> !dto.getList().contains(i))
.forEach(missingNumber -> System.out.println("缺失的数字: " + missingNumber));
}
/**
 * Scratch test: stores a null placeholder under the wave file's FILE_PART_TIME
 * key with an expiry of 10 (time unit defined by RedisUtil).
 */
@Test
public void test5() {
    final String waveFile = "/sd0:1/comtrade/PQS_PQM1_000005_20231007_141638_348.dat";
    final String redisKey = AppRedisKey.FILE_PART_TIME.concat(waveFile);
    redisUtil.saveByKeyWithExpire(redisKey, null, 10L);
}
/**
 * Scratch test: builds a microsecond-precision start/end window for a wave
 * recording and prints it as "yyyy-MM-dd HH:mm:ss.uuuuuu".
 *
 * Replaces the old string-substring splitting with arithmetic: whole seconds
 * come from dividing the epoch-microsecond value, and the 6-digit microsecond
 * remainder is zero-padded with %06d (identical output, e.g. "099000").
 * The large commented-out draft of the same computation was removed.
 */
@Test
public void test() {
    long time = 1694598894;       // event time, epoch seconds
    Double millisecond = 60000.0; // window length, milliseconds
    long subtleTime = 99000L;     // sub-second offset, microseconds

    long millisecondValue = millisecond.longValue() * 1000;      // window length in microseconds
    long time1 = time * 1000000 + subtleTime;                    // window start, epoch microseconds
    long time2 = time * 1000000 + subtleTime + millisecondValue; // window end, epoch microseconds

    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // Seconds part formatted as a date-time; microsecond remainder kept zero-padded.
    String formatTime1 = format.format(time1 / 1_000_000 * 1000);
    String micros1 = String.format("%06d", time1 % 1_000_000);
    String formatTime2 = format.format(time2 / 1_000_000 * 1000);
    String micros2 = String.format("%06d", time2 % 1_000_000);

    WaveTimeDto waveTimeDto = new WaveTimeDto();
    waveTimeDto.setStartTime(formatTime1 + "." + micros1);
    waveTimeDto.setEndTime(formatTime2 + "." + micros2);
    System.out.println("waveTimeDto===:" + waveTimeDto);
}
}