@@ -18,31 +18,37 @@ import static com.njcn.gather.tools.comtrade.comparewave.core.Constants.MAX_CH_N
* <p>对应C代码: main_pro.c中的文件读取部分</p>
* <p><b>重要: 必须严格按照C代码的读取逻辑</b></p>
* <p>用于读取IEEE C37.111标准的COMTRADE格式文件, 包括CFG配置文件和DAT数据文件</p>
*
*
* @author hongawen
* @since 1.0
*/
@Slf4j
public class ComtradeReader {
/** ASCII格式文件类型常量 */
/**
* ASCII格式文件类型常量
*/
private static final String FILE_TYPE_ASCII = " ASCII " ;
/** 二进制格式文件类型常量 */
/**
* 二进制格式文件类型常量
*/
private static final String FILE_TYPE_BINARY = " BINARY " ;
/**
* 读取COMTRADE文件( 使用InputStream)
* <p>从输入流中读取CFG配置文件和DAT数据文件</p>
*
* @param cfgStream CFG配置文件输入流
* @param dat Stream DAT数据 文件输入流
* @param dataBuf 数据缓冲区
* @param encoding 文件编码格式
* <p>重要: 实现了与C代码一致的降采样算法, 确保采样率统一</p>
*
* @param cfg Stream CFG配置 文件输入流
* @param datStream DAT数据文件输入流
* @param dataBuf 数据缓冲区
* @param encoding 文件编码格式
* @param targetSampleRate 目标采样率(点/周波) , 0表示自动选择
* @return 读取是否成功
*/
public static boolean readSampleFile ( InputStream cfgStream , InputStream datStream ,
DataPq dataBuf , String encoding ) {
DataPq dataBuf , String encoding , float targetSampleRate ) {
try {
// 从输入流读取CFG配置文件
ComtradeData . CfgInfo cfgInfo = readCfgFile ( cfgStream , encoding ) ;
@@ -50,7 +56,7 @@ public class ComtradeReader {
log . error ( " Failed to read CFG from stream " ) ;
return false ;
}
// 获取数据文件类型( ASCII或BINARY)
String dataFileType = cfgInfo . getDataFileType ( ) ;
if ( dataFileType = = null | | dataFileType . isEmpty ( ) ) {
@@ -58,101 +64,92 @@ public class ComtradeReader {
dataFileType = FILE_TYPE_BINARY ;
log . info ( " 数据文件类型未指定, 默认使用BINARY格式 " ) ;
}
// 从输入流读取DAT数据文件
boolean success = readDatFileFromStream ( datStream , cfgInfo , dataBuf , dataFileTyp e) ;
// 计算有效的目标采样率( 对应C代码中的out_smp计算逻辑)
float effectiveTargetRate = calculateEffectiveTargetSampleRate ( cfgInfo , targetSampleRat e) ;
// 从输入流读取DAT数据文件, 包含降采样处理
boolean success = readDatFileFromStream ( datStream , cfgInfo , dataBuf , dataFileType , effectiveTargetRate ) ;
if ( ! success ) {
log . error ( " Failed to read DAT from stream " ) ;
return false ;
}
// 处理CFG信息和数据缓冲区
return processCfgAndData ( cfgInfo , dataBuf ) ;
return processCfgAndData ( cfgInfo , dataBuf , effectiveTargetRate );
} catch ( Exception e ) {
log . error ( " Error reading COMTRADE from streams " , e ) ;
return false ;
}
}
/**
* 处理CFG信息和数据缓冲区
* <p>提取公共处理逻辑,设置采样参数和数据缓冲区</p>
* <p>注意: 此时还没有设置接线方式, 需要在外部调用applyConfiguration后才能确定</p>
*
* @param cfgInfo CFG配置信息
* @param dataBuf 数据缓冲区
*
* @param cfgInfo CFG配置信息
* @param dataBuf 数据缓冲区
* @param effectiveTargetRate 有效的目标采样率(仅用于日志输出)
* @return 处理是否成功
*/
private static boolean processCfgAndData ( ComtradeData . CfgInfo cfgInfo , DataPq dataBuf ) {
private static boolean processCfgAndData ( ComtradeData . CfgInfo cfgInfo , DataPq dataBuf , float effectiveTargetRate ) {
try {
// 计算每周波采样点数( 对应C代码中smp_rate )
float lineFreq = ( float ) cfgInfo . getLineFreq ( ) ;
// 采样率已在DAT文件读取时设置( 考虑了多段统一降采样 )
// 不再覆盖, 仅用effectiveTargetRate作为日志参考
float lineFreq = ( float ) cfgInfo . getLineFreq ( ) ;
if ( lineFreq < = 0 ) {
lineFreq = 50 . 0f ;
}
float samplesPerCycle = ( float ) cfgInfo . getSampleRate ( ) / lineFreq ;
dataBuf . setFactSmpRate ( samplesPerCycle ) ;
// 验证采样率限制要求
if ( samplesPerCycle < 128 ) {
log . error ( " 采样率过低:每周波采样点数={}, 最小要求128 " , samplesPerCycle ) ;
throw new IllegalArgumentException ( " 采样率过低,每周波采样点数必须>=128, 当前值: " + samplesPerCycle ) ;
}
// 限制最大采样率
if ( samplesPerCycle > 256 ) {
log . warn ( " 采样率过高:每周波采样点数={}, 限制为256 " , samplesPerCycle ) ;
samplesPerCycle = 256 ;
}
dataBuf . setSmpRate ( samplesPerCycle ) ;
dataBuf . setF ( lineFreq ) ;
// 设置录波开始时间以支持波形对齐
dataBuf . setLbStartTime ( cfgInfo . getStartTime ( ) ) ;
// 额定电压电流由外部参数传入,提供更灵活的配置
// 输出数据读取统计信息
log . info ( " 数据读取完成 - 采样点数: {}, 每周波采样点数: {}, 频率: {} Hz " ,
dataBuf . getSmpNum ( ) , dataBuf . getSmpRate ( ) , dataBuf . getF ( ) ) ;
log . info ( " 数据读取完成 - 采样点数: {}, 每周波采样点数: {}, 频率: {} Hz " ,
dataBuf . getSmpNum ( ) , dataBuf . getSmpRate ( ) , dataBuf . getF ( ) ) ;
// 输出部分采样点用于数据验证
if ( dataBuf . getSmpNum ( ) > 0 ) {
log . debug ( " 前10个UA采样点: {}, {}, {}, {}, {}, {}, {}, {}, {}, {} " ,
dataBuf . getSmpData ( ) [ 0 ] [ 0 ] , dataBuf . getSmpData ( ) [ 0 ] [ 1 ] ,
dataBuf . getSmpData ( ) [ 0 ] [ 2 ] , dataBuf . getSmpData ( ) [ 0 ] [ 3 ] ,
dataBuf . getSmpData ( ) [ 0 ] [ 4 ] , dataBuf . getSmpData ( ) [ 0 ] [ 5 ] ,
dataBuf . getSmpData ( ) [ 0 ] [ 6 ] , dataBuf . getSmpData ( ) [ 0 ] [ 7 ] ,
dataBuf . getSmpData ( ) [ 0 ] [ 8 ] , dataBuf . getSmpData ( ) [ 0 ] [ 9 ] ) ;
log . debug ( " 前10个UA采样点: {}, {}, {}, {}, {}, {}, {}, {}, {}, {} " ,
dataBuf . getSmpData ( ) [ 0 ] [ 0 ] , dataBuf . getSmpData ( ) [ 0 ] [ 1 ] ,
dataBuf . getSmpData ( ) [ 0 ] [ 2 ] , dataBuf . getSmpData ( ) [ 0 ] [ 3 ] ,
dataBuf . getSmpData ( ) [ 0 ] [ 4 ] , dataBuf . getSmpData ( ) [ 0 ] [ 5 ] ,
dataBuf . getSmpData ( ) [ 0 ] [ 6 ] , dataBuf . getSmpData ( ) [ 0 ] [ 7 ] ,
dataBuf . getSmpData ( ) [ 0 ] [ 8 ] , dataBuf . getSmpData ( ) [ 0 ] [ 9 ] ) ;
}
return true ;
} catch ( Exception e ) {
log . error ( " Error processing COMTRADE data " , e ) ;
return false ;
}
}
/**
* 读取CFG配置文件( 从InputStream)
* <p>解析COMTRADE配置文件, 提取通道信息和采样参数</p>
*
*
* @param inputStream CFG文件输入流
* @param encoding 文件编码格式
* @param encoding 文件编码格式
* @return CFG配置信息对象, 解析失败返回null
*/
private static ComtradeData . CfgInfo readCfgFile ( InputStream inputStream , String encoding ) {
ComtradeData . CfgInfo cfgInfo = new ComtradeData . CfgInfo ( ) ;
try ( BufferedReader reader = new BufferedReader (
new InputStreamReader ( inputStream , Charset . forName ( encoding ) ) ) ) {
String line ;
int lineNum = 0 ;
// 第1行: 站名, 装置ID, 版本年份
line = reader . readLine ( ) ;
lineNum + + ;
@@ -170,7 +167,7 @@ public class ComtradeReader {
}
}
}
// 第2行: 通道总数, 模拟通道数A, 数字通道数D
line = reader . readLine ( ) ;
lineNum + + ;
@@ -183,7 +180,7 @@ public class ComtradeReader {
analogPart = analogPart . substring ( 0 , analogPart . length ( ) - 1 ) ;
}
cfgInfo . setAnalogChannels ( Integer . parseInt ( analogPart ) ) ;
String digitalPart = parts [ 2 ] . trim ( ) ;
if ( digitalPart . endsWith ( " D " ) ) {
digitalPart = digitalPart . substring ( 0 , digitalPart . length ( ) - 1 ) ;
@@ -191,11 +188,11 @@ public class ComtradeReader {
cfgInfo . setDigitalChannels ( Integer . parseInt ( digitalPart ) ) ;
}
}
// 读取模拟通道信息
int analogCount = cfgInfo . getAnalogChannels ( ) ;
ComtradeData . ChannelInfo [ ] analogChannels = new ComtradeData . ChannelInfo [ analogCount ] ;
for ( int i = 0 ; i < analogCount ; i + + ) {
line = reader . readLine ( ) ;
lineNum + + ;
@@ -204,60 +201,75 @@ public class ComtradeReader {
}
}
cfgInfo . setAnalogChannelInfos ( analogChannels ) ;
// 跳过数字通道信息
int digitalCount = cfgInfo . getDigitalChannels ( ) ;
for ( int i = 0 ; i < digitalCount ; i + + ) {
reader . readLine ( ) ;
lineNum + + ;
}
// 读取电网频率
line = reader . readLine ( ) ;
lineNum + + ;
if ( line ! = null ) {
cfgInfo . setLineFreq ( Double . parseDouble ( line . trim ( ) ) ) ;
}
// 读取采样率信息
// 读取采样率信息(段数)
line = reader . readLine ( ) ;
lineNum + + ;
if ( line ! = null ) {
cfgInfo . setNrates ( Integer . parseInt ( line . trim ( ) ) ) ;
}
// 读取采样率和采样点数
line = reader . readLine ( ) ;
lineNum + + ;
if ( line ! = null ) {
String [ ] part s = line . split ( " , " ) ;
if ( parts . length > = 2 ) {
cfgInfo . setSampleRate ( Double . parseDouble ( parts [ 0 ] . trim ( ) ) ) ;
cfgInfo . setTotalSamples ( Integer . parseInt ( parts [ 1 ] . trim ( ) ) ) ;
// 读取每段的 采样率和采样点数( 对应C代码的rate[]和fd_data_num[])
int nrates = cfgInfo . getNrates ( ) ;
if ( nrates < = 0 ) nrates = 1 ; // 默认至少1段
double [ ] sampleRate s = new double [ nrates ] ;
int [ ] segmentSamples = new int [ nrates ] ;
int totalSamples = 0 ;
for ( int i = 0 ; i < nrates ; i + + ) {
line = reader . readLine ( ) ;
lineNum + + ;
if ( line ! = null ) {
String [ ] parts = line . split ( " , " ) ;
if ( parts . length > = 2 ) {
sampleRates [ i ] = Double . parseDouble ( parts [ 0 ] . trim ( ) ) ;
segmentSamples [ i ] = Integer . parseInt ( parts [ 1 ] . trim ( ) ) ;
totalSamples + = segmentSamples [ i ] ;
}
}
}
cfgInfo . setSampleRates ( sampleRates ) ;
cfgInfo . setSegmentSamples ( segmentSamples ) ;
cfgInfo . setSampleRate ( sampleRates [ 0 ] ) ; // 兼容性:主采样率设为第一段
cfgInfo . setTotalSamples ( totalSamples ) ;
// 读取开始时间
line = reader . readLine ( ) ;
lineNum + + ;
if ( line ! = null ) {
cfgInfo . setStartTime ( parseDateTime ( line ) ) ;
}
// 读取触发时间
line = reader . readLine ( ) ;
lineNum + + ;
if ( line ! = null ) {
cfgInfo . setTriggerTime ( parseDateTime ( line ) ) ;
}
// 读取数据文件类型
line = reader . readLine ( ) ;
lineNum + + ;
if ( line ! = null ) {
cfgInfo . setDataFileType ( line . trim ( ) . toUpperCase ( ) ) ;
}
// 读取时间倍率
line = reader . readLine ( ) ;
lineNum + + ;
@@ -268,23 +280,23 @@ public class ComtradeReader {
cfgInfo . setTimeMult ( 1 . 0 ) ;
}
}
return cfgInfo ;
} catch ( IOException e ) {
log . error ( " Error reading CFG from stream " , e ) ;
return null ;
}
}
/**
* 解析模拟通道信息
*/
private static ComtradeData . ChannelInfo parseAnalogChannel ( String line ) {
ComtradeData . ChannelInfo channel = new ComtradeData . ChannelInfo ( ) ;
String [ ] parts = line . split ( " , " ) ;
if ( parts . length > = 13 ) {
channel . setChannelNumber ( Integer . parseInt ( parts [ 0 ] . trim ( ) ) ) ;
channel . setChannelName ( parts [ 1 ] . trim ( ) ) ;
@@ -299,16 +311,16 @@ public class ComtradeReader {
channel . setSecondary ( Double . parseDouble ( parts [ 11 ] . trim ( ) ) ) ;
channel . setPs ( parts [ 12 ] . trim ( ) . charAt ( 0 ) ) ;
}
return channel ;
}
/**
* 解析日期时间
*/
private static ClockStruct parseDateTime ( String dateTimeStr ) {
ClockStruct clock = new ClockStruct ( ) ;
// 格式: dd/mm/yyyy,hh:mm:ss.ssssss
String [ ] parts = dateTimeStr . split ( " , " ) ;
if ( parts . length > = 2 ) {
@@ -319,13 +331,13 @@ public class ComtradeReader {
clock . setMonth ( Integer . parseInt ( dateParts [ 1 ] ) ) ;
clock . setYear ( Integer . parseInt ( dateParts [ 2 ] ) ) ;
}
// 解析时间
String [ ] timeParts = parts [ 1 ] . trim ( ) . split ( " : " ) ;
if ( timeParts . length > = 3 ) {
clock . setHour ( Integer . parseInt ( timeParts [ 0 ] ) ) ;
clock . setMinute ( Integer . parseInt ( timeParts [ 1 ] ) ) ;
// 解析秒和微秒
String [ ] secParts = timeParts [ 2 ] . split ( " \\ . " ) ;
clock . setSecond ( Integer . parseInt ( secParts [ 0 ] ) ) ;
@@ -339,124 +351,435 @@ public class ComtradeReader {
}
}
}
return clock ;
}
/**
* 读取DAT数据文件( 从InputStream)
*/
private static boolean readDatFileFromStream ( InputStream datStream , ComtradeData . CfgInfo cfgInfo ,
DataPq dataBuf , String dataFileType ) {
private static boolean readDatFileFromStream ( InputStream datStream , ComtradeData . CfgInfo cfgInfo ,
DataPq dataBuf , String dataFileType , float targetSampleRate ) {
try {
if ( FILE_TYPE_ASCII . equals ( dataFileType ) ) {
return readAsciiDatFileFromStream ( datStream , cfgInfo , dataBuf ) ;
return readAsciiDatFileFromStream ( datStream , cfgInfo , dataBuf , targetSampleRate );
} else {
return readBinaryDatFileFromStream ( datStream , cfgInfo , dataBuf ) ;
return readBinaryDatFileFromStream ( datStream , cfgInfo , dataBuf , targetSampleRate );
}
} catch ( Exception e ) {
log . error ( " Error reading DAT from stream " , e ) ;
return false ;
}
}
/**
* 找出多段数据中的最低采样率
* 对应C代码: 寻找所有段中最低的采样率
*/
private static float findMinimumSampleRate ( ComtradeData . CfgInfo cfgInfo , float targetSampleRate ) {
float lineFreq = ( float ) cfgInfo . getLineFreq ( ) ;
if ( lineFreq < = 0 ) {
lineFreq = 50 . 0f ;
}
float minSamplesPerCycle = Float . MAX_VALUE ;
// 如果有多段数据
if ( cfgInfo . getNrates ( ) > 0 & & cfgInfo . getSampleRates ( ) ! = null ) {
for ( int i = 0 ; i < cfgInfo . getNrates ( ) ; i + + ) {
float samplesPerCycle = ( float ) ( cfgInfo . getSampleRates ( ) [ i ] / lineFreq ) ;
if ( samplesPerCycle < minSamplesPerCycle ) {
minSamplesPerCycle = samplesPerCycle ;
}
}
} else {
// 单段数据,使用主采样率
minSamplesPerCycle = ( float ) ( cfgInfo . getSampleRate ( ) / lineFreq ) ;
}
// 如果最低采样率仍然高于目标256, 则返回目标值
// 否则返回实际最低值
return Math . min ( minSamplesPerCycle , targetSampleRate ) ;
}
/**
* 读取ASCII格式的DAT文件( 从InputStream)
*/
private static boolean readAsciiDatFileFromStream ( InputStream datStream , ComtradeData . CfgInfo cfgInfo , DataPq dataBuf )
throws IOException {
private static boolean readAsciiDatFileFromStream ( InputStream datStream , ComtradeData . CfgInfo cfgInfo ,
DataPq dataBuf , float targetSampleRate ) throws IOException {
try ( BufferedReader reader = new BufferedReader ( new InputStreamReader ( datStream ) ) ) {
String line ;
// 输出索引
int sampleIndex = 0 ;
while ( ( line = reader . readLine ( ) ) ! = null & & sample Index < cfgInfo . getTotalSamples ( ) ) {
String [ ] values = line . split ( " , " ) ;
// 跳过序号和时间戳(前两列)
int dataStartIndex = 2 ;
// 读取模拟通道数据
for ( int ch = 0 ; ch < cfgInfo . getAnalogChannels ( ) & & ch < MAX_CH_NUM ; ch + + ) {
if ( dataStartIndex + ch < values . length ) {
int rawValue = Integer . parseInt ( values [ dataStartIndex + ch ] . trim ( ) ) ;
// 直接存储原始值, 像C代码一样
dataBuf . getSmpData ( ) [ ch ] [ sampleIndex ] = rawValue ;
// 输入索引
int input Index = 0 ;
// 找出最低采样率作为统一目标
float unifiedSampleRate = findMinimumSampleRate ( cfgInfo , targetSampleRate ) ;
float lineFreq = ( float ) cfgInfo . getLineFreq ( ) ;
if ( lineFreq < = 0 ) {
lineFreq = 50 . 0f ;
}
// 处理多段数据
if ( cfgInfo . getNrates ( ) > 0 & & cfgInfo . getSampleRates ( ) ! = null ) {
// 多段数据处理
for ( int seg = 0 ; seg < cfgInfo . getNrates ( ) ; seg + + ) {
float segmentSamplesPerCycle = ( float ) ( cfgInfo . getSampleRates ( ) [ seg ] / lineFreq ) ;
int segmentSamples = cfgInfo . getSegmentSamples ( ) [ seg ] ;
// 计算当前段的降采样模数
int downsampleMod = ( segmentSamplesPerCycle > unifiedSampleRate ) ?
Math . round ( segmentSamplesPerCycle / unifiedSampleRate ) : 1 ;
log . info ( " 段{} - 原始: {}点/周波, 统一目标: {}点/周波, 模数: {} " ,
seg , segmentSamplesPerCycle , unifiedSampleRate , downsampleMod ) ;
// 读取当前段的数据
int segmentInputIndex = 0 ;
while ( segmentInputIndex < segmentSamples & & ( line = reader . readLine ( ) ) ! = null ) {
// 降采样: 每隔downsampleMod个点取一个
if ( segmentInputIndex % downsampleMod = = 0 ) {
String [ ] values = line . split ( " , " ) ;
// 跳过序号和时间戳
int dataStartIndex = 2 ;
// 读取模拟通道数据
for ( int ch = 0 ; ch < cfgInfo . getAnalogChannels ( ) & & ch < MAX_CH_NUM ; ch + + ) {
if ( dataStartIndex + ch < values . length ) {
int rawValue = Integer . parseInt ( values [ dataStartIndex + ch ] . trim ( ) ) ;
dataBuf . getSmpData ( ) [ ch ] [ sampleIndex ] = rawValue ;
}
}
sampleIndex + + ;
}
segmentInputIndex + + ;
inputIndex + + ;
}
}
sampleIndex + + ;
} else {
// 单段数据处理(保持原有逻辑)
float originalSamplesPerCycle = ( float ) cfgInfo . getSampleRate ( ) / lineFreq ;
int downsampleMod = ( originalSamplesPerCycle > targetSampleRate ) ?
Math . round ( originalSamplesPerCycle / targetSampleRate ) : 1 ;
log . info ( " ASCII采样处理 - 原始: {}点/周波, 目标: {}点/周波, 模数: {} " ,
originalSamplesPerCycle , targetSampleRate , downsampleMod ) ;
while ( ( line = reader . readLine ( ) ) ! = null & & inputIndex < cfgInfo . getTotalSamples ( ) ) {
// 降采样: 每隔downsampleMod个点取一个( 对应C代码: i % mod == 0)
if ( inputIndex % downsampleMod = = 0 ) {
String [ ] values = line . split ( " , " ) ;
// 跳过序号和时间戳(前两列)
int dataStartIndex = 2 ;
// 读取模拟通道数据
for ( int ch = 0 ; ch < cfgInfo . getAnalogChannels ( ) & & ch < MAX_CH_NUM ; ch + + ) {
if ( dataStartIndex + ch < values . length ) {
int rawValue = Integer . parseInt ( values [ dataStartIndex + ch ] . trim ( ) ) ;
// 直接存储原始值, 像C代码一样
dataBuf . getSmpData ( ) [ ch ] [ sampleIndex ] = rawValue ;
}
}
sampleIndex + + ;
}
inputIndex + + ;
}
}
dataBuf . setSmpNum ( sampleIndex ) ;
// 设置采样率为统一的降采样后的值
dataBuf . setSmpRate ( unifiedSampleRate ) ;
// 设置增益系数( 从CFG文件的a系数获取)
for ( int ch = 0 ; ch < cfgInfo . getAnalogChannels ( ) & & ch < MAX_CH_NUM ; ch + + ) {
ComtradeData . ChannelInfo chInfo = cfgInfo . getAnalogChannelInfos ( ) [ ch ] ;
dataBuf . getUiGainXs ( ) [ ch ] = ( float ) chInfo . getA ( ) ; // 使用CFG文件中的a系数作为增益
dataBuf . getUiGainXs ( ) [ ch ] = ( float ) chInfo . getA ( ) ; // 使用CFG文件中的a系数作为增益
}
log . info ( " ASCII DAT文件读取完成 - 读取了 {} 个采样点 " , sampleIndex ) ;
log . info ( " ASCII DAT文件读取完成 - 原始点数: {}, 实际读取点数: {}, 统一采样率: {}点/周波 " ,
inputIndex , sampleIndex , unifiedSampleRate ) ;
return true ;
}
}
/**
* 读取二进制格式的DAT文件( 从InputStream)
*/
private static boolean readBinaryDatFileFromStream ( InputStream datStream , ComtradeData . CfgInfo cfgInfo , DataPq dataBuf )
throws IOException {
// 计算每个采样记录的字节数
int analogBytes = cfgInfo . getAnalogChannels ( ) * 2 ; // 每个模拟通道2字节
int digitalBytes = ( cfgInfo . getDigitalChannels ( ) + 15 ) / 16 * 2 ; // 数字通道按16位对齐
int recordSize = 4 + 4 + analogBytes + digitalBytes ; // 序号(4) + 时间戳(4) + 模拟 + 数字
private static boolean readBinaryDatFileFromStream ( InputStream datStream , ComtradeData . CfgInfo cfgInfo ,
DataPq dataBuf , float targetSampleRate ) throws IOException {
// 计算每个采样记录的字节数// 每个模拟通道2字节
int analogBytes = cfgInfo . getAnalogChannels ( ) * 2 ;
// 数字通道按16位对齐
int digitalBytes = ( cfgInfo . getDigitalChannels ( ) + 15 ) / 16 * 2 ;
// 序号(4) + 时间戳(4) + 模拟 + 数字
int recordSize = 4 + 4 + analogBytes + digitalBytes ;
// 找出最低采样率作为统一目标
float unifiedSampleRate = findMinimumSampleRate ( cfgInfo , targetSampleRate ) ;
float lineFreq = ( float ) cfgInfo . getLineFreq ( ) ;
if ( lineFreq < = 0 ) {
lineFreq = 50 . 0f ;
}
byte [ ] buffer = new byte [ recordSize ] ;
int sampleIndex = 0 ;
int sampleIndex = 0 ; // 输出索引
int inputIndex = 0 ; // 输入索引
try ( DataInputStream dis = new DataInputStream ( new BufferedInputStream ( datStream ) ) ) {
while ( sampleIndex < cfgInfo . getTotalSamples ( ) ) {
int bytesRead = dis . read ( buffer , 0 , recordSize ) ;
if ( bytesRead ! = recordSize ) {
break ; // 读取完毕或出错
// 处理多段数据
if ( cfgInfo . getNrates ( ) > 0 & & cfgInfo . getSampleRates ( ) ! = null ) {
// 多段数据处理
for ( int seg = 0 ; seg < cfgInfo . getNrates ( ) ; seg + + ) {
float segmentSamplesPerCycle = ( float ) ( cfgInfo . getSampleRates ( ) [ seg ] / lineFreq ) ;
int segmentSamples = cfgInfo . getSegmentSamples ( ) [ seg ] ;
// 计算当前段的降采样模数
int downsampleMod = ( segmentSamplesPerCycle > unifiedSampleRate ) ?
Math . round ( segmentSamplesPerCycle / unifiedSampleRate ) : 1 ;
log . info ( " 二进制段{} - 原始: {}点/周波, 统一目标: {}点/周波, 模数: {} " ,
seg , segmentSamplesPerCycle , unifiedSampleRate , downsampleMod ) ;
// 读取当前段的数据
int segmentInputIndex = 0 ;
while ( segmentInputIndex < segmentSamples ) {
int bytesRead = dis . read ( buffer , 0 , recordSize ) ;
if ( bytesRead ! = recordSize ) {
break ; // 读取完毕或出错
}
// 降采样: 每隔downsampleMod个点取一个
if ( segmentInputIndex % downsampleMod = = 0 ) {
ByteBuffer bb = ByteBuffer . wrap ( buffer ) ;
bb . order ( ByteOrder . LITTLE_ENDIAN ) ; // COMTRADE标准使用小端序
// 跳过序号和时间戳
bb . getInt ( ) ; // 序号
bb . getInt ( ) ; // 时间戳
// 读取模拟通道数据
for ( int ch = 0 ; ch < cfgInfo . getAnalogChannels ( ) & & ch < MAX_CH_NUM ; ch + + ) {
short rawValue = bb . getShort ( ) ;
dataBuf . getSmpData ( ) [ ch ] [ sampleIndex ] = rawValue ;
}
sampleIndex + + ;
}
segmentInputIndex + + ;
inputIndex + + ;
}
}
ByteBuffer bb = ByteBuffer . wrap ( buffer ) ;
bb . order ( ByteOrder . LITTLE_ENDIAN ) ; // COMTRADE标准使用小端序
// 跳过序号和时间戳
bb . getInt ( ) ; // 序号
bb . getInt ( ) ; // 时间戳
// 读取模拟通道数据
for ( int ch = 0 ; ch < cfgInfo . getAnalogChannels ( ) & & ch < MAX_CH_NUM ; ch + + ) {
shor t rawValue = bb . getShort ( ) ;
// 直接存储原始值, 像C代码一样
dataBuf . getSmpData ( ) [ ch ] [ sampleIndex ] = rawValue ;
} else {
// 单段数据处理(保持原有逻辑)
float originalSamplesPerCycle = ( float ) cfgInfo . getSampleRate ( ) / lineFreq ;
int downsampleMod = ( originalSamplesPerCycle > targetSampleRate ) ?
Math . round ( originalSamplesPerCycle / targetSampleRate ) : 1 ;
log . info ( " 二进制采样处理 - 原始: {}点/周波, 目标: {}点/周波, 模数: {} " ,
originalSamplesPerCycle , targetSampleRate , downsampleMod ) ;
while ( inputIndex < cfgInfo . getTotalSamples ( ) ) {
in t bytesRead = dis . read ( buffer , 0 , recordSize ) ;
if ( bytesRead ! = recordSize ) {
break ; // 读取完毕或出错
}
// 降采样: 每隔downsampleMod个点取一个( 对应C代码: i % mod == 0)
if ( inputIndex % downsampleMod = = 0 ) {
ByteBuffer bb = ByteBuffer . wrap ( buffer ) ;
bb . order ( ByteOrder . LITTLE_ENDIAN ) ; // COMTRADE标准使用小端序
// 跳过序号和时间戳
bb . getInt ( ) ; // 序号
bb . getInt ( ) ; // 时间戳
// 读取模拟通道数据
for ( int ch = 0 ; ch < cfgInfo . getAnalogChannels ( ) & & ch < MAX_CH_NUM ; ch + + ) {
short rawValue = bb . getShort ( ) ;
// 直接存储原始值, 像C代码一样
dataBuf . getSmpData ( ) [ ch ] [ sampleIndex ] = rawValue ;
}
sampleIndex + + ;
}
inputIndex + + ;
}
sampleIndex + + ;
}
}
dataBuf . setSmpNum ( sampleIndex ) ;
// 设置采样率为统一的降采样后的值
dataBuf . setSmpRate ( unifiedSampleRate ) ;
// 设置增益系数( 从CFG文件的a系数获取)
for ( int ch = 0 ; ch < cfgInfo . getAnalogChannels ( ) & & ch < MAX_CH_NUM ; ch + + ) {
ComtradeData . ChannelInfo chInfo = cfgInfo . getAnalogChannelInfos ( ) [ ch ] ;
dataBuf . getUiGainXs ( ) [ ch ] = ( float ) chInfo . getA ( ) ; // 使用CFG文件中的a系数作为增益
dataBuf . getUiGainXs ( ) [ ch ] = ( float ) chInfo . getA ( ) ; // 使用CFG文件中的a系数作为增益
}
log . info ( " 二进制DAT文件读取完成 - 读取了 {} 个采样点 " , sampleIndex ) ;
log . info ( " 二进制DAT文件读取完成 - 原始点数: {}, 实际读取点数: {}, 统一采样率: {}点/周波 " ,
inputIndex , sampleIndex , unifiedSampleRate ) ;
return true ;
}
/**
* 计算有效的目标采样率( 对应C代码中的out_smp逻辑)
* <p>按照C代码第345-346行: 限制最大采样率为256点/周波</p>
*
* @param cfgInfo CFG配置信息
* @param requestedTargetRate 请求的目标采样率, 0表示自动选择
* @return 有效的目标采样率
*/
private static float calculateEffectiveTargetSampleRate ( ComtradeData . CfgInfo cfgInfo , float requestedTargetRate ) {
// 计算原始每周波采样点数
float lineFreq = ( float ) cfgInfo . getLineFreq ( ) ;
if ( lineFreq < = 0 ) {
lineFreq = 50 . 0f ;
}
float samplesPerCycle = ( float ) cfgInfo . getSampleRate ( ) / lineFreq ;
// 验证最小采样率要求( 对应C代码第333-335行)
if ( samplesPerCycle < 128 ) {
log . error ( " 采样率过低:每周波{}点, 最小要求128点 " , samplesPerCycle ) ;
throw new IllegalArgumentException ( " 采样率过低,每周波采样点数必须>=128, 当前: " + samplesPerCycle ) ;
}
// 使用指定目标采样率,否则使用原始采样率
float targetRate = ( requestedTargetRate > 0 ) ? requestedTargetRate : samplesPerCycle ;
// 对应C代码第345-346行: 限制最大采样率为256点/周波
if ( targetRate > 256 ) {
log . info ( " 采样率{}超过256, 限制为256点/周波 " , targetRate ) ;
targetRate = 256 ;
}
// 目标采样率不能高于原始采样率(不能上采样)
if ( targetRate > samplesPerCycle ) {
targetRate = samplesPerCycle ;
}
log . info ( " 采样率确定 - 原始: {}点/周波, 目标: {}点/周波 " , samplesPerCycle , targetRate ) ;
return targetRate ;
}
/**
* 兼容性方法:不指定目标采样率的读取接口
*/
public static boolean readSampleFile ( InputStream cfgStream , InputStream datStream ,
DataPq dataBuf , String encoding ) {
return readSampleFile ( cfgStream , datStream , dataBuf , encoding , 0 ) ;
}
/**
* 读取两个COMTRADE文件并统一采样率( 对应C代码的两文件比较逻辑)
* <p>实现了与C代码一致的双文件采样率统一策略: </p>
* <ol>
* <li>先读取两个CFG文件</li>
* <li>计算两者的最小采样率( 但不超过256点/周波)</li>
* <li>将两个文件都降采样到统一的采样率</li>
* </ol>
*
* @param cfg1Stream 文件1的CFG流
* @param dat1Stream 文件1的DAT流
* @param dataBuf1 文件1的数据缓冲区
* @param cfg2Stream 文件2的CFG流
* @param dat2Stream 文件2的DAT流
* @param dataBuf2 文件2的数据缓冲区
* @param encoding 文件编码
* @return 读取是否成功
*/
public static boolean readTwoFilesWithUnifiedSampleRate (
InputStream cfg1Stream , InputStream dat1Stream , DataPq dataBuf1 ,
InputStream cfg2Stream , InputStream dat2Stream , DataPq dataBuf2 ,
String encoding ) {
try {
// 步骤1: 读取两个CFG文件
ComtradeData . CfgInfo cfg1 = readCfgFile ( cfg1Stream , encoding ) ;
ComtradeData . CfgInfo cfg2 = readCfgFile ( cfg2Stream , encoding ) ;
if ( cfg1 = = null | | cfg2 = = null ) {
log . error ( " CFG文件读取失败 " ) ;
return false ;
}
// 步骤2: 计算统一的目标采样率( 对应C代码中的out_smp计算)
float unifiedSampleRate = calculateUnifiedSampleRate ( cfg1 , cfg2 ) ;
log . info ( " 双文件采样率统一 - 文件1原始: {}, 文件2原始: {}, 统一目标: {} " ,
calculateOriginalSampleRate ( cfg1 ) , calculateOriginalSampleRate ( cfg2 ) , unifiedSampleRate ) ;
// 步骤3: 读取两个文件, 都使用统一的采样率
boolean success1 = readDatFileFromStream ( dat1Stream , cfg1 , dataBuf1 ,
cfg1 . getDataFileType ( ) ! = null ? cfg1 . getDataFileType ( ) : FILE_TYPE_BINARY , unifiedSampleRate ) ;
boolean success2 = readDatFileFromStream ( dat2Stream , cfg2 , dataBuf2 ,
cfg2 . getDataFileType ( ) ! = null ? cfg2 . getDataFileType ( ) : FILE_TYPE_BINARY , unifiedSampleRate ) ;
if ( ! success1 | | ! success2 ) {
log . error ( " DAT文件读取失败 " ) ;
return false ;
}
// 步骤4: 处理CFG信息
boolean result1 = processCfgAndData ( cfg1 , dataBuf1 , unifiedSampleRate ) ;
boolean result2 = processCfgAndData ( cfg2 , dataBuf2 , unifiedSampleRate ) ;
return result1 & & result2 ;
} catch ( Exception e ) {
log . error ( " 双文件读取过程中发生错误 " , e ) ;
return false ;
}
}
/**
* 计算两个文件的统一采样率( 对应C代码第327-346行的逻辑)
*/
private static float calculateUnifiedSampleRate ( ComtradeData . CfgInfo cfg1 , ComtradeData . CfgInfo cfg2 ) {
float rate1 = calculateOriginalSampleRate ( cfg1 ) ;
float rate2 = calculateOriginalSampleRate ( cfg2 ) ;
// 取较小的采样率( 对应C代码第339-342行)
float minRate = Math . min ( rate1 , rate2 ) ;
// 对应C代码第345-346行: 限制最大采样率为256点/周波
if ( minRate > 256 ) {
log . info ( " 统一采样率{}超过256, 限制为256点/周波 " , minRate ) ;
minRate = 256 ;
}
log . info ( " 双文件采样率统一 - 文件1: {}点/周波, 文件2: {}点/周波, 统一为: {}点/周波 " ,
rate1 , rate2 , minRate ) ;
return minRate ;
}
/**
* 计算原始采样率(每周波采样点数)
*/
private static float calculateOriginalSampleRate ( ComtradeData . CfgInfo cfgInfo ) {
float lineFreq = ( float ) cfgInfo . getLineFreq ( ) ;
if ( lineFreq < = 0 ) {
lineFreq = 50 . 0f ;
}
return ( float ) cfgInfo . getSampleRate ( ) / lineFreq ;
}
}