@@ -4,8 +4,10 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern ;
import cn.hutool.core.date.DateUtil ;
import cn.hutool.core.io.file.FileWriter ;
import cn.hutool.core.map.MapUtil ;
import cn.hutool.core.text.StrPool ;
import cn.hutool.core.util.StrUtil ;
import cn.hutool.json.JSONArray ;
import cn.hutool.json.JSONObject ;
import cn.hutool.json.JSONUtil ;
import com.njcn.influx.utils.InfluxDbUtils ;
@@ -18,6 +20,9 @@ import com.njcn.jbsyncdata.service.IPmsPowerGenerationUserService;
import com.njcn.jbsyncdata.util.RestTemplateUtil ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.commons.collections4.ListUtils ;
import org.apache.commons.collections4.MapUtils ;
import org.apache.commons.collections4.SetUtils ;
import org.apache.commons.lang3.StringUtils ;
import org.influxdb.InfluxDB ;
import org.influxdb.dto.BatchPoints ;
import org.influxdb.dto.Point ;
@@ -49,7 +54,7 @@ public class BusinessServiceImpl implements IBusinessService {
* 问题一: 一个发电客户编号同指标返回的数据会有多个, 但是目前看到最多2个测量点数据。
* 解决方案:匹配第一条,丢弃后续数据。
* 问题二: 一个客户编号最多2个测量点, 一次查8个指标, 即返回16条数据, 总计大约16万个用户编号, 如何高效查询并同步入库
* 解决方案:暂且定5 00个客户编号, 将每页数据量调整为 5 00 * 20 = 1W 的size, 避免存在匹配客户编号数据时, 遥测数据不在当前页。
* 解决方案:暂且定1 00个客户编号, 将每页数据量调整为 1 00 * 20 = 2000 的size, 避免存在匹配客户编号数据时, 遥测数据不在当前页。
* 问题三:时间区间如何控制?
* 解决方案: 目前暂定通过定时任务, 如每天凌晨2点查询前天的数据入库。
* 问题四:根据客户编号&指标查询数据会出现几种数据为空的情况
@@ -58,141 +63,333 @@ public class BusinessServiceImpl implements IBusinessService {
* 3. CommonTelemetry的遥测数据集合telemetryValue为null--不做处理,直接过
* 4. StatisticsData统计数据的实际数值measValue为null-----对应时间、指标的数值设置为0
*/
@Override
public void queryTelemetryData ( String startTime , String endTime ) {
List< String> noDataUser = new ArrayList < > ( ) ;
public void queryTelemetryData2 ( String startTime , String endTime ) {
// 连指标都没有返回的用户
// List< String> noDataUser = new ArrayList<>() ;
//有指标返回, 但是指标的遥测整体数据为null
List < String > noDataUserWithMeasType = new ArrayList < > ( ) ;
RestTemplateUtil restTemplateUtil = new RestTemplateUtil ( ) ;
TokenResult tokenWithRestTemplate = tokenComponent . getTokenWithRestTemplate ( ) ;
if ( null = = tokenWithRestTemplate ) {
log . error ( " token信息获取失败 " ) ;
return ;
}
JSONObject jsonObject ;
JSONObject jsonObjectSub ;
List < String > typeList = MeasTypeEnum . getMeasList ( ) ;
JSONObject jsonObject = JSONUtil . createObj ( ) ;
JSONObject jsonObjectSub = JSONUtil . createObj ( ) ;
jsonObject . set ( " page " , 1 ) ;
jsonObject . set ( " perPage " , 10000 ) ;
jsonObject . set ( " startTime " , " 2023-10-07 00:00:00 " ) ;
jsonObject . set ( " endTime " , " 2023-10-07 11:59:59 " ) ;
//1公专变2低压用户3光伏
jsonObjectSub . set ( " consType " , " 3 " ) ;
jsonObjectSub . set ( " astIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " astType " , " " ) ;
jsonObjectSub . set ( " psrIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " psrType " , " " ) ;
jsonObjectSub . set ( " measPointIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " telemetryTypes " , typeList ) ;
//组装好json开始发送请求
Map < String , String > headers = new HashMap < > ( ) ;
headers . put ( " x-token " , tokenWithRestTemplate . getAccess_token ( ) ) ;
//获取所有发电用户的id
List < String > userIds = pmsPowerGenerationUserService . queryAllUserId ( ) ;
//将发电用户编号按5 00尺寸分片
List < List < String > > partitionList = ListUtils . partition ( userIds , 5 00) ;
for ( List < String > userId : partitionList ) {
Map < /*表名*/ String , List < Map < /*属性名*/ String , /*数值*/ String > > > typeData = new HashMap < > ( ) ;
jsonObject = JSONUtil . createObj ( ) ;
jsonObjectSub = JSONUtil . createObj ( ) ;
jsonObject . set ( " page " , 1 ) ;
jsonObject . set ( " perPage " , 10000 ) ;
jsonObject . set ( " startTime " , startTime ) ;
jsonObject . set ( " endTime " , endTime ) ;
//将发电用户编号按1 00尺寸分片
List < List < String > > partitionList = ListUtils . partition ( userIds , 1 00) ;
log . error ( " 总计分了{}片 " , partitionList . size ( ) ) ;
List < String > userIdConcatMeasType ;
int count = 0 ;
//指标类型集合
List < /*各值以逗号分隔*/ String > influxData ;
Map < /*表名*/ String , List < /*各值以逗号分隔*/ String > > typeData = new HashMap < > ( ) ;
StringBuilder tempInfluxData ;
ResponseEntity < GeneralResult > response ;
List < StatisticsData > statisticsDataList ;
PageResult result ;
List < CommonTelemetry > records ;
String dataIdentify ;
MeasTypeEnum measTypeEnumByMeasType ;
for ( List < String > generationUserIDList : partitionList ) {
count + + ;
log . error ( " 查询第{}片数据 " , count ) ;
//按批次处理用户编号数据
List < String > generationUserIDList = userId . stream ( ) . map ( " 160 " : : concat ) . collect ( Collectors . toList ( ) ) ;
jsonObjectSub . set ( " consNos " , generationUserIDList ) ;
//1公专变2低压用户3光伏
jsonObjectSub . set ( " consType " , 3 ) ;
jsonObjectSub . set ( " astIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " astType " , " " ) ;
jsonObjectSub . set ( " psrIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " psrType " , " " ) ;
jsonObjectSub . set ( " measPointIds " , new ArrayList < > ( ) ) ;
//指标类型集合
List < String > typeList = MeasTypeEnum . getMeasList ( ) ;
jsonObjectSub . set ( " telemetryTypes " , typeList ) ;
//将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType
List < String > userIdConcatMeasType = new ArrayList < > ( ) ;
userIdConcatMeasType = new ArrayList < > ( ) ;
for ( String measType : typeList ) {
List < String > temp = generationUserIDList . stream ( ) . map ( t - > t . concat ( StrPool . AT ) . concat ( measType ) ) . collect ( Collectors . toList ( ) ) ;
userIdConcatMeasType . addAll ( temp ) ;
userIdConcatMeasType . addAll ( generationUserIDList . stream ( ) . map ( t - > t . concat ( StrPool . AT ) . concat ( measType ) ) . collect ( Collectors . toList ( ) ) ) ;
}
jsonObject . set ( " filter " , jsonObjectSub ) ;
//组装好json开始发送请求
Map < String , String > headers = new HashMap < > ( ) ;
headers . put ( " x-token " , tokenWithRestTemplate . getAccess_token ( ) ) ;
ResponseEntity < GeneralResult > response = restTemplateUtil . post ( tokenComponent . getUrl ( ) . concat ( " /realMeasCenter/telemetry/commonQuery " ) , headers , jsonObject , GeneralResult . class ) ;
if ( response . getStatusCodeValue ( ) = = 200 & & response . getBody ( ) . getStatus ( ) . equalsIgnoreCase ( " 000000 " ) ) {
GeneralResult generalResult = response . getBody ( ) ;
PageResult result = generalResult . getResult ( ) ;
List < CommonTelemetry > records = result . getR ecords( ) ;
if ( Objects . isNull ( result ) | | CollectionUtil . isEmpty ( result . getRecords ( ) ) ) {
JSONArray jsonArray = JSONUtil . createArray ( ) ;
jsonArray . add ( jsonObjectSub ) ;
jsonObject . set ( " filters " , jsonArray ) ;
response = restTemplateUtil . post ( tokenComponent . getUrl ( ) . concat ( " /realMeasCenter/telemetry/commonQuery " ) , headers , jsonObject , GeneralResult . class ) ;
if ( response . getStatusCodeValue ( ) = = 200 & & Objects . nonNull ( response . getBody ( ) ) & & response . getBody ( ) . getStatus ( ) . equalsIgnoreCase ( " 000000 " ) ) {
result = response . getBody ( ) . getResult ( ) ;
records = result . getRecords ( ) ;
log . error ( " 查询遥测数据结束,返回数据量:{} " , records . size ( ) ) ;
if ( Objects . isNull ( result ) | | CollectionUtil . isEmpty ( r ecords) ) {
//日志输出:
log . error ( " 起始时间:{},截止时间:{},无遥测数据; " , startTime , endTime ) ;
continue ;
}
//处理各个record的数据, 因用户下可能有多个测量点, 按指标循环, 默认采用第一个匹配上的做数据处理
for ( CommonTelemetry commonTelemetry : records ) { // 最多循环5 00*16次
String dataIdentify = commonTelemetry . getConsNo ( ) . concat ( StrPool . AT ) . concat ( commonTelemetry . getMeasTypeCode ( ) ) ;
for ( CommonTelemetry commonTelemetry : records ) { // 最多循环1 00*16次
dataIdentify = commonTelemetry . getConsNo ( ) . concat ( StrPool . AT ) . concat ( commonTelemetry . getMeasTypeCode ( ) ) ;
if ( userIdConcatMeasType . contains ( dataIdentify ) ) {
//首个包含该标识的数据进行处理
MeasTypeEnum measTypeEnumByMeasType = MeasTypeEnum . getMeasTypeEnumByMeasType ( commonTelemetry . getMeasTypeCode ( ) ) ;
List < StatisticsData > statisticsDataList = commonTelemetry . getTelemetryValue ( ) ;
List < Map < /*属性名*/ String , /*数值*/ String > > influxData = new ArrayList < > ( ) ;
for ( StatisticsData statisticsData : statisticsDataList ) { // 匹配上进入, 循环96次
Map < String , String > tempInfluxData = new HashMap < > ( ) ;
tempInfluxData . put ( " phasic_type " , measTypeEnumByMeasType . getPhaseType ( ) ) ;
tempInfluxData . put ( " line_id " , commonTelemetry . getConsNo ( ) ) ;
tempInfluxData . put ( " quality_flag " , " 0 " ) ;
tempInfluxData . put ( " value_type " , " AVG " ) ;
tempInfluxData . put ( " time " , statisticsData . getDataTime ( ) ) ;
//为空则赋值为0, 表中其他均为0
tempInfluxData . put ( measTypeEnumByMeasType . getFieldName ( ) , StrUtil . isBlank ( statisticsData . getMeasValue ( ) ) ? " 0 " : statisticsData . getMeasValue ( ) ) ;
influxData . add ( tempInfluxData ) ;
measTypeEnumByMeasType = MeasTypeEnum . getMeasTypeEnumByMeasType ( commonTelemetry . getMeasTypeCode ( ) ) ;
//统计数据, 经过测试, 接口响应json可能不包含该属性
statisticsDataList = commonTelemetry . getTelemetryValue ( ) ;
i f ( CollectionUtil . isEmpty ( statisticsDataList ) ) {
//添加进有指标但无遥测数据集合
noDataUserWithMeasType . add ( dataIdentify ) ;
continue ;
}
//measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
typeData . put ( measTypeEnumByMeasType . getMeasType ( ) . concat ( StrPool . AT ) . concat ( measTypeEnumByMeasType . getTableName ( ) ) , influxData ) ;
influxData = new ArrayList < > ( ) ;
for ( StatisticsData statisticsData : statisticsDataList ) { // 匹配上进入, 循环96次
tempInfluxData = new StringBuilder ( ) ;
tempInfluxData . append ( commonTelemetry . getMeasTypeCode ( ) )
. append ( StrPool . COMMA )
. append ( commonTelemetry . getConsNo ( ) )
. append ( StrPool . COMMA )
. append ( statisticsData . getDataTime ( ) )
. append ( StrPool . COMMA )
. append ( measTypeEnumByMeasType . getFieldName ( ) )
. append ( StrPool . COMMA )
. append ( StrUtil . isBlank ( statisticsData . getMeasValue ( ) ) ? " 0 " : statisticsData . getMeasValue ( ) ) ;
influxData . add ( tempInfluxData . toString ( ) ) ;
}
//userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
typeData . put ( commonTelemetry . getConsNo ( ) . concat ( StrPool . AT ) . concat ( measTypeEnumByMeasType . getMeasType ( ) ) . concat ( StrPool . AT ) . concat ( measTypeEnumByMeasType . getTableName ( ) ) , influxData ) ;
//处理完,删除该条记录,减少集合尺寸,提高效率
userIdConcatMeasType . remove ( dataIdentify ) ;
break ;
}
}
//没有匹配上的就是该用户没有数据
noDataUser . addAll ( userIdConcatMeasType ) ;
log . error ( " 剩余有{}条标识 " , userIdConcatMeasType . size ( ) );
// noDataUser.addAll(userIdConcatMeasType);
} else {
log . error ( " 查询遥测数据失败!第{}片,结果为:{} " , count , response ) ;
}
// System.gc();
}
//最后批量入库
batchInsertData ( typeData ) ;
//最后输出没有数据的用户编号
// if (CollectionUtil.isNotEmpty(noDataUser)) {
// noDataUser = noDataUser.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1))
// .distinct()
// .collect(Collectors.toList());
// FileWriter writer = FileWriter.create(new File("/usr/local/demo.txt"));
// writer.writeLines(noDataUser);
// }
// log.error("用户没有数据的长度为:{}", noDataUser.size());
log . error ( " 用户有指标没有数据的长度为:{} " , noDataUserWithMeasType . size ( ) ) ;
}
@Override
public void queryTelemetryData ( String startTime , String endTime ) {
//有指标返回, 但是指标的遥测整体数据为null
RestTemplateUtil restTemplateUtil = new RestTemplateUtil ( ) ;
TokenResult tokenWithRestTemplate = tokenComponent . getTokenWithRestTemplate ( ) ;
if ( null = = tokenWithRestTemplate ) {
log . error ( " token信息获取失败 " ) ;
return ;
}
List < String > typeList = MeasTypeEnum . getMeasList ( ) ;
JSONObject jsonObject = JSONUtil . createObj ( ) ;
JSONObject jsonObjectSub = JSONUtil . createObj ( ) ;
jsonObject . set ( " page " , 1 ) ;
jsonObject . set ( " perPage " , 10000 ) ;
jsonObject . set ( " startTime " , " 2023-10-07 00:00:00 " ) ;
jsonObject . set ( " endTime " , " 2023-10-07 11:59:59 " ) ;
//1公专变2低压用户3光伏
jsonObjectSub . set ( " consType " , " 3 " ) ;
jsonObjectSub . set ( " astIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " astType " , " " ) ;
jsonObjectSub . set ( " psrIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " psrType " , " " ) ;
jsonObjectSub . set ( " measPointIds " , new ArrayList < > ( ) ) ;
jsonObjectSub . set ( " telemetryTypes " , typeList ) ;
//组装好json开始发送请求
Map < String , String > headers = new HashMap < > ( ) ;
headers . put ( " x-token " , tokenWithRestTemplate . getAccess_token ( ) ) ;
//获取所有发电用户的id
List < String > userIds = pmsPowerGenerationUserService . queryAllUserId ( ) ;
//将发电用户编号按100尺寸分片
List < List < String > > partitionList = ListUtils . partition ( userIds , 100 ) ;
log . error ( " 总计分了{}片 " , partitionList . size ( ) ) ;
int count = 0 ;
//先获取数据
List < ResponseEntity < String > > responseEntities = new ArrayList < > ( 2000 ) ;
for ( List < String > generationUserIDList : partitionList ) {
count + + ;
log . error ( " 查询第{}片数据 " , count ) ;
//按批次处理用户编号数据
jsonObjectSub . set ( " consNos " , generationUserIDList ) ;
JSONArray jsonArray = JSONUtil . createArray ( ) ;
jsonArray . add ( jsonObjectSub ) ;
jsonObject . set ( " filters " , jsonArray ) ;
//避免中途token失效
if ( count % 800 = = 0 ) {
tokenWithRestTemplate = tokenComponent . getTokenWithRestTemplate ( ) ;
headers . put ( " x-token " , tokenWithRestTemplate . getAccess_token ( ) ) ;
}
responseEntities . add ( restTemplateUtil . post ( tokenComponent . getUrl ( ) . concat ( " /realMeasCenter/telemetry/commonQuery " ) , headers , jsonObject , String . class ) ) ;
}
//开始解析数据
Set < String > userIdConcatMeasType = new HashSet < > ( ) ;
//将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType
for ( String measType : typeList ) {
userIdConcatMeasType . addAll ( userIds . stream ( ) . map ( t - > t . concat ( StrPool . AT ) . concat ( measType ) ) . collect ( Collectors . toSet ( ) ) ) ;
}
List < /*各值以逗号分隔*/ String > influxData ;
Map < /*表名*/ String , List < /*各值以逗号分隔*/ String > > typeData = new HashMap < > ( ) ;
StringBuilder tempInfluxData ;
ResponseEntity < String > response ;
JSONArray statisticsDataList ;
JSONObject result ;
JSONObject statisticsData ;
JSONObject body ;
JSONArray records ;
String dataIdentify ;
JSONObject commonTelemetry ;
MeasTypeEnum measTypeEnumByMeasType ;
for ( int i = 0 ; i < partitionList . size ( ) ; i + + ) {
log . error ( " 解析第{}片数据 " , i ) ;
response = responseEntities . get ( i ) ;
body = JSONUtil . parseObj ( response . getBody ( ) ) ;
if ( response . getStatusCodeValue ( ) = = 200 & & body . get ( " status " , String . class ) . equalsIgnoreCase ( " 000000 " ) ) {
result = JSONUtil . parseObj ( body . get ( " result " , String . class ) ) ;
records = JSONUtil . parseArray ( result . get ( " records " , String . class ) ) ;
log . error ( " 查询遥测数据结束,返回数据量:{} " , records . size ( ) ) ;
if ( CollectionUtil . isEmpty ( records ) ) {
//日志输出:
log . error ( " 起始时间:{},截止时间:{},无遥测数据; " , startTime , endTime ) ;
continue ;
}
//处理各个record的数据, 因用户下可能有多个测量点, 按指标循环, 默认采用第一个匹配上的做数据处理
for ( Object obj : records ) { // 最多循环100*16次
commonTelemetry = JSONUtil . parseObj ( obj ) ;
dataIdentify = commonTelemetry . get ( " consNo " , String . class ) . concat ( StrPool . AT ) . concat ( commonTelemetry . get ( " measTypeCode " , String . class ) ) ;
if ( userIdConcatMeasType . contains ( dataIdentify ) ) {
//首个包含该标识的数据进行处理
measTypeEnumByMeasType = MeasTypeEnum . getMeasTypeEnumByMeasType ( commonTelemetry . get ( " measTypeCode " , String . class ) ) ;
//统计数据, 经过测试, 接口响应json可能不包含该属性
statisticsDataList = commonTelemetry . get ( " telemetryValue " , JSONArray . class ) ;
if ( CollectionUtil . isEmpty ( statisticsDataList ) ) {
//添加进有指标但无遥测数据集合
continue ;
}
influxData = new ArrayList < > ( ) ;
for ( Object subObj : statisticsDataList ) { // 匹配上进入, 循环96次
statisticsData = JSONUtil . parseObj ( subObj ) ;
tempInfluxData = new StringBuilder ( ) ;
tempInfluxData . append ( measTypeEnumByMeasType . getPhaseType ( ) )
. append ( StrPool . COMMA )
. append ( commonTelemetry . get ( " consNo " , String . class ) )
. append ( StrPool . COMMA )
. append ( statisticsData . get ( " dataTime " , String . class ) )
. append ( StrPool . COMMA )
. append ( measTypeEnumByMeasType . getFieldName ( ) )
. append ( StrPool . COMMA )
. append ( StrUtil . isBlank ( statisticsData . get ( " measValue " , String . class ) ) ? " 0 " : statisticsData . get ( " measValue " , String . class ) ) ;
influxData . add ( tempInfluxData . toString ( ) ) ;
}
//userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
typeData . put ( commonTelemetry . get ( " consNo " , String . class ) . concat ( StrPool . AT ) . concat ( measTypeEnumByMeasType . getMeasType ( ) ) . concat ( StrPool . AT ) . concat ( measTypeEnumByMeasType . getTableName ( ) ) , influxData ) ;
//处理完,删除该条记录,减少集合尺寸,提高效率
userIdConcatMeasType . remove ( dataIdentify ) ;
}
}
//没有匹配上的就是该用户没有数据
log . error ( " 剩余有{}条标识 " , userIdConcatMeasType . size ( ) ) ;
} else {
log . error ( " 查询遥测数据失败!第{}片,结果为:{} " , count , response ) ;
}
//每片数据获取完毕后, 将数据处理入influxdb库
batchInsertData ( typeData ) ;
}
//最后输出没有数据的用户编号
noDataUser = noDataUser . stream ( ) . map ( t - > t . substring ( t . indexOf ( StrPool . AT + 1 ) ) )
. distinct ( )
. collect ( Collectors . toList ( ) ) ;
FileWriter writer = FileWriter . create ( new File ( " /usr/local/ " + startTime + " - " + endTime + " .txt " ) ) ;
File file = writer . writeLines ( noDataUser ) ;
/**
* 输出到2个文件, lackData.txt、 excalationData.txt
* 注: 用户号去除160前缀
* 1、所有指标均没有有数据的用户编号
* 2、部分指标没有数据的用户编号, 并表明是哪些指标
*/
if ( CollectionUtil . isNotEmpty ( userIdConcatMeasType ) ) {
Map < String , List < String > > finalMap = userIdConcatMeasType . stream ( ) . collect ( Collectors . groupingBy ( str - > {
String key = str . substring ( 3 ) ;
key = key . substring ( 0 , key . indexOf ( StrPool . AT ) ) ;
return key ;
} ) ) ;
//全部缺失数据的用户
List < String > lackData = new ArrayList < > ( ) ;
//部分缺失的用户及指标
List < String > excalationData = new ArrayList < > ( ) ;
Set < String > keyedSet = finalMap . keySet ( ) ;
for ( String key : keyedSet ) {
List < String > data = finalMap . get ( key ) ;
if ( data . size ( ) = = typeList . size ( ) ) {
lackData . add ( key ) ;
} else {
data = data . stream ( ) . map ( t - > t . substring ( t . indexOf ( StrPool . AT ) + 1 ) ) . collect ( Collectors . toList ( ) ) ;
key = key . concat ( StrPool . COMMA ) . concat ( StringUtils . join ( data , StrPool . AT ) ) ;
excalationData . add ( key ) ;
}
}
FileWriter lackDataWriter = FileWriter . create ( new File ( " /usr/local/lackData.txt " ) ) ;
lackDataWriter . writeLines ( lackData ) ;
FileWriter excalationDataWriter = FileWriter . create ( new File ( " /usr/local/excalationData.txt " ) ) ;
excalationDataWriter . writeLines ( excalationData ) ;
}
log . error ( " 用户有指标没有数据的长度为:{} " , userIdConcatMeasType . size ( ) ) ;
//最后批量入库
batchInsertData ( typeData ) ;
}
/**
* 批量入库influxDB
*
* @param typeData 远程根据用户编号获取的数据 Map</表名/String, List<Map</属性名/String,/数值/String>>> typeData = new HashMap<>();
*/
private void batchInsertData ( Map < String , List < Map < String , String > > > typeData ) {
private void batchInsertData ( Map < String , List < String > > typeData ) {
log . error ( " 总计有{}条记录入库, 以5000作为基数分片插入influxdb " , typeData . size ( ) ) ;
List < String > sqlList = new ArrayList < > ( ) ;
Set < String > tableNames = typeData . keySet ( ) ;
String [ ] datas ;
Map < String , String > tags ;
Map < String , Object > fields ;
Point point ;
BatchPoints batchPoints ;
for ( String tableName : tableNames ) {
List < Map < String , String > > data = typeData . get ( tableName ) ;
tableName = tableName . substring ( tableName . i ndexOf( StrPool . AT ) + 1 ) ;
for ( Map < String , String > datum : data ) {
List < String > data = typeData . get ( tableName ) ;
tableName = tableName . substring ( tableName . lastI ndexOf( StrPool . AT ) + 1 ) ;
for ( String datum : data ) {
datas = datum . split ( StrPool . COMMA ) ;
//tag数据
Map < String , String > tags = new HashMap < > ( ) ;
tags . put ( " phasic_type " , datum . get ( " phasic_type " ) ) ;
datum . remove ( " phasic_type " ) ;
tags . put ( " line_id " , datum . get ( " line_id " ) ) ;
datum . remove ( " line_id " ) ;
tags . put ( " quality_flag " , datum . get ( " quality_flag " ) ) ;
datum . remove ( " quality_flag " ) ;
tags . put ( " value_type " , datum . get ( " value_type " ) ) ;
datum . remove ( " value_type " ) ;
String time = datum . get ( " time " ) ;
datum . remove ( " time " ) ;
tags = new HashMap < > ( ) ;
tags . put ( " phasic_type " , datas [ 0 ] ) ;
tags . put ( " line_id " , datas [ 1 ] ) ;
tags . put ( " quality_flag " , " 0 " ) ;
tags . put ( " value_type " , " AVG " ) ;
String time = datas [ 2 ] ;
//tag数据删完后, 剩余均是filed数据,因filed属性名不固定, 无法指定获取, 直接循环
Map < String , Object > fields = new HashMap < > ( ) ;
Set < String > fieldNames = datum . keySet ( ) ;
for ( String fieldName : fieldName s ) {
fields . put ( fieldName , Double . parseDouble ( datum . get ( fieldName ) ) ) ;
}
Point point = influxDbUtils . pointBuilder ( tableName , DateUtil . parse ( time , DatePattern . NORM_DATETIME_FORMATTER ) . getTime ( ) + 8 * 3600 * 1000 , TimeUnit . MILLISECONDS , tags , fields ) ;
BatchPoints batchPoints = BatchPoints . database ( influxDbUtils . getDbName ( ) ) . retentionPolicy ( " " ) . consistency ( InfluxDB . ConsistencyLevel . ALL ) . build ( ) ;
fields = new HashMap < > ( ) ;
fields . put ( datas [ 3 ] , datas [ 4 ] ) ;
point = influxDbUtils . pointBuilder ( tableName , DateUtil . parse ( time , DatePattern . NORM_DATETIME_FORMATTER ) . getTime ( ) , TimeUnit . MILLISECONDS , tags , fields ) ;
batchPoints = BatchPoints . database ( influxDbUtils . getDbName ( ) ) . retentionPolicy ( " " ) . consistency ( InfluxDB . ConsistencyLevel . ALL ) . build ( ) ;
batchPoints . point ( point ) ;
sqlList . add ( batchPoints . lineProtocol ( ) ) ;
}
}
influxDbUtils . batchInsert ( influxDbUtils . getDbName ( ) , " " , InfluxDB . ConsistencyLevel . ALL , TimeUnit . MILLISECONDS , sqlList ) ;
List < List < String > > subSqlList = ListUtils . partition ( sqlList , 50000 ) ;
int count = 1 ;
for ( List < String > sql : subSqlList ) {
influxDbUtils . batchInsert ( influxDbUtils . getDbName ( ) , " " , InfluxDB . ConsistencyLevel . ALL , TimeUnit . MILLISECONDS , sql ) ;
log . error ( " 已经入库{}条记录! " , count * 50000 ) ;
count + + ;
}
}
}