@@ -5,11 +5,11 @@ import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil ;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
import com.njcn.common.config.GeneralInfo ;
import com.njcn.common.pojo.dto.wave.WaveDataDTO ;
import com.njcn.common.pojo.enums.common.ServerEnum ;
import com.njcn.common.pojo.exception.BusinessException ;
import com.njcn.common.utils.PubUtils ;
import com.njcn.common.utils.wave.AnalyWave ;
import com.njcn.oss.constant.GeneralConstant ;
import com.njcn.oss.constant.OssPath ;
import com.njcn.device.pq.api.GeneralDeviceInfoClient ;
import com.njcn.device.pq.api.LineFeignClient ;
import com.njcn.device.pq.pojo.dto.GeneralDeviceDTO ;
@@ -17,6 +17,7 @@ import com.njcn.device.pq.pojo.vo.AreaLineInfoVO;
import com.njcn.device.pq.pojo.vo.LineDetailDataVO ;
import com.njcn.event.enums.EventResponseEnum ;
import com.njcn.event.mapper.majornetwork.TransientMapper ;
import com.njcn.event.pojo.dto.wave.WaveDataDTO ;
import com.njcn.event.pojo.param.TransientParam ;
import com.njcn.event.pojo.param.WaveFileParam ;
import com.njcn.event.pojo.po.EventDetail ;
@@ -24,6 +25,8 @@ import com.njcn.event.pojo.po.EventDetailNew;
import com.njcn.event.pojo.vo.TransientVO ;
import com.njcn.event.service.majornetwork.EventDetailService ;
import com.njcn.event.service.majornetwork.TransientService ;
import com.njcn.event.utils.WaveUtil ;
import com.njcn.huawei.obs.util.OBSUtil ;
import com.njcn.influxdb.mapper.InfluxDBResultMapperCn ;
import com.njcn.influxdb.param.InfluxDBPublicParam ;
import com.njcn.influxdb.utils.InfluxDBCommUtils ;
@@ -45,7 +48,6 @@ import java.time.LocalDateTime;
import java.time.ZoneId ;
import java.time.format.DateTimeFormatter ;
import java.util.ArrayList ;
import java.util.Collection ;
import java.util.List ;
import java.util.Objects ;
import java.util.stream.Collectors ;
@@ -75,6 +77,11 @@ public class TransientServiceImpl implements TransientService {
private final InfluxDbUtils influxDbUtils ;
private final OBSUtil obsUtil ;
private final WaveUtil waveUtil ;
@Override
public Page < TransientVO > getTransientData ( TransientParam transientParam ) {
Page < TransientVO > page = new Page < > ( ) ;
@@ -86,7 +93,7 @@ public class TransientServiceImpl implements TransientService {
List < GeneralDeviceDTO > deviceList = generalDeviceInfoClient . getPracticalRunDeviceInfo ( transientParam ) . getData ( ) ;
if ( ! CollectionUtils . isEmpty ( deviceList ) ) {
//获取按终端分类的监测点索引集合
List < String > LineIndexes = deviceList . stream ( ) . flatMap ( list - > list . getLineIndexes ( ) . stream ( ) ) . collect ( Collectors . toList ( ) ) ;
List < String > LineIndexes = deviceList . stream ( ) . flatMap ( list - > list . getLineIndexes ( ) . stream ( ) ) . collect ( Collectors . toList ( ) ) ;
if ( ! CollectionUtils . isEmpty ( LineIndexes ) ) {
//influxDB查询待分页数据总量
@@ -96,7 +103,7 @@ public class TransientServiceImpl implements TransientService {
int pages = (int) Math.ceil(data.size() * 1.0 / transientParam.getPageSize());
page.setPages(pages);*/
//influxDB分页查询
List < EventDetail > eventDetailData = eventDetailService . getEventDetailLimit ( LineIndexes , transientParam . getSearchBeginTime ( ) , transientParam . getSearchEndTime ( ) , transientParam . getPageSize ( ) , transientParam . getPageNum ( ) , transientParam . getWaveType ( ) ) ;
List < EventDetail > eventDetailData = eventDetailService . getEventDetailLimit ( LineIndexes , transientParam . getSearchBeginTime ( ) , transientParam . getSearchEndTime ( ) , transientParam . getPageSize ( ) , transientParam . getPageNum ( ) , transientParam . getWaveType ( ) ) ;
if ( ! CollectionUtils . isEmpty ( eventDetailData ) ) {
List < String > lineIds = eventDetailData . stream ( ) . map ( EventDetail : : getLineId ) . collect ( Collectors . toList ( ) ) ;
@@ -168,32 +175,45 @@ public class TransientServiceImpl implements TransientService {
@Override
public WaveDataDTO getTransientAnalyseWave ( String timeId , String lineId ) {
WaveDataDTO waveDataDTO ;
//原始数据
WaveDataDTO originalData ;
//根据监测点id获取监测点详情
LineDetailDataVO lineDetailData = lineFeignClient . getLineDetailData ( lineId ) . getData ( ) ;
EventDetail eventDetailByTime = eventDetailService . getEventDetailByTime ( lineId , timeId ) ;
String ip = lineDetailData . getIp ( ) ;
String waveName = eventDetailByTime . getWaveName ( ) ;
// TransientVO transientVO = transientMapper.getTransientDataById(lineId) ;
// String name = transientVO.getName();
// String substation = transientVO.getSubstation() ;
// Double eventValue = new BigDecimal(eventDetailByTime.getEventValue()).setScale(3, BigDecimal.ROUND_HALF_UP).doubleValue() ;
// Double v = new BigDecimal(eventValue*100).setScale(0,BigDecimal.ROUND_HALF_UP).doubleValue() ;
// Double persistTime = eventD eta ilByTime.getPersistTime()/1000 ;
// waveDataVO.setName("变电站名称: "+ substation +" 监测点名称: "+ name +" 发生时刻: "+ timeId +" 暂降幅值: "+ v +"% 持续时间: "+ persistTime +"s") ;
// waveDataVO.setTargetName("相电压有效值");
AnalyWave analyWave = new AnalyWave ( ) ;
// 从本地读取该事件的波形
WaveDataDTO comtrade = analyWave . getComtrade ( generalInfo . getBusinessWavePath ( ) + File . separator + ip + File . separator + waveName + " .CFG " , 1 ) ;
if ( Objects . isNull ( comtrade . getComtradeCfgDTO ( ) ) ) {
throw new BusinessException ( EventResponseEnum . ANALYSEWAVE_NOT_FOUND ) ;
String cfgPath , datPath ;
if ( generalInfo . getBusinessFileStorage ( ) = = GeneralConstant . LOCAL_DISK ) {
cfgPath = generalInfo . getBusinessWavePath ( ) + File . separator + ip + File . separator + waveName + GeneralConstant . CFG ;
datPath = generalInfo . getBusinessWavePath ( ) + File . separator + ip + File . separator + waveName + GeneralConstant . DAT ;
InputStream cfgStream = waveUtil . getFileInputStreamByFilePath ( cfgPath ) ;
InputStream datStream = waveUtil . g etF ileInputStreamByFilePath ( datPath ) ;
if ( Objects . isNull ( cfgStream ) | | Objects . isNull ( datStream ) ) {
throw new BusinessException ( EventResponseEnum . ANALYSEWAVE_NOT_FOUND ) ;
}
originalData = waveUtil . getComtrade ( cfgStream , datStream , 1 ) ;
// } else if (generalInfo.getBusinessFileStorage() == GeneralConstant.HUAWEI_OBS) {
} else {
cfgPath = OssPath . WAVE_DIR + ip + StrUtil . SLASH + waveName + GeneralConstant . CFG ;
datPath = OssPath . WAVE_DIR + ip + StrUtil . SLASH + waveName + GeneralConstant . DAT ;
try (
InputStream cfgStream = obsUtil . fileDownload ( cfgPath ) ;
InputStream datStream = obsUtil . fileDownload ( datPath )
) {
if ( Objects . isNull ( cfgStream ) | | Objects . isNull ( datStream ) ) {
throw new BusinessException ( EventResponseEnum . ANALYSEWAVE_NOT_FOUND ) ;
}
originalData = waveUtil . getComtrade ( cfgStream , datStream , 1 ) ;
} catch ( IOException e ) {
throw new BusinessException ( EventResponseEnum . WAVE_DATA_INVALID ) ;
}
}
waveDataDTO = analyWave . getValidData ( comtrade ) ;
waveDataDTO = waveUtil . getValidData ( originalData ) ;
waveDataDTO . setPtType ( PubUtils . ptTypeName ( lineDetailData . getPtType ( ) ) ) ;
double pt1 = Double . parseDouble ( lineDetailData . getPt ( ) . split ( " / " ) [ 0 ] ) ;
double pt2 = Double . parseDouble ( lineDetailData . getPt ( ) . split ( " / " ) [ 1 ] ) ;
double ct1 = Double . parseDouble ( lineDetailData . getCt ( ) . split ( " / " ) [ 0 ] ) ;
double ct2 = Double . parseDouble ( lineDetailData . getCt ( ) . split ( " / " ) [ 1 ] ) ;
double pt1 = Double . parseDouble ( lineDetailData . getPt ( ) . split ( StrUtil . SLASH ) [ 0 ] ) ;
double pt2 = Double . parseDouble ( lineDetailData . getPt ( ) . split ( StrUtil . SLASH ) [ 1 ] ) ;
double ct1 = Double . parseDouble ( lineDetailData . getCt ( ) . split ( StrUtil . SLASH ) [ 0 ] ) ;
double ct2 = Double . parseDouble ( lineDetailData . getCt ( ) . split ( StrUtil . SLASH ) [ 1 ] ) ;
waveDataDTO . setPt ( pt1 / pt2 ) ;
waveDataDTO . setCt ( ct1 / ct2 ) ;
return waveDataDTO ;
@@ -236,7 +256,7 @@ public class TransientServiceImpl implements TransientService {
}
@Override
public Page < EventDetailNew > getTransientValue ( TransientParam transientParam ) {
public Page < EventDetailNew > getTransientValue ( TransientParam transientParam ) {
Page < EventDetailNew > page = new Page < > ( ) ;
page . setSize ( transientParam . getPageSize ( ) ) ;
page . setCurrent ( transientParam . getPageNum ( ) ) ;
@@ -250,7 +270,7 @@ public class TransientServiceImpl implements TransientService {
if ( ! CollectionUtils . isEmpty ( lineList ) ) {
StringBuilder stringBuilder = InfluxDBCommUtils . assToInfluxParam ( lineList ) ;
//influxDB查询待分页数据总量
Long total = getTransientDetail ( stringBuilder , transientParam ) ;
Long total = getTransientDetail ( stringBuilder , transientParam ) ;
page . setTotal ( total ) ;
//分页总页数
int pages = ( int ) Math . ceil ( transientParam . getPageNum ( ) * 1 . 0 / transientParam . getPageSize ( ) ) ;
@@ -268,8 +288,8 @@ public class TransientServiceImpl implements TransientService {
for ( EventDetailNew eventDetail : eventDetailData ) {
for ( AreaLineInfoVO areaLineInfoVO : r ) {
if ( eventDetail . getLineId ( ) . equals ( areaLineInfoVO . getLineId ( ) ) ) {
for ( AreaLineInfoVO areaLineInfoVO : r ) {
if ( eventDetail . getLineId ( ) . equals ( areaLineInfoVO . getLineId ( ) ) ) {
eventDetail . setLineId ( areaLineInfoVO . getLineId ( ) ) ;
eventDetail . setLineName ( areaLineInfoVO . getLineName ( ) ) ;
eventDetail . setGdName ( areaLineInfoVO . getGdName ( ) ) ;
@@ -310,22 +330,22 @@ public class TransientServiceImpl implements TransientService {
/**
* 查询数据库
*/
private Long getTransientDetail ( StringBuilder stringBuilder , TransientParam transientParam ) {
private Long getTransientDetail ( StringBuilder stringBuilder , TransientParam transientParam ) {
Long total = 0L ;
//组装sql语句
stringBuilder . append ( " and time >= ' " ) . append ( DateUtil . beginOfDay ( DateUtil . parse ( transientParam . getSearchBeginTime ( ) ) ) ) . append ( " ' and " ) . append ( " time <= ' " ) . append ( DateUtil . endOfDay ( DateUtil . parse ( transientParam . getSearchEndTime ( ) ) ) ) . append ( " ' " ) . append ( InfluxDBPublicParam . TIME_ZONE ) ;
//sql语句
String sql = " SELECT count(wave_type) FROM pqs_eventdetail WHERE " + stringBuilder ;
System . out . println ( " sql------------->>> " + sql ) ;
System . out . println ( " sql------------->>> " + sql ) ;
//结果集
QueryResult result = influxDbUtils . query ( sql ) ;
//结果集映射到对象中
List < QueryResult . Series > series = result . getResults ( ) . get ( 0 ) . getSeries ( ) ;
if ( CollUtil . isNotEmpty ( series ) ) {
Double tem = ( Double ) series . get ( 0 ) . getValues ( ) . get ( 0 ) . get ( 1 ) ;
if ( CollUtil . isNotEmpty ( series ) ) {
Double tem = ( Double ) series . get ( 0 ) . getValues ( ) . get ( 0 ) . get ( 1 ) ;
total = tem . longValue ( ) ;
}
return total ;
return total ;
}
/**
@@ -334,31 +354,31 @@ public class TransientServiceImpl implements TransientService {
private List < EventDetailNew > getTransientDetailLimit ( List < String > lineIndexes , TransientParam transientParam ) {
//查询数据是否为空, 不为空拼接sql语句
StringBuilder querySql = new StringBuilder ( ) ;
if ( Objects . nonNull ( transientParam . getEventValueMin ( ) ) ) {
if ( Objects . nonNull ( transientParam . getEventValueMin ( ) ) ) {
querySql . append ( " and event_value >= " ) . append ( transientParam . getEventValueMin ( ) ) ;
}
if ( Objects . nonNull ( transientParam . getEventValueMax ( ) ) ) {
if ( Objects . nonNull ( transientParam . getEventValueMax ( ) ) ) {
querySql . append ( " and event_value <= " ) . append ( transientParam . getEventValueMax ( ) ) ;
}
if ( Objects . nonNull ( transientParam . getPersistMin ( ) ) ) {
if ( Objects . nonNull ( transientParam . getPersistMin ( ) ) ) {
querySql . append ( " and persist_time >= " ) . append ( transientParam . getPersistMin ( ) ) ;
}
if ( Objects . nonNull ( transientParam . getEventValueMax ( ) ) ) {
if ( Objects . nonNull ( transientParam . getEventValueMax ( ) ) ) {
querySql . append ( " and persist_time <= " ) . append ( transientParam . getPersistMax ( ) ) ;
}
if ( Objects . nonNull ( transientParam . getSeverityMin ( ) ) ) {
if ( Objects . nonNull ( transientParam . getSeverityMin ( ) ) ) {
querySql . append ( " and severity >= " ) . append ( transientParam . getSeverityMin ( ) ) ;
}
if ( Objects . nonNull ( transientParam . getSeverityMax ( ) ) ) {
if ( Objects . nonNull ( transientParam . getSeverityMax ( ) ) ) {
querySql . append ( " and severity <= " ) . append ( transientParam . getSeverityMax ( ) ) ;
}
if ( Objects . nonNull ( transientParam . getFileFlag ( ) ) ) {
if ( Objects . nonNull ( transientParam . getFileFlag ( ) ) ) {
querySql . append ( " and file_flag = " ) . append ( transientParam . getFileFlag ( ) ) ;
}
if ( CollUtil . isNotEmpty ( transientParam . getWaveType ( ) ) ) {
if ( CollUtil . isNotEmpty ( transientParam . getWaveType ( ) ) ) {
querySql . append ( " and ( " ) ;
for ( int i = 0 ; i < transientParam . getWaveType ( ) . size ( ) ; i + + ) {
if ( transientParam . getWaveType ( ) . size ( ) - i ! = 1 ) {
@@ -368,7 +388,7 @@ public class TransientServiceImpl implements TransientService {
}
}
}
if ( CollUtil . isNotEmpty ( transientParam . getEventReason ( ) ) ) {
if ( CollUtil . isNotEmpty ( transientParam . getEventReason ( ) ) ) {
querySql . append ( " and ( " ) ;
for ( int i = 0 ; i < transientParam . getEventReason ( ) . size ( ) ; i + + ) {
if ( transientParam . getWaveType ( ) . size ( ) - i ! = 1 ) {
@@ -378,7 +398,7 @@ public class TransientServiceImpl implements TransientService {
}
}
}
if ( CollUtil . isNotEmpty ( transientParam . getEventType ( ) ) ) {
if ( CollUtil . isNotEmpty ( transientParam . getEventType ( ) ) ) {
querySql . append ( " and ( " ) ;
for ( int i = 0 ; i < transientParam . getEventType ( ) . size ( ) ; i + + ) {
if ( transientParam . getEventType ( ) . size ( ) - i ! = 1 ) {
@@ -402,8 +422,8 @@ public class TransientServiceImpl implements TransientService {
stringBuilder . append ( " line_id =' " ) . append ( lineIndexes . get ( i ) ) . append ( " ') " ) ;
}
}
int i = ( transientParam . getPageNum ( ) - 1 ) * transientParam . getPageSize ( ) ;
stringBuilder . append ( " LIMIT " ) . append ( transientParam . getPageSize ( ) ) . append ( " OFFSET " ) . append ( i ) . append ( " tz('Asia/Shanghai') " ) ;
int i = ( transientParam . getPageNum ( ) - 1 ) * transientParam . getPageSize ( ) ;
stringBuilder . append ( " LIMIT " ) . append ( transientParam . getPageSize ( ) ) . append ( " OFFSET " ) . append ( i ) . append ( " tz('Asia/Shanghai') " ) ;
//sql语句
String sql = " SELECT * FROM pqs_eventdetail WHERE " + stringBuilder ;