@@ -4,15 +4,14 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.text.StrPool ;
import cn.hutool.core.util.StrUtil ;
import com.alibaba.fastjson.JSON ;
import com.alibaba.nacos.common.utils.ConcurrentHashSet ;
import com.alibaba.nacos.shaded.com.google.gson.Gson ;
import com.alibaba.nacos.shaded.com.google.gson.GsonBuilder ;
import com.github.tocrhz.mqtt.publisher.MqttPublisher ;
import com.njcn.access.api.CsTopicFeignClient ;
import com.njcn.access.enums.AccessEnum ;
import com.njcn.access.enums.AccessResponseEnum ;
import com.njcn.access.enums.TypeEnum ;
import com.njcn.access.pojo.dto.ReqAndResDto ;
import com.njcn.access.utils.CRC32Utils ;
import com.njcn.common.config.GeneralInfo ;
import com.njcn.common.pojo.exception.BusinessException ;
import com.njcn.csharmonic.api.WavePicFeignClient ;
@@ -41,7 +40,6 @@ import java.io.*;
import java.time.LocalDateTime ;
import java.time.format.DateTimeFormatter ;
import java.util.* ;
import java.util.concurrent.TimeUnit ;
/**
* 类的介绍:
@@ -112,7 +110,7 @@ public class FileServiceImpl implements IFileService {
csWave . setStatus ( 0 ) ;
csWaveService . save ( csWave ) ;
//请求当前文件的数据
askFileStream ( appFileMessage . getId ( ) , mid , fileName , - 1 , range ) ;
askFileStream ( appFileMessage . getId ( ) , mid , fileName , 0 , range ) ;
redisUtil . saveByKey ( AppRedisKey . RMQ_FILE_CONSUME_KEY . concat ( fileInfoDto . getName ( ) ) , fileInfoDto ) ;
redisUtil . delete ( AppRedisKey . TIME + fileName ) ;
} else {
@@ -123,7 +121,7 @@ public class FileServiceImpl implements IFileService {
@Override
public void analysisFileStream ( AppFileMessage appFileMessage ) {
DateTimeFormatter fmt = DateTimeFormatter . ofPattern ( " yyyy-MM-dd HH:mm:ss.SSSSSS " ) ;
String filePath ;
String filePath = null ;
List < Integer > list = new ArrayList < > ( ) ;
FileStreamDto fileStreamDto = new FileStreamDto ( ) ;
//波形文件上传成功后,将文件信息存储一下,方便后期查看
@@ -138,18 +136,53 @@ public class FileServiceImpl implements IFileService {
try {
//todo 目前文件先只处理波形事件的,后续有其他文件再做处理
String fileName = appFileMessage . getMsg ( ) . getName ( ) ;
//String lsFileName =fileName.split(StrUtil.SLASH)[fileName.split(StrUtil.SLASH).length - 1];
String lsFileName = generalInfo . getBusinessTempPath ( ) + File . separator + fileName . split ( StrUtil . SLASH ) [ fileName . split ( StrUtil . SLASH ) . length - 1 ] ;
File lsFile = new File ( generalInfo . getBusinessTempPath ( ) ) ;
//如果文件夹不存在则创建
if ( ! lsFile . exists ( ) & & ! lsFile . isDirectory ( ) ) {
lsFile . mkdirs ( ) ;
}
//获取缓存的文件信息
Object fileInfo = redisUtil . getObjectByKey ( AppRedisKey . RMQ_FILE_CONSUME_KEY . concat ( fileName ) ) ;
FileInfoDto fileInfoDto = JSON . parseObject ( JSON . toJSONString ( fileInfo ) , FileInfoDto . class ) ;
//1.判断当前文件是否之前缓存过,没缓存,则先缓存(这边缓存两条记录,一条是用来判断超时的,还有一条记录文件数据,文件数据目前没有过期时间,文件数据收完才会删除)
//2.缓存了判断收到的报文个数是否和总个数一致,一致则解析文件;不一致则更新缓存
//3.超时判断: 30s分钟未收到相关文件信息, 核查文件个数, 看丢失哪些帧, 重新请求
if ( fileName . contains ( " .cfg " ) | | fileName . contains ( " .dat " ) ) {
if ( Objects . isNull ( fileInfoDto ) ) {
String fileCheck = redisUtil . getObjectByKey ( " fileCheck " + fileName ) . toString ( ) ;
if ( appFileMessage . getMsg ( ) . getFrameTotal ( ) = = 1 ) {
//解析文件入库
filePath = fileStream ( 1 , null , appFileMessage . getMsg ( ) . getData ( ) , fileName , appFileMessage . getId ( ) ) ;
filePath = fileStream ( 1 , null , appFileMessage . getMsg ( ) . getData ( ) , fileName , appFileMessage . getId ( ) , fileCheck , " download " );
} else {
//最后一帧
if ( Objects . equals ( appFileMessage . getMsg ( ) . getFrameTotal ( ) , appFileMessage . getMsg ( ) . getFrameCurr ( ) ) ) {
Map < Integer , String > filePartMap = readFile ( lsFileName ) ;
filePartMap . put ( appFileMessage . getMsg ( ) . getFrameCurr ( ) , appFileMessage . getMsg ( ) . getData ( ) ) ;
//解析文件
filePath = fileStream ( appFileMessage . getMsg ( ) . getFrameTotal ( ) , filePartMap , null , fileName , appFileMessage . getId ( ) , fileCheck , " download " ) ;
//删除临时文件
File file = new File ( lsFileName ) ;
if ( file . exists ( ) ) {
file . delete ( ) ;
}
}
//中间帧
else {
Map < Integer , String > filePartMap = readFile ( lsFileName ) ;
if ( Objects . isNull ( filePartMap . get ( appFileMessage . getMsg ( ) . getFrameCurr ( ) ) ) ) {
appendFile ( lsFileName , appFileMessage . getMsg ( ) . getFrameCurr ( ) , appFileMessage . getMsg ( ) . getData ( ) ) ;
}
}
}
if ( ! Objects . isNull ( filePath ) ) {
redisUtil . saveByKey ( " downloadFilePath: " + appFileMessage . getMsg ( ) . getName ( ) , filePath ) ;
}
}
//录波文件下载
//1.判断当前文件是否之前缓存过,没缓存,则先缓存(这边缓存两条记录,一条是用来判断超时的,还有一条记录文件数据,文件数据目前没有过期时间,文件数据收完才会删除)
//2.缓存了判断收到的报文个数是否和总个数一致,一致则解析文件;不一致则更新缓存
//3.超时判断: 30s未收到相关文件信息, 核查文件个数, 看丢失哪些帧, 重新请求
else {
if ( appFileMessage . getMsg ( ) . getFrameTotal ( ) = = 1 ) {
//解析文件入库
filePath = fileStream ( 1 , null , appFileMessage . getMsg ( ) . getData ( ) , fileName , appFileMessage . getId ( ) , fileInfoDto . getFileCheck ( ) , " event " ) ;
csEventLogs . setStatus ( 1 ) ;
csEventLogs . setRemark ( " 当前文件1帧,全部收到,解析成功! " ) ;
csEventLogs . setNowStep ( 1 ) ;
@@ -178,7 +211,7 @@ public class FileServiceImpl implements IFileService {
csEventLogs . setNowStep ( appFileMessage . getMsg ( ) . getFrameCurr ( ) ) ;
csEventLogs . setAllStep ( appFileMessage . getMsg ( ) . getFrameTotal ( ) ) ;
csEventLogs . setIsAll ( 0 ) ;
redisUtil . saveByKeyWithExpire ( AppRedisKey . FILE_PART_TIME . concat ( appFileMessage . getMsg ( ) . getName ( ) ) , null , 30 L) ;
redisUtil . saveByKeyWithExpire ( AppRedisKey . FILE_PART_TIME . concat ( appFileMessage . getMsg ( ) . getName ( ) ) , null , 5 L) ;
redisUtil . saveByKey ( AppRedisKey . FILE_PART . concat ( appFileMessage . getMsg ( ) . getName ( ) ) , fileStreamDto ) ;
//将数据写入临时文件
appendFile ( lsFileName , appFileMessage . getMsg ( ) . getFrameCurr ( ) , appFileMessage . getMsg ( ) . getData ( ) ) ;
@@ -192,7 +225,7 @@ public class FileServiceImpl implements IFileService {
Map < Integer , String > filePartMap = readFile ( lsFileName ) ;
filePartMap . put ( appFileMessage . getMsg ( ) . getFrameCurr ( ) , appFileMessage . getMsg ( ) . getData ( ) ) ;
//解析文件
filePath = fileStream ( appFileMessage . getMsg ( ) . getFrameTotal ( ) , filePartMap , null , fileName , appFileMessage . getId ( ) ) ;
filePath = fileStream ( appFileMessage . getMsg ( ) . getFrameTotal ( ) , filePartMap , null , fileName , appFileMessage . getId ( ) , fileInfoDto . getFileCheck ( ) , " event " );
csEventLogs . setStatus ( 1 ) ;
csEventLogs . setRemark ( " 当前文件 " + appFileMessage . getMsg ( ) . getFrameTotal ( ) + " 帧,这是第 " + appFileMessage . getMsg ( ) . getFrameCurr ( ) + " 帧,全部收到,解析成功! " ) ;
csEventLogs . setNowStep ( appFileMessage . getMsg ( ) . getFrameCurr ( ) ) ;
@@ -224,17 +257,21 @@ public class FileServiceImpl implements IFileService {
csEventLogs . setNowStep ( appFileMessage . getMsg ( ) . getFrameCurr ( ) ) ;
csEventLogs . setAllStep ( appFileMessage . getMsg ( ) . getFrameTotal ( ) ) ;
csEventLogs . setIsAll ( 0 ) ;
redisUtil . saveByKey ( AppRedisKey . FILE_PART . concat ( appFileMessage . getMsg ( ) . getName ( ) ) , dto ) ;
long time1 = System . currentTimeMillis ( ) ;
//将数据写入临时文件
appendFile ( lsFileName , appFileMessage . getMsg ( ) . getFrameCurr ( ) , appFileMessage . getMsg ( ) . getData ( ) ) ;
long time2 = System . currentTimeMillis ( ) ;
System . out . println ( " time==: " + ( time2 - time1 ) ) ;
log . info ( " 当前文件 {} 帧,这是第 {} 帧报文,记录成功 " , appFileMessage . getMsg ( ) . getFrameTotal ( ) , appFileMessage . getMsg ( ) . getFrameCurr ( ) ) ;
if ( Objects . isNull ( object2 ) ) {
redisUtil . saveByKeyWithExpire ( AppRedisKey . FILE_PART_TIME . concat ( appFileMessage . getMsg ( ) . getName ( ) ) , null , 3 0L) ;
redisUtil . saveByKeyWithExpire ( AppRedisKey . FILE_PART_TIME . concat ( appFileMessage . getMsg ( ) . getName ( ) ) , null , 1 0L) ;
} else {
redisUtil . saveByKeyWithExpire ( AppRedisKey . FILE_PART_TIME . concat ( appFileMessage . getMsg ( ) . getName ( ) ) , null , 1L ) ;
}
redisUtil . saveByKey ( AppRedisKey . FILE_PART . concat ( appFileMessage . getMsg ( ) . getName ( ) ) , dto ) ;
//将数据写入临时文件
appendFile ( lsFileName , appFileMessage . getMsg ( ) . getFrameCurr ( ) , appFileMessage . getMsg ( ) . getData ( ) ) ;
log . info ( " 当前文件 {} 帧,这是第 {} 帧报文,记录成功 " , appFileMessage . getMsg ( ) . getFrameTotal ( ) , appFileMessage . getMsg ( ) . getFrameCurr ( ) ) ;
}
} else {
redisUtil . saveByKeyWithExpire ( AppRedisKey . FILE_PART_TIME . concat ( appFileMessage . getMsg ( ) . getName ( ) ) , null , 1L ) ;
csEventLogs . setStatus ( 1 ) ;
csEventLogs . setRemark ( " 当前文件为重复帧,这是第 " + appFileMessage . getMsg ( ) . getFrameCurr ( ) + " 帧,不做记录! " ) ;
csEventLogs . setNowStep ( appFileMessage . getMsg ( ) . getFrameCurr ( ) ) ;
@@ -249,9 +286,6 @@ public class FileServiceImpl implements IFileService {
csEventLogs . setLocation ( fileInfoDto . getLocation ( ) ) ;
//记录日志
csEventLogsService . save ( csEventLogs ) ;
} else {
//todo 处理其他文件
log . info ( " 暂未做其他文件处理 " ) ;
}
} catch ( Exception e ) {
csEventLogs . setStatus ( 0 ) ;
@@ -283,10 +317,10 @@ public class FileServiceImpl implements IFileService {
/**
* 组装文件
*/
public String fileStream ( Integer number , Map < Integer , String > map , String data , String fileName , String nDid ) {
public String fileStream ( Integer number , Map < Integer , String > map , String data , String fileName , String nDid , String fileCheck , String type ) {
String filePath ;
if ( number = = 1 ) {
filePath = stream ( true , data , nDid , fileName , null ) ;
filePath = stream ( true , data , nDid , fileName , null , fileCheck , type );
} else {
int lengthByte = 0 ;
for ( int i = 1 ; i < = number ; i + + ) {
@@ -300,7 +334,7 @@ public class FileServiceImpl implements IFileService {
System . arraycopy ( byteArray , 0 , allByte , countLength , byteArray . length ) ;
countLength + = byteArray . length ;
}
filePath = stream ( false , null , nDid , fileName , allByte ) ;
filePath = stream ( false , null , nDid , fileName , allByte , fileCheck , type );
}
return filePath ;
}
@@ -308,7 +342,8 @@ public class FileServiceImpl implements IFileService {
/**
* 解析存储文件信息
*/
public String stream ( boolean bool , String stream , String folder , String fileName , byte [ ] bytes ) {
public String stream ( boolean bool , String stream , String folder , String fileName , byte [ ] bytes , String fileCheck , String type ) {
String path ;
byte [ ] byteArray = null ;
//将文件后缀替换成大写
String [ ] parts = fileName . split ( StrUtil . SLASH ) ;
@@ -321,9 +356,18 @@ public class FileServiceImpl implements IFileService {
} else {
byteArray = bytes ;
}
//todo 此处需要做文件crc校验或者md5校验, 目前不知道怎么处理, 先放一下
//文件校验
int crc = CRC32Utils . calculateCRC32 ( byteArray , byteArray . length , 0xffffffff ) ;
String hexString = String . format ( " %08X " , crc ) ;
if ( ! Objects . equals ( hexString , fileCheck ) ) {
throw new BusinessException ( AccessResponseEnum . FILE_CHECK_ERROR ) ;
}
InputStream inputStream = new ByteArrayInputStream ( byteArray ) ;
String path = fileStorageUtil . uploadStreamSpecifyName ( inputStream , OssPath . WAVE_DIR + folder + StrUtil . SLASH , fileName ) ;
if ( Objects . equals ( type , " download " ) ) {
path = fileStorageUtil . uploadStreamSpecifyName ( inputStream , OssPath . DOWNLOAD_DIR + folder + StrUtil . SLASH , fileName ) ;
} else {
path = fileStorageUtil . uploadStreamSpecifyName ( inputStream , OssPath . WAVE_DIR + folder + StrUtil . SLASH , fileName ) ;
}
try {
inputStream . close ( ) ;
} catch ( IOException e ) {