@@ -2,13 +2,12 @@ package com.njcn.jbsyncdata.service.impl;
import cn.hutool.core.collection.CollectionUtil ;
import cn.hutool.core.date.DatePattern ;
import cn.hutool.core.date.DateTime ;
import cn.hutool.core.date.DateUtil ;
import cn.hutool.core.io.file.FileReader ;
import cn.hutool.core.text.StrPool ;
import cn.hutool.core.util.StrUtil ;
import cn.hutool.json.JSONObject ;
import cn.hutool.json.JSONUtil ;
import com.njcn.influx.utils.InfluxDbUtils ;
import com.njcn.jbsyncdata.component.TokenComponent ;
import com.njcn.jbsyncdata.enums.MeasTypeEnum ;
import com.njcn.jbsyncdata.pojo.ExcelData ;
@@ -16,11 +15,16 @@ import com.njcn.jbsyncdata.pojo.result.*;
import com.njcn.jbsyncdata.service.IBusinessService ;
import com.njcn.jbsyncdata.util.RestTemplateUtil ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.commons.collections4.ListUtils ;
import org.influxdb.InfluxDB ;
import org.influxdb.dto.BatchPoints ;
import org.influxdb.dto.Point ;
import org.springframework.http.ResponseEntity ;
import org.springframework.stereotype.Service ;
import javax.annotation.Resource ;
import java.util.* ;
import java.util.concurrent.TimeUnit ;
import java.util.stream.Collectors ;
import java.util.stream.Stream ;
@@ -28,43 +32,31 @@ import java.util.stream.Stream;
@Service
public class BusinessServiceImpl implements IBusinessService {
@Resource
private TokenComponent tokenComponent ;
@Overrid e
public void testInterface ( List < ExcelData > list ) {
RestTemplateUtil restTemplateUtil = new RestTemplateUtil ( ) ;
TokenResult tokenWithRestTemplate = tokenComponent . getTokenWithRestTemplate ( ) ;
if ( null = = tokenWithRestTemplate ) {
log . error ( " token信息没有获取到 " ) ;
return ;
}
JSONObject jsonObject ;
JSONObject jsonObjectSub ;
for ( ExcelData excelData : list ) {
jsonObject = JSONUtil . createObj ( ) ;
jsonObjectSub = JSONUtil . createObj ( ) ;
jsonObject . set ( " page " , 1 ) ;
jsonObject . set ( " perPage " , 50 ) ;
List < String > psrIds = Stream . of ( excelData . getStageID ( ) ) . collect ( Collectors . toList ( ) ) ;
jsonObjectSub . set ( " psrIds " , psrIds ) ;
jsonObjectSub . set ( " psrType " , " 0401004 " ) ;
jsonObjectSub . set ( " astIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " astType " , " " ) ;
jsonObjectSub . set ( " termIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " termType " , " " ) ;
jsonObjectSub . set ( " measPointIds " , new ArrayList < > ( ) ) ;
jsonObject . set ( " filter " , jsonObjectSub ) ;
//组装好json开始发送请求
Map < String , String > headers = new HashMap < > ( ) ;
headers . put ( " x-token " , tokenWithRestTemplate . getAccess_token ( ) ) ;
ResponseEntity < String > response = restTemplateUtil . post ( tokenComponent . getUrl ( ) . concat ( " /realMeasCenter/measPoint/commonQuery " ) , headers , jsonObject , String . class ) ;
log . error ( " 请求接口,台区号为:{},结果为:{} " , excelData . getStageID ( ) , response ) ;
}
}
@Resourc e
private InfluxDbUtils influxDbUtils ;
/**
* 此方法通过发电客户编号查询数据,该方法存在以下问题
* 问题一: 一个发电客户编号同指标返回的数据会有多个, 但是目前看到最多2个测量点数据。
* 解决方案:匹配第一条,丢弃后续数据。
* 问题二: 一个客户编号最多2个测量点, 一次查8个指标, 即返回16条数据, 总计大约16万个用户编号, 如何高效查询并同步入库
* 解决方案: 暂且定500个客户编号, 将每页数据量调整为 500 * 20 = 1W的size, 避免存在匹配客户编号数据时, 遥测数据不在当前页。
* 问题三:时间区间如何控制?
* 解决方案: 目前暂定通过定时任务, 如每天凌晨2点查询前天的数据入库。
* 问题四:根据客户编号&指标查询数据会出现几种数据为空的情况
* 现象: 1. 首先GeneralResult的result属性直接为null------------不做处理,直接过
* 2. PageResult的records属性为null---------------------不做处理,直接过
* 3. CommonTelemetry的遥测数据集合telemetryValue为null--不做处理,直接过
* 4. StatisticsData统计数据的实际数值measValue为null-----对应时间、指标的数值设置为0
*
* @param excelDataList 客户编号集合
*/
@Override
public void testInterfaceByUserId ( List < ExcelData > list ) {
public void testInterfaceByUserId ( List < ExcelData > excelDataList , String startTime , String endTime ) {
RestTemplateUtil restTemplateUtil = new RestTemplateUtil ( ) ;
TokenResult tokenWithRestTemplate = tokenComponent . getTokenWithRestTemplate ( ) ;
if ( null = = tokenWithRestTemplate ) {
@@ -73,15 +65,21 @@ public class BusinessServiceImpl implements IBusinessService {
}
JSONObject jsonObject ;
JSONObject jsonObjectSub ;
for ( ExcelData excelData : list ) {
//将发电客户编号按500尺寸分片
List < List < ExcelData > > partitionList = ListUtils . partition ( excelDataList , 500 ) ;
for ( List < ExcelData > excelData : partitionList ) {
Map < /*表名*/ String , List < Map < /*属性名*/ String , /*数值*/ String > > > typeData = new HashMap < > ( ) ;
//按批次处理客户编号数据
jsonObject = JSONUtil . createObj ( ) ;
jsonObjectSub = JSONUtil . createObj ( ) ;
jsonObject . set ( " page " , 1 ) ;
jsonObject . set ( " perPage " , 5 0) ;
jsonObject . set ( " startTime " , " 2023-10-07 00:00:00 " ) ;
jsonObject . set ( " endTime " , " 2023-10-07 23:59:59 " ) ;
List < String > userId = Stream . of ( " 160 " . concat ( excelData . getGenerationUserID ( ) ) ) . collect ( Collectors . toList ( ) ) ;
jsonObjectSub . set ( " consNos " , userId ) ;
jsonObject . set ( " perPage " , 1000 0) ;
jsonObject . set ( " startTime " , startTime ) ;
jsonObject . set ( " endTime " , endTime ) ;
//处理客户编号
List < String > generationUserIDList = excelData . stream ( ) . map ( t - > " 160 " . concat ( t . getGenerationUserID ( ) ) ) . collect ( Collectors . toList ( ) ) ;
jsonObjectSub . set ( " consNos " , generationUserIDList ) ;
//1公专变2低压用户3光伏
jsonObjectSub . set ( " consType " , 3 ) ;
jsonObjectSub . set ( " astIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " astType " , " " ) ;
@@ -97,25 +95,32 @@ public class BusinessServiceImpl implements IBusinessService {
headers . put ( " x-token " , tokenWithRestTemplate . getAccess_token ( ) ) ;
ResponseEntity < GeneralResult > response = restTemplateUtil . post ( tokenComponent . getUrl ( ) . concat ( " /realMeasCenter/telemetry/commonQuery " ) , headers , jsonObject , GeneralResult . class ) ;
if ( response . getStatusCodeValue ( ) = = 200 & & response . getBody ( ) . getStatus ( ) . equalsIgnoreCase ( " 000000 " ) ) {
Page Result r esult = response . getBody ( ) . getResult ( ) ;
General Result generalR esult = response . getBody ( ) ;
PageResult result = generalResult . getResult ( ) ;
List < CommonTelemetry > records = result . getRecords ( ) ;
if ( Objects . isNull ( result ) | | CollectionUtil . isEmpty ( result . getRecords ( ) ) ) {
//日志输出:
log . error ( " 用户编号为 : {},无遥测数据;" , excelData . getGenerationUserID ( ) ) ;
log . error ( " 起始时间:{},截止时间 : {},无遥测数据;" , startTime , endTime ) ;
continue ;
}
//将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType
List < String > userIdConcatMeasType = new ArrayList < > ( ) ;
for ( String measType : typeList ) {
List < String > temp = generationUserIDList . stream ( ) . map ( t - > t . concat ( StrPool . AT ) . concat ( measType ) ) . collect ( Collectors . toList ( ) ) ;
userIdConcatMeasType . addAll ( temp ) ;
}
//处理各个record的数据, 因用户下可能有多个测量点, 按指标循环, 默认采用第一个匹配上的做数据处理
Map < /*表名*/ String , List < Map < /*属性名*/ String , /*数值*/ String > > > typeData = new HashMap < > ( ) ;
for ( String type : typeList ) {
MeasTypeEnum measTypeEnumByMeasType = MeasTypeEnum . getMeasTypeEnumByMeasType ( type ) ;
for ( CommonTelemetry commonTelemetry : records ) {
if ( type . equalsIgnoreCase ( commonTelemetry . getMeasTypeCode ( ) ) ) {
String dataIdent ify = commonTelemetry . getConsNo ( ) . concat ( StrPool . AT ) . concat ( commonTelemetry . getMeasTypeCode ( ) ) ;
if ( userIdConcatMeasType . contains ( dataIdentify ) ) {
//首个包含该标识的数据进行处理
MeasTypeEnum measTypeEnumByMeasType = MeasTypeEnum . getMeasTypeEnumByMeasType ( commonTelemetry . getMeasTypeCode ( ) ) ;
List < StatisticsData > statisticsDataList = commonTelemetry . getTelemetryValue ( ) ;
List < Map < /*属性名*/ String , /*数值*/ String > > influxData = new ArrayList < > ( ) ;
for ( StatisticsData statisticsData : statisticsDataList ) {
for ( StatisticsData statisticsData : statisticsDataList ) { // 匹配上进入, 循环96次
Map < String , String > tempInfluxData = new HashMap < > ( ) ;
tempInfluxData . put ( " phasic_type " , measTypeEnumByMeasType . getPhaseType ( ) ) ;
tempInfluxData . put ( " line_id " , " 160 " . concat ( excelData . getGenerationUserID ( ) ) ) ;
tempInfluxData . put ( " line_id " , commonTelemetry . getConsNo ( ) ) ;
tempInfluxData . put ( " quality_flag " , " 0 " ) ;
tempInfluxData . put ( " value_type " , " AVG " ) ;
tempInfluxData . put ( " time " , statisticsData . getDataTime ( ) ) ;
@@ -125,51 +130,56 @@ public class BusinessServiceImpl implements IBusinessService {
}
//measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
typeData . put ( measTypeEnumByMeasType . getMeasType ( ) . concat ( StrPool . AT ) . concat ( measTypeEnumByMeasType . getTableName ( ) ) , influxData ) ;
//处理完,删除该条记录,减少集合尺寸,提高效率
userIdConcatMeasType . remove ( dataIdentify ) ;
break ;
}
}
}
//每片数据获取完毕后, 将数据处理入influxdb库
batchInsertData ( typeData ) ;
}
log . error ( " 请求接口,台区号为:{},结果为:{} " , excelData . getStageID ( ) , response ) ;
}
}
public static void main ( String [ ] args ) {
String path = " C: \\ Users \\ 83944 \\ Desktop \\ test \\ test.txt " ;
FileReader fileReader = new FileReader ( path ) ;
String jsonStr = fileReader . readString ( ) ;
GeneralResult result = JSONUtil . toBean ( jsonStr , GeneralResult . class ) ;
Lis t< CommonTelemetry > records = result . getResult ( ) . getRecords ( ) ;
//处理各个record的数据, 因用户下可能有多个测量点, 按指标循环, 默认采用第一个匹配上的做数据处理
Map < /*表名*/ String , List < Map < /*属性名*/ String , /*数值*/ String > > > typeD ata = new HashMap < > ( ) ;
List < String > typeList = Stre am. of ( " PhV_phsA " , " PhV_phsB " , " PhV_phsC " ) . collect ( Collectors . toList ( ) ) ;
for ( String type : typeList ) {
MeasTypeEnum measTypeEnumByMeasType = MeasTypeEnum . getMeasTypeEnumByMeasType ( type ) ;
for ( CommonTelemetry commonTelemetry : records ) {
if ( type . equalsIgnoreCase ( commonTelemetry . getMeasTypeCode ( ) ) ) {
List < StatisticsData > statisticsDataList = commonTelemetry . getTelemetryValue ( ) ;
List < Map < /*属性名*/ String , /*数值*/ String > > influxData = new ArrayList < > ( ) ;
for ( S tatisticsData statisticsData : statisticsDataList ) {
Map < String , String > tempInfluxData = new HashMap < > ( ) ;
tempInfluxData . put ( " phasic_type " , measTypeEnumByMeasType . getPhaseType ( ) ) ;
tempInfluxData . put ( " line_id " , " 1602514341899 " ) ;
tempInfluxData . put ( " quality_flag " , " 0 " ) ;
tempInfluxData . put ( " value_type " , " AVG " );
tempInfluxData . put ( " time " , statisticsData . getDataTime ( ) ) ;
//为空则赋值为0, 表中其他均为0
tempInfluxData . put ( measTypeEnumByMeasType . getFieldName ( ) , StrUtil . isBlank ( statisticsData . getMeasValue ( ) ) ? " 0 " : statisticsData . getMeasValue ( ) ) ;
influxData . add ( tempInfluxData ) ;
/**
* 批量入库influxDB
* @param typeData 远程根据用户编号获取的数据 Map</表名/String, List<Map</属性名/String,/数值/String>>> typeData = new HashMap<>();
*/
private void batchInsertData ( Map < String , List < Map < String , String > > > typeData ) {
List < String > sqlList = new ArrayList < > ( ) ;
Se t< String > tableNames = typeData . keySet ( ) ;
for ( String tableName : tableNames ) {
List < Map < String , String > > d ata = typeData . get ( tableName ) ;
tableName = tableN ame . substring ( tableName . indexOf ( StrPool . AT ) + 1 ) ;
//需要转换的实体类class类
for ( Map < String , String > datum : data ) {
//tag数据
Map < String , String > tags = new HashMap < > ( ) ;
tags . put ( " phasic_type " , datum . get ( " phasic_type " ) ) ;
datum . remove ( " phasic_type " ) ;
tags . put ( " line_id " , datum . get ( " line_id " ) ) ;
datum . remove ( " line_id " ) ;
tags . put ( " quality_flag " , datum . get ( " quality_flag " ) ) ;
datum . remove ( " quality_flag " ) ;
tags . put ( " value_type " , datum . get ( " value_type " ) ) ;
datum . remove ( " value_type " ) ;
String time = datum . get ( " time " ) ;
datum . remove ( " time " ) ;
//tag数据删完后, 剩余均是filed数据,因filed属性名不固定, 无法指定获取, 直接循环
Map < String , Object > fields = new HashMap < > ( ) ;
Set < String > fieldNames = datum . keySet ( ) ;
for ( String fieldName : fieldNames ) {
fields . put ( fieldName , Double . parseDouble ( datum . get ( fieldName ) ) ) ;
}
typeData . put ( measTypeEnumByMeasType . getMeasType ( ) . concat ( " @ " ) . concat ( measTypeEnumByMeasType . getTableName ( ) ) , influxData ) ;
break ;
}
}
}
System . out . println ( 11 ) ;
Point point = influxDbUtils . pointBuilder ( tableName , DateUtil . parse ( time , DatePattern . NORM_DATETIME_FORMATTER ) . getTime ( ) + 8 * 3600 * 1000 , TimeUnit . MILLISECONDS , tags , fields ) ;
BatchPoints batchPoints = BatchPoints . database ( influxDbUtils . getDbName ( ) ) . retentionPolicy ( " " ) . consistency ( InfluxDB . ConsistencyLevel . ALL ) . build ( ) ;
batchPoints . point ( point ) ;
sqlList . add ( batchPoints . lineProtocol ( ) ) ;
}
}
influxDbUtils . batchInsert ( influxDbUtils . getDbName ( ) , " " , InfluxDB . ConsistencyLevel . ALL , TimeUnit . MILLISECONDS , sqlList ) ;
}
}