测试解析数据入influx库

This commit is contained in:
2023-10-10 17:42:16 +08:00
parent ead9340ac3
commit fc4080bf41
15 changed files with 1166 additions and 25 deletions

View File

@@ -1,10 +1,18 @@
package com.njcn.jbsyncdata.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.njcn.jbsyncdata.component.TokenComponent;
import com.njcn.jbsyncdata.enums.MeasTypeEnum;
import com.njcn.jbsyncdata.pojo.ExcelData;
import com.njcn.jbsyncdata.pojo.result.TokenResult;
import com.njcn.jbsyncdata.pojo.result.*;
import com.njcn.jbsyncdata.service.IBusinessService;
import com.njcn.jbsyncdata.util.RestTemplateUtil;
import lombok.extern.slf4j.Slf4j;
@@ -12,10 +20,7 @@ import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -50,9 +55,6 @@ public class BusinessServiceImpl implements IBusinessService {
jsonObjectSub.set("termType", "");
jsonObjectSub.set("measPointIds", new ArrayList<>());
jsonObject.set("filter", jsonObjectSub);
if (excelData.getStageID().equalsIgnoreCase("1502760751")) {
log.error("组装后的json为{}", jsonObject);
}
//组装好json开始发送请求
Map<String, String> headers = new HashMap<>();
headers.put("x-token", tokenWithRestTemplate.getAccess_token());
@@ -81,23 +83,93 @@ public class BusinessServiceImpl implements IBusinessService {
List<String> userId = Stream.of("160".concat(excelData.getGenerationUserID())).collect(Collectors.toList());
jsonObjectSub.set("consNos", userId);
jsonObjectSub.set("consType", 3);
jsonObjectSub.set("astIds", new ArrayList<>());
jsonObjectSub.set("astType", "");
jsonObjectSub.set("psrIds", new ArrayList<>());
jsonObjectSub.set("psrType", "");
jsonObjectSub.set("measPointIds", new ArrayList<>());
List<String> typeList = Stream.of("PhV_phsA","PhV_phsB","PhV_phsC").collect(Collectors.toList());
//指标类型集合
List<String> typeList = MeasTypeEnum.getMeasList();
jsonObjectSub.set("telemetryTypes", typeList);
jsonObject.set("filter", jsonObjectSub);
if (excelData.getStageID().equalsIgnoreCase("2226675026")) {
log.error("组装后的json为{}", jsonObject);
}
//组装好json开始发送请求
Map<String, String> headers = new HashMap<>();
headers.put("x-token", tokenWithRestTemplate.getAccess_token());
ResponseEntity<String> response = restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/measPoint/commonQuery"), headers, jsonObject, String.class);
ResponseEntity<GeneralResult> response = restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, GeneralResult.class);
if (response.getStatusCodeValue() == 200 && response.getBody().getStatus().equalsIgnoreCase("000000")) {
PageResult result = response.getBody().getResult();
List<CommonTelemetry> records = result.getRecords();
if (Objects.isNull(result) || CollectionUtil.isEmpty(result.getRecords())) {
//日志输出:
log.error("用户编号为:{},无遥测数据;", excelData.getGenerationUserID());
continue;
}
//处理各个record的数据因用户下可能有多个测量点按指标循环默认采用第一个匹配上的做数据处理
Map</*表名*/String, List<Map</*属性名*/String,/*数值*/String>>> typeData = new HashMap<>();
for (String type : typeList) {
MeasTypeEnum measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(type);
for (CommonTelemetry commonTelemetry : records) {
if (type.equalsIgnoreCase(commonTelemetry.getMeasTypeCode())) {
List<StatisticsData> statisticsDataList = commonTelemetry.getTelemetryValue();
List<Map</*属性名*/String,/*数值*/String>> influxData = new ArrayList<>();
for (StatisticsData statisticsData : statisticsDataList) {
Map<String, String> tempInfluxData = new HashMap<>();
tempInfluxData.put("phasic_type", measTypeEnumByMeasType.getPhaseType());
tempInfluxData.put("line_id", "160".concat(excelData.getGenerationUserID()));
tempInfluxData.put("quality_flag", "0");
tempInfluxData.put("value_type", "AVG");
tempInfluxData.put("time", statisticsData.getDataTime());
//为空则赋值为0表中其他均为0
tempInfluxData.put(measTypeEnumByMeasType.getFieldName(), StrUtil.isBlank(statisticsData.getMeasValue()) ? "0" : statisticsData.getMeasValue());
influxData.add(tempInfluxData);
}
//measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
typeData.put(measTypeEnumByMeasType.getMeasType().concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData);
break;
}
}
}
}
log.error("请求接口,台区号为:{},结果为:{}", excelData.getStageID(), response);
}
}
public static void main(String[] args) {
String path = "C:\\Users\\83944\\Desktop\\test\\test.txt";
FileReader fileReader = new FileReader(path);
String jsonStr = fileReader.readString();
GeneralResult result = JSONUtil.toBean(jsonStr, GeneralResult.class);
List<CommonTelemetry> records = result.getResult().getRecords();
//处理各个record的数据因用户下可能有多个测量点按指标循环默认采用第一个匹配上的做数据处理
Map</*表名*/String, List<Map</*属性名*/String,/*数值*/String>>> typeData = new HashMap<>();
List<String> typeList = Stream.of("PhV_phsA", "PhV_phsB", "PhV_phsC").collect(Collectors.toList());
for (String type : typeList) {
MeasTypeEnum measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(type);
for (CommonTelemetry commonTelemetry : records) {
if (type.equalsIgnoreCase(commonTelemetry.getMeasTypeCode())) {
List<StatisticsData> statisticsDataList = commonTelemetry.getTelemetryValue();
List<Map</*属性名*/String,/*数值*/String>> influxData = new ArrayList<>();
for (StatisticsData statisticsData : statisticsDataList) {
Map<String, String> tempInfluxData = new HashMap<>();
tempInfluxData.put("phasic_type", measTypeEnumByMeasType.getPhaseType());
tempInfluxData.put("line_id", "1602514341899");
tempInfluxData.put("quality_flag", "0");
tempInfluxData.put("value_type", "AVG");
tempInfluxData.put("time", statisticsData.getDataTime());
//为空则赋值为0表中其他均为0
tempInfluxData.put(measTypeEnumByMeasType.getFieldName(), StrUtil.isBlank(statisticsData.getMeasValue()) ? "0" : statisticsData.getMeasValue());
influxData.add(tempInfluxData);
}
typeData.put(measTypeEnumByMeasType.getMeasType().concat("@").concat(measTypeEnumByMeasType.getTableName()), influxData);
break;
}
}
}
System.out.println(11);
}
}