1.台区台账模板导入(唐山/张家口)
2.获取台区遥测数据接口编写
This commit is contained in:
264
src/main/java/com/njcn/jbsyncdata/util/AreaDataProcessing.java
Normal file
264
src/main/java/com/njcn/jbsyncdata/util/AreaDataProcessing.java
Normal file
@@ -0,0 +1,264 @@
|
||||
package com.njcn.jbsyncdata.util;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.io.file.FileReader;
|
||||
import cn.hutool.core.io.file.FileWriter;
|
||||
import cn.hutool.core.text.StrPool;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.json.JSONArray;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.njcn.influx.utils.InfluxDbUtils;
|
||||
import com.njcn.jbsyncdata.component.TokenComponent;
|
||||
import com.njcn.jbsyncdata.enums.MeasTypeEnum;
|
||||
import com.njcn.jbsyncdata.pojo.InfluxAreaData;
|
||||
import com.njcn.jbsyncdata.pojo.po.PmsPowerDistributionarea;
|
||||
import com.njcn.jbsyncdata.pojo.result.TokenResult;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections4.ListUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.influxdb.InfluxDB;
|
||||
import org.influxdb.dto.BatchPoints;
|
||||
import org.influxdb.dto.Point;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author wr
|
||||
* @description
|
||||
* @date 2023/10/20 14:14
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class AreaDataProcessing {
|
||||
|
||||
private final InfluxDbUtils influxDbUtils;
|
||||
|
||||
@Async("asyncExecutor")
|
||||
public void asyncInfluxDb(
|
||||
TokenComponent tokenComponent,
|
||||
String date,
|
||||
RestTemplateUtil restTemplateUtil,
|
||||
List<String> typeList,
|
||||
JSONObject jsonObject,
|
||||
JSONObject jsonObjectSub,
|
||||
Map<String, String> headers,
|
||||
List<List<PmsPowerDistributionarea>> singleQueryDataUserId, int k
|
||||
) {
|
||||
TokenResult tokenWithRestTemplate;
|
||||
//将发电用户编号按100尺寸分片
|
||||
List<List<PmsPowerDistributionarea>> partitionList = ListUtils.partition(singleQueryDataUserId.get(k), 100);
|
||||
log.error("总计分了{}片", partitionList.size());
|
||||
int count = 0;
|
||||
tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate();
|
||||
headers.put("x-token", tokenWithRestTemplate.getAccess_token());
|
||||
//先获取数据
|
||||
List<ResponseEntity<String>> responseEntities = new ArrayList<>(2000);
|
||||
int kk = k + 1;
|
||||
for (List<PmsPowerDistributionarea> generationAreaIDList : partitionList) {
|
||||
count++;
|
||||
log.error("查询第{}大片,{}小片数据", kk, count);
|
||||
//按批次处理用户编号数据
|
||||
List<String> psrIds = generationAreaIDList.stream().map(PmsPowerDistributionarea::getPmsID).collect(Collectors.toList());
|
||||
jsonObjectSub.set("psrIds", psrIds);
|
||||
JSONArray jsonArray = JSONUtil.createArray();
|
||||
jsonArray.add(jsonObjectSub);
|
||||
jsonObject.set("filters", jsonArray);
|
||||
try {
|
||||
responseEntities.add(restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, String.class));
|
||||
} catch (Exception exception) {
|
||||
log.error("远程调用接口异常,异常为:" + exception);
|
||||
}
|
||||
}
|
||||
//开始解析数据
|
||||
Set<String> userIdConcatMeasType = new HashSet<>();
|
||||
//将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType
|
||||
for (String measType : typeList) {
|
||||
userIdConcatMeasType.addAll(singleQueryDataUserId.get(k).stream().map(x->x.getPmsID().concat(StrPool.AT).concat(measType)).collect(Collectors.toSet()));
|
||||
}
|
||||
List</*各值以逗号分隔*/InfluxAreaData> influxData;
|
||||
Map</*表名*/String, List</*各值以逗号分隔*/String>> typeData = new HashMap<>();
|
||||
StringBuilder tempInfluxData;
|
||||
ResponseEntity<String> response;
|
||||
JSONArray statisticsDataList;
|
||||
JSONObject result;
|
||||
JSONObject statisticsData;
|
||||
JSONObject body;
|
||||
JSONArray records;
|
||||
String dataIdentify;
|
||||
JSONObject commonTelemetry;
|
||||
MeasTypeEnum measTypeEnumByMeasType;
|
||||
|
||||
//获取资源id和台区的对应关系
|
||||
Map<String, List<String>> areaMap = singleQueryDataUserId.get(k).stream().collect(Collectors.groupingBy(PmsPowerDistributionarea::getPmsID, Collectors.mapping(PmsPowerDistributionarea::getId, Collectors.toList())));
|
||||
|
||||
for (int i = 0; i < partitionList.size(); i++) {
|
||||
log.error("解析第{}片数据", i);
|
||||
response = responseEntities.get(i);
|
||||
body = JSONUtil.parseObj(response.getBody());
|
||||
|
||||
// String path = "C:\\Users\\web2023\\Desktop\\分布式光伏台区API调用结果\\2.txt";
|
||||
// FileReader fileReader = new FileReader(path);
|
||||
// String jsonStr = fileReader.readString();
|
||||
// body = JSONUtil.parseObj(jsonStr);
|
||||
if (response.getStatusCodeValue() == 200 && body.get("status", String.class).equalsIgnoreCase("000000")) {
|
||||
result = JSONUtil.parseObj(body.get("result", String.class));
|
||||
records = JSONUtil.parseArray(result.get("records", String.class));
|
||||
log.error("查询遥测数据结束,返回数据量:{}", records.size());
|
||||
if (CollectionUtil.isEmpty(records)) {
|
||||
//日志输出:
|
||||
log.error("查询时间:{},无遥测数据;", date);
|
||||
continue;
|
||||
}
|
||||
//处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理
|
||||
for (Object obj : records) { // 最多循环100*16次
|
||||
commonTelemetry = JSONUtil.parseObj(obj);
|
||||
dataIdentify = commonTelemetry.get("psrId", String.class).concat(StrPool.AT).concat(commonTelemetry.get("measTypeCode", String.class));
|
||||
if (userIdConcatMeasType.contains(dataIdentify)) {
|
||||
//首个包含该标识的数据进行处理
|
||||
measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.get("measTypeCode", String.class));
|
||||
//统计数据,经过测试,接口响应json可能不包含该属性
|
||||
statisticsDataList = commonTelemetry.get("telemetryValue", JSONArray.class);
|
||||
if (CollectionUtil.isEmpty(statisticsDataList)) {
|
||||
//添加进有指标但无遥测数据集合
|
||||
continue;
|
||||
}
|
||||
influxData = new ArrayList<>();
|
||||
InfluxAreaData influxAreaData;
|
||||
for (Object subObj : statisticsDataList) { // 匹配上进入,循环96次
|
||||
statisticsData = JSONUtil.parseObj(subObj);
|
||||
if(areaMap.containsKey(commonTelemetry.get("psrId", String.class))){
|
||||
List<String> list = areaMap.get(commonTelemetry.get("psrId", String.class));
|
||||
for (String s : list) {
|
||||
tempInfluxData = new StringBuilder();
|
||||
tempInfluxData.append(measTypeEnumByMeasType.getPhaseType())
|
||||
.append(StrPool.COMMA)
|
||||
.append(s)
|
||||
.append(StrPool.COMMA)
|
||||
.append(statisticsData.get("dataTime", String.class))
|
||||
.append(StrPool.COMMA)
|
||||
.append(measTypeEnumByMeasType.getFieldName())
|
||||
.append(StrPool.COMMA)
|
||||
.append(StrUtil.isBlank(statisticsData.get("measValue", String.class)) ? "0" : statisticsData.get("measValue", String.class));
|
||||
influxAreaData=new InfluxAreaData();
|
||||
influxAreaData.setId(s);
|
||||
influxAreaData.setInfluxData(tempInfluxData.toString());
|
||||
|
||||
influxData.add(influxAreaData);
|
||||
}
|
||||
}
|
||||
}
|
||||
//userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
|
||||
Map<String, List<String>> influxLineData = influxData.stream().collect(Collectors.groupingBy(InfluxAreaData::getId,Collectors.mapping(InfluxAreaData::getInfluxData, Collectors.toList())));
|
||||
for (String s : influxLineData.keySet()) {
|
||||
typeData.put(s.concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()),influxLineData.get(s) );
|
||||
}
|
||||
|
||||
//处理完,删除该条记录,减少集合尺寸,提高效率
|
||||
userIdConcatMeasType.remove(dataIdentify);
|
||||
}
|
||||
}
|
||||
//没有匹配上的就是该用户没有数据
|
||||
log.error("剩余有{}条标识", userIdConcatMeasType.size());
|
||||
} else {
|
||||
log.error("查询遥测数据失败!第{}片,结果为:{}", count, response);
|
||||
}
|
||||
}
|
||||
//最后输出没有数据的资源编号
|
||||
/**
|
||||
* 输出到2个文件,lackData.txt、 excalationData.txt
|
||||
* 注:
|
||||
* 1、所有指标均没有有数据的资源编号
|
||||
* 2、部分指标没有数据的资源编号,并表明是哪些指标
|
||||
*/
|
||||
if (CollectionUtil.isNotEmpty(userIdConcatMeasType)) {
|
||||
Map<String, List<String>> finalMap = userIdConcatMeasType.stream().collect(Collectors.groupingBy(str ->
|
||||
str.substring(0, str.indexOf(StrPool.AT))
|
||||
));
|
||||
//全部缺失数据的用户
|
||||
List<String> lackData = new ArrayList<>();
|
||||
//部分缺失的用户及指标
|
||||
List<String> excalationData = new ArrayList<>();
|
||||
Set<String> keyedSet = finalMap.keySet();
|
||||
for (String key : keyedSet) {
|
||||
List<String> data = finalMap.get(key);
|
||||
if (data.size() == typeList.size()) {
|
||||
lackData.add(key);
|
||||
} else {
|
||||
data = data.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)).collect(Collectors.toList());
|
||||
key = key.concat(StrPool.COMMA).concat(StringUtils.join(data, StrPool.AT));
|
||||
excalationData.add(key);
|
||||
}
|
||||
}
|
||||
FileWriter lackDataWriter = FileWriter.create(new File("/usr/local/syncData/lackData" + date + k + ".txt"));
|
||||
lackDataWriter.writeLines(lackData);
|
||||
FileWriter excalationDataWriter = FileWriter.create(new File("/usr/local/syncData/excalationData" + date + k + ".txt"));
|
||||
excalationDataWriter.writeLines(excalationData);
|
||||
}
|
||||
log.error("用户有指标没有数据的长度为:{}", userIdConcatMeasType.size());
|
||||
//最后批量入库
|
||||
batchInsertData(typeData);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 批量入库influxDB
|
||||
*
|
||||
* @param typeData 远程根据用户编号获取的数据 Map</表名/String, List<Map</属性名/String,/数值/String>>> typeData = new HashMap<>();
|
||||
*/
|
||||
private void batchInsertData(Map<String, List<String>> typeData) {
|
||||
log.error("总计有{}条记录入库,以20000作为基数分片插入influxdb", typeData.size());
|
||||
List<String> sqlList = new ArrayList<>();
|
||||
Set<String> tableNames = typeData.keySet();
|
||||
String[] datas;
|
||||
Map<String, String> tags;
|
||||
Map<String, Object> fields;
|
||||
Point point;
|
||||
BatchPoints batchPoints;
|
||||
for (String tableName : tableNames) {
|
||||
List<String> data = typeData.get(tableName);
|
||||
tableName = tableName.substring(tableName.lastIndexOf(StrPool.AT) + 1);
|
||||
for (String datum : data) {
|
||||
datas = datum.split(StrPool.COMMA);
|
||||
//tag数据
|
||||
tags = new HashMap<>();
|
||||
tags.put("phasic_type", datas[0]);
|
||||
tags.put("line_id", datas[1]);
|
||||
tags.put("quality_flag", "0");
|
||||
tags.put("value_type", "AVG");
|
||||
String time = datas[2];
|
||||
//tag数据删完后,剩余均是filed数据,因filed属性名不固定,无法指定获取,直接循环
|
||||
fields = new HashMap<>();
|
||||
fields.put(datas[3], datas[4]);
|
||||
point = influxDbUtils.pointBuilder(tableName, DateUtil.parse(time, DatePattern.NORM_DATETIME_FORMATTER).getTime(), TimeUnit.MILLISECONDS, tags, fields);
|
||||
batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
|
||||
batchPoints.point(point);
|
||||
sqlList.add(batchPoints.lineProtocol());
|
||||
}
|
||||
}
|
||||
List<List<String>> subSqlList = ListUtils.partition(sqlList, 20000);
|
||||
int count = 1;
|
||||
for (List<String> sql : subSqlList) {
|
||||
try {
|
||||
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "autogen", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sql);
|
||||
} catch (Exception exception) {
|
||||
log.error("数据批量入库异常,异常为:{}",exception.toString());
|
||||
exception.printStackTrace();
|
||||
}
|
||||
log.error("已经入库{}条记录!", count * 20000);
|
||||
count++;
|
||||
}
|
||||
log.error("当前批次所有数据,{}条均已入库!", sqlList.size());
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user