1.治理暂态事件、波形文件记录解析功能;日志记录功能
2.设备上线、掉线日志记录
This commit is contained in:
@@ -0,0 +1,30 @@
|
||||
package com.njcn.zlevent.controller;
|
||||
|
||||
|
||||
import io.swagger.annotations.Api;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import com.njcn.web.controller.BaseController;
|
||||
import springfox.documentation.annotations.ApiIgnore;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 治理暂态文件解析日志 前端控制器
|
||||
* </p>
|
||||
*
|
||||
* @author xuyang
|
||||
* @since 2023-09-08
|
||||
*/
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping("/csEventLogs")
|
||||
@Api(tags = "暂态文件日志处理")
|
||||
@AllArgsConstructor
|
||||
@ApiIgnore
|
||||
public class CsEventLogsController extends BaseController {
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
package com.njcn.zlevent.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.njcn.zlevent.pojo.po.CsEventLogs;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 治理暂态文件解析日志 Mapper 接口
|
||||
* </p>
|
||||
*
|
||||
* @author xuyang
|
||||
* @since 2023-09-08
|
||||
*/
|
||||
public interface CsEventLogsMapper extends BaseMapper<CsEventLogs
|
||||
> {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.njcn.zlevent.service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import com.njcn.zlevent.pojo.po.CsEventLogs;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 治理暂态文件解析日志 服务类
|
||||
* </p>
|
||||
*
|
||||
* @author xuyang
|
||||
* @since 2023-09-08
|
||||
*/
|
||||
public interface ICsEventLogsService extends IService<CsEventLogs> {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.njcn.zlevent.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.njcn.zlevent.mapper.CsEventLogsMapper;
|
||||
import com.njcn.zlevent.pojo.po.CsEventLogs;
|
||||
import com.njcn.zlevent.service.ICsEventLogsService;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 治理暂态文件解析日志 服务实现类
|
||||
* </p>
|
||||
*
|
||||
* @author xuyang
|
||||
* @since 2023-09-08
|
||||
*/
|
||||
@Service
|
||||
public class CsEventLogsServiceImpl extends ServiceImpl<CsEventLogsMapper, CsEventLogs> implements ICsEventLogsService {
|
||||
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.njcn.zlevent.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.njcn.csharmonic.pojo.po.CsEventPO;
|
||||
|
||||
@@ -115,7 +115,6 @@ public class CsWaveServiceImpl implements ICsWaveService {
|
||||
time -= 8 * 3600;
|
||||
// 将millisecond转换为长整型,并乘以1000以获取微秒
|
||||
long millisecondValue = millisecond.longValue() * 1000;
|
||||
// long millisecondValue = Long.parseLong(String.valueOf(millisecond))*1000;
|
||||
// 计算最终时间
|
||||
long finalTime = subtleTime + millisecondValue;
|
||||
// 如果finalTime大于等于1000000,将startTime增加1秒,finalTime减去1000000
|
||||
|
||||
@@ -72,6 +72,7 @@ public class EventServiceImpl implements IEventService {
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void analysis(AppEventMessage appEventMessage) {
|
||||
//todo 这边到时候装置事件、暂态事件需要分开处理
|
||||
List<CsEventPO> list1 = new ArrayList<>();
|
||||
List<String> records = new ArrayList<String>();
|
||||
List<CsEventUserPO> list2 = new ArrayList<>();
|
||||
|
||||
@@ -21,8 +21,11 @@ import com.njcn.zlevent.param.CsEventParam;
|
||||
import com.njcn.zlevent.pojo.dto.FileInfoDto;
|
||||
import com.njcn.zlevent.pojo.dto.FileStreamDto;
|
||||
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
|
||||
import com.njcn.zlevent.pojo.po.CsEventLogs;
|
||||
import com.njcn.zlevent.service.ICsEventLogsService;
|
||||
import com.njcn.zlevent.service.ICsEventService;
|
||||
import com.njcn.zlevent.service.IFileService;
|
||||
import com.njcn.zlevent.utils.FileCheckUtils;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.sf.json.JSONObject;
|
||||
@@ -33,6 +36,10 @@ import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
@@ -57,6 +64,8 @@ public class FileServiceImpl implements IFileService {
|
||||
|
||||
private final ICsEventService csEventService;
|
||||
|
||||
private final ICsEventLogsService csEventLogsService;
|
||||
|
||||
@Override
|
||||
public void analysisFileInfo(AppFileMessage appFileMessage) {
|
||||
if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){
|
||||
@@ -73,7 +82,7 @@ public class FileServiceImpl implements IFileService {
|
||||
} else {
|
||||
int total = (int)Math.ceil(fileSize*1.0/range) ;
|
||||
for (int i = 0; i < total; i++) {
|
||||
askFileStream(appFileMessage.getId(),mid,fileName,i*range,range-1);
|
||||
askFileStream(appFileMessage.getId(),mid,fileName,i*range,range);
|
||||
mid++;
|
||||
}
|
||||
fileInfoDto.setNumber(mid-1);
|
||||
@@ -101,79 +110,98 @@ public class FileServiceImpl implements IFileService {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void analysisFileStream(AppFileMessage appFileMessage) {
|
||||
//todo 目前文件先只处理暂态事件的,后续有其他文件再做处理
|
||||
String fileName = appFileMessage.getMsg().getName();
|
||||
if(fileName.contains(".cfg") || fileName.contains(".dat")) {
|
||||
FileStreamDto fileStreamDto = new FileStreamDto();
|
||||
String filePath;
|
||||
Map<Integer,String> map = new HashMap<>();
|
||||
//获取缓存的文件信息
|
||||
Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
|
||||
FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class);
|
||||
//文件流
|
||||
Object object = redisUtil.getObjectByKey(fileName);
|
||||
/*
|
||||
* 文件解析存储逻辑
|
||||
* 1.如果文件只有1帧,那就直接解析文件流;
|
||||
* 2.如果文件有多帧,判断当前帧是否是最后一帧,是则直接解析文件,不是则先缓存起来,等收完全再开始解析文件
|
||||
*/
|
||||
if (Objects.isNull(object)){
|
||||
//第一次录入
|
||||
if(fileInfoDto.getNumber() == 1) {
|
||||
//直接解析文件
|
||||
filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId());
|
||||
log.info(fileName + "解析成功");
|
||||
redisUtil.delete(fileName);
|
||||
//波形文件关联事件
|
||||
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
|
||||
correlateEvents(fileInfoDto,filePath);
|
||||
//日志记录
|
||||
CsEventLogs csEventLogs = new CsEventLogs();
|
||||
csEventLogs.setNdid(appFileMessage.getId());
|
||||
csEventLogs.setFileName(appFileMessage.getMsg().getName());
|
||||
try {
|
||||
//todo 目前文件先只处理暂态事件的,后续有其他文件再做处理
|
||||
String fileName = appFileMessage.getMsg().getName();
|
||||
if(fileName.contains(".cfg") || fileName.contains(".dat")) {
|
||||
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
|
||||
FileStreamDto fileStreamDto = new FileStreamDto();
|
||||
String filePath;
|
||||
Map<Integer,String> map = new HashMap<>();
|
||||
//获取缓存的文件信息
|
||||
Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
|
||||
FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class);
|
||||
//文件流
|
||||
Object object = redisUtil.getObjectByKey(fileName);
|
||||
/*
|
||||
* 文件解析存储逻辑
|
||||
* 1.如果文件只有1帧,那就直接解析文件流;
|
||||
* 2.如果文件有多帧,判断当前帧是否是最后一帧,是则直接解析文件,不是则先缓存起来,等收完全再开始解析文件
|
||||
*/
|
||||
if (Objects.isNull(object)){
|
||||
//第一次录入
|
||||
if(fileInfoDto.getNumber() == 1) {
|
||||
//直接解析文件
|
||||
filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId());
|
||||
redisUtil.delete(fileName);
|
||||
//波形文件关联事件
|
||||
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
|
||||
correlateEvents(fileInfoDto,filePath);
|
||||
csEventLogs.setStatus(1);
|
||||
csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!");
|
||||
csEventLogs.setCompleteTime(LocalDateTime.now());
|
||||
csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
|
||||
csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
|
||||
} else {
|
||||
//缓存文件
|
||||
map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
|
||||
fileStreamDto.setMap(map);
|
||||
redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L);
|
||||
csEventLogs.setStatus(1);
|
||||
csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!");
|
||||
csEventLogs.setCompleteTime(LocalDateTime.now());
|
||||
csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
|
||||
csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
|
||||
}
|
||||
} else {
|
||||
//缓存文件
|
||||
map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
|
||||
fileStreamDto.setMap(map);
|
||||
redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L);
|
||||
//分帧传递数据,需要校验收到的文件个数
|
||||
fileStreamDto = JSON.parseObject(JSON.toJSONString(object), FileStreamDto.class);
|
||||
Map<Integer,String> l1 = fileStreamDto.getMap();
|
||||
l1.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
|
||||
if (l1.size() == fileInfoDto.getNumber()){
|
||||
//解析文件
|
||||
filePath = fileStream(fileInfoDto.getNumber(),l1,null,fileName,appFileMessage.getId());
|
||||
redisUtil.delete(fileName);
|
||||
//波形文件关联事件
|
||||
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
|
||||
correlateEvents(fileInfoDto,filePath);
|
||||
csEventLogs.setStatus(1);
|
||||
csEventLogs.setRemark("当前文件"+l1.size()+"帧,这是第"+l1.size()+"帧,全部收到,解析成功!");
|
||||
csEventLogs.setCompleteTime(LocalDateTime.now());
|
||||
csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
|
||||
csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
|
||||
} else {
|
||||
//缓存
|
||||
fileStreamDto = new FileStreamDto();
|
||||
fileStreamDto.setMap(l1);
|
||||
redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L);
|
||||
csEventLogs.setStatus(1);
|
||||
csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!");
|
||||
csEventLogs.setCompleteTime(LocalDateTime.now());
|
||||
csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
|
||||
csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
|
||||
}
|
||||
}
|
||||
//记录日志
|
||||
csEventLogsService.save(csEventLogs);
|
||||
} else {
|
||||
//分帧传递数据,需要校验收到的文件个数
|
||||
fileStreamDto = JSON.parseObject(JSON.toJSONString(object), FileStreamDto.class);
|
||||
Map<Integer,String> l1 = fileStreamDto.getMap();
|
||||
l1.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
|
||||
if (l1.size() == fileInfoDto.getNumber()){
|
||||
//对数据进行排序
|
||||
// 将Map的Entry集合转换成List
|
||||
List<Map.Entry<Integer, String>> entryList = new ArrayList<>(l1.entrySet());
|
||||
// 使用Comparator按Key进行排序
|
||||
entryList.sort(new Comparator<Map.Entry<Integer, String>>() {
|
||||
@Override
|
||||
public int compare(Map.Entry<Integer, String> entry1, Map.Entry<Integer, String> entry2) {
|
||||
return entry1.getKey().compareTo(entry2.getKey());
|
||||
}
|
||||
});
|
||||
//解析文件
|
||||
filePath = fileStream(fileInfoDto.getNumber(),l1,null,fileName,appFileMessage.getId());
|
||||
log.info(fileName + "解析成功");
|
||||
redisUtil.delete(fileName);
|
||||
//波形文件关联事件
|
||||
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
|
||||
correlateEvents(fileInfoDto,filePath);
|
||||
} else {
|
||||
//缓存
|
||||
fileStreamDto = new FileStreamDto();
|
||||
fileStreamDto.setMap(l1);
|
||||
redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L);
|
||||
}
|
||||
//todo 处理其他文件
|
||||
log.info("暂未做其他文件处理");
|
||||
}
|
||||
} catch (Exception e){
|
||||
csEventLogs.setStatus(0);
|
||||
csEventLogs.setRemark("文件解析失败,失败原因:" + e.getMessage());
|
||||
csEventLogs.setCompleteTime(LocalDateTime.now());
|
||||
//记录日志
|
||||
|
||||
} else {
|
||||
//todo 处理其他文件
|
||||
log.info("暂未做其他文件处理");
|
||||
csEventLogsService.save(csEventLogs);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 请求文件流信息
|
||||
*/
|
||||
@@ -233,6 +261,7 @@ public class FileServiceImpl implements IFileService {
|
||||
byteArray = bytes;
|
||||
}
|
||||
InputStream inputStream = new ByteArrayInputStream(byteArray);
|
||||
//todo 此处需要做文件crc校验或者md5校验,目前不知道怎么处理,先放一下
|
||||
String path = fileStorageUtil.uploadStreamSpecifyName(inputStream, OssPath.WAVE_DIR + folder + StrUtil.SLASH,fileName);
|
||||
try {
|
||||
inputStream.close();
|
||||
@@ -277,6 +306,5 @@ public class FileServiceImpl implements IFileService {
|
||||
csEventParam.setEndTime(fileInfoDto.getEndTime());
|
||||
csEventParam.setPath(path);
|
||||
csEventService.updateCsEvent(csEventParam);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
package com.njcn.zlevent.utils;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.zip.CRC32;
|
||||
|
||||
/**
|
||||
* 类的介绍:用于文件校验
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2023/9/8 13:32
|
||||
*/
|
||||
|
||||
public class FileCheckUtils {
|
||||
|
||||
/**
|
||||
* 32位CRC检验
|
||||
* @param inputStream 文件流
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static long calculateCRC32Checksum(InputStream inputStream) throws IOException {
|
||||
CRC32 crc32 = new CRC32();
|
||||
// 用于读取文件内容的缓冲区
|
||||
byte[] buffer = new byte[1024];
|
||||
int bytesRead;
|
||||
while ((bytesRead = inputStream.read(buffer)) != -1) {
|
||||
crc32.update(buffer, 0, bytesRead);
|
||||
}
|
||||
return crc32.getValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* MD5检验
|
||||
* @param inputStream 文件流
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static String calculateMD5Checksum(InputStream inputStream) throws IOException, NoSuchAlgorithmException {
|
||||
MessageDigest md5 = MessageDigest.getInstance("MD5");
|
||||
// 用于读取文件内容的缓冲区
|
||||
byte[] buffer = new byte[8192];
|
||||
int bytesRead;
|
||||
while ((bytesRead = inputStream.read(buffer)) != -1) {
|
||||
md5.update(buffer, 0, bytesRead);
|
||||
}
|
||||
byte[] digest = md5.digest();
|
||||
// 将MD5摘要转换为十六进制字符串
|
||||
StringBuilder md5Checksum = new StringBuilder();
|
||||
for (byte b : digest) {
|
||||
md5Checksum.append(String.format("%02x", b));
|
||||
}
|
||||
return md5Checksum.toString();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user