This commit is contained in:
2023-10-19 10:06:06 +08:00
parent 6fe0df7933
commit 51c7dbede7
6 changed files with 263 additions and 267 deletions

View File

@@ -1,11 +1,13 @@
package com.njcn.jbsyncdata.controller;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import com.alibaba.excel.EasyExcel;
import com.njcn.jbsyncdata.enums.MeasTypeEnum;
import com.alibaba.excel.support.ExcelTypeEnum;
import com.njcn.jbsyncdata.pojo.DisPhotovoltaic10Excel;
import com.njcn.jbsyncdata.pojo.DisPhotovoltaic380Excel;
import com.njcn.jbsyncdata.pojo.ExcelData;
import com.njcn.jbsyncdata.service.DisPhotovoltaicService;
import com.njcn.jbsyncdata.service.IBusinessService;
import com.njcn.jbsyncdata.util.StreamUtil;
@@ -20,10 +22,8 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import javax.servlet.http.HttpServletResponse;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
/**
@@ -43,9 +43,9 @@ public class DisPhotovoltaicController {
@ApiOperation(value = "查询所有用户的遥测数据")
@PostMapping("/queryTelemetryData")
public void queryTelemetryData(String startTime, String endTime) {
public void queryTelemetryData(String date) {
try {
businessService.queryTelemetryData(startTime, endTime);
businessService.queryTelemetryData(date);
} catch (Exception exception) {
exception.printStackTrace();
}
@@ -84,6 +84,107 @@ public class DisPhotovoltaicController {
}
public static void main(String[] args) throws IOException {
//读取10kV的数据
List<DisPhotovoltaic10Excel> list10kV = EasyExcel.read("D:\\temp\\基础表.xlsx")
.excelType(ExcelTypeEnum.XLSX)
.head(DisPhotovoltaic10Excel.class)
.headRowNumber(2)
.sheet(0).doReadSync();
list10kV = list10kV.stream()
.filter(t -> StrUtil.isNotBlank(t.getGenerationUserID()))
.filter(t -> StrUtil.isNotBlank(t.getStageID()))
.filter(StreamUtil.distinctByKey(DisPhotovoltaic10Excel::getGenerationUserID))
.collect(Collectors.toList());
//读取380V的数据
List<DisPhotovoltaic380Excel> list380v = EasyExcel.read("D:\\temp\\基础表.xlsx")
.excelType(ExcelTypeEnum.XLSX)
.head(DisPhotovoltaic380Excel.class)
.headRowNumber(2)
.sheet(1).doReadSync();
list380v = list380v.stream()
.filter(t -> StrUtil.isNotBlank(t.getGenerationUserID()))
.filter(t -> StrUtil.isNotBlank(t.getStageID()))
.filter(StreamUtil.distinctByKey(DisPhotovoltaic380Excel::getGenerationUserID))
.collect(Collectors.toList());
//读取所有没数据的用户号
FileReader fileReader = new FileReader("D:\\temp\\all.txt");
List<String> noData = fileReader.readLines();
Set<String> noDataSet = new HashSet<>(noData);
FileReader fileReader2 = new FileReader("D:\\temp\\part.txt");
List<String> noPartData = fileReader2.readLines();
Set<String> noPartDataSet = new HashSet<>(noPartData);
long millis = System.currentTimeMillis();
//梳理10kV全部没有数据的
List<DisPhotovoltaic10Excel> collect = list10kV.stream().filter(t -> noDataSet.contains(t.getGenerationUserID())).collect(Collectors.toList());
EasyExcel.write("D:\\temp\\10kV全部没有数据的.xlsx", DisPhotovoltaic10Excel.class).sheet("10kV全部没有数据的").doWrite(collect);
long millis1 = System.currentTimeMillis();
System.out.println("10kV全部没有数据的耗时" + (millis1 - millis));
//10kV部分没有数据的
Map<String, List<String>> noPartDataMap = noPartDataSet.stream().collect(Collectors.groupingBy(t -> t.substring(0, t.indexOf(StrPool.COMMA))));
Map<String, List<DisPhotovoltaic10Excel>> all10kVMap = list10kV.stream().collect(Collectors.groupingBy(DisPhotovoltaic10Excel::getGenerationUserID));
Set<String> keyedSet1 = all10kVMap.keySet();
List<DisPhotovoltaic10Excel> final10kVUserData = new LinkedList<>();
String info;
for (String userId : keyedSet1) {
List<String> infoList = noPartDataMap.get(userId);
if (CollectionUtil.isNotEmpty(infoList)) {
info = infoList.get(0);
DisPhotovoltaic10Excel disPhotovoltaic10Excel = all10kVMap.get(userId).get(0);
info = info.substring(info.indexOf(StrPool.COMMA) + 1);
info = info.replaceAll(StrPool.AT, "||")
.replaceAll("A_phsA", "A相电流")
.replaceAll("A_phsB", "B相电流")
.replaceAll("A_phsC", "C相电流")
.replaceAll("PhV_phsA", "A相电压")
.replaceAll("PhV_phsB", "B相电压")
.replaceAll("PhV_phsC", "C相电压")
.replaceAll("TotW", "有功")
.replaceAll("TotVar", "无功");
disPhotovoltaic10Excel.setTypes(info);
final10kVUserData.add(disPhotovoltaic10Excel);
}
}
EasyExcel.write("D:\\temp\\10kV部分没有数据的.xlsx", DisPhotovoltaic10Excel.class).sheet("10kV部分没有数据的").doWrite(final10kVUserData);
long millis2 = System.currentTimeMillis();
System.out.println("10kV部分没有数据的" + (millis2 - millis1));
//梳理380V全部没有数据的
List<DisPhotovoltaic380Excel> collect4 = list380v.stream().filter(t -> noDataSet.contains(t.getGenerationUserID())).collect(Collectors.toList());
EasyExcel.write("D:\\temp\\380V全部没有数据的.xlsx", DisPhotovoltaic380Excel.class).sheet("380V全部没有数据的").doWrite(collect4);
long millis3 = System.currentTimeMillis();
System.out.println("380V全部没有数据的" + (millis3 - millis2));
//380V部分没有数据的
Map<String, List<DisPhotovoltaic380Excel>> all380VMap = list380v.stream().collect(Collectors.groupingBy(DisPhotovoltaic380Excel::getGenerationUserID));
Set<String> keyedSet = noPartDataMap.keySet();
List<DisPhotovoltaic380Excel> final380VUserData = new LinkedList<>();
List<DisPhotovoltaic380Excel> disPhotovoltaic380ExcelList;
DisPhotovoltaic380Excel disPhotovoltaic380Excel;
for (String userId : keyedSet) {
disPhotovoltaic380ExcelList = all380VMap.get(userId);
if (CollectionUtil.isNotEmpty(disPhotovoltaic380ExcelList)) {
disPhotovoltaic380Excel = disPhotovoltaic380ExcelList.get(0);
info = noPartDataMap.get(userId).get(0);
info = info.substring(info.indexOf(StrPool.COMMA) + 1);
info = info.replaceAll(StrPool.AT, "||")
.replaceAll("A_phsA", "A相电流")
.replaceAll("A_phsB", "B相电流")
.replaceAll("A_phsC", "C相电流")
.replaceAll("PhV_phsA", "A相电压")
.replaceAll("PhV_phsB", "B相电压")
.replaceAll("PhV_phsC", "C相电压")
.replaceAll("TotW", "有功")
.replaceAll("TotVar", "无功");
disPhotovoltaic380Excel.setTypes(info);
final380VUserData.add(disPhotovoltaic380Excel);
}
}
EasyExcel.write("D:\\temp\\380V部分没有数据的.xlsx", DisPhotovoltaic380Excel.class).sheet("380V部分没有数据的").doWrite(final380VUserData);
long millis4 = System.currentTimeMillis();
System.out.println("380V部分没有数据的" + (millis4 - millis3));
}
@ApiOperation(value = "将用户数据导入到配网表中")
@PostMapping("/insertDistributionMonitor")
public String import380KV() throws Exception {

View File

@@ -78,5 +78,8 @@ public class DisPhotovoltaic10Excel implements Serializable {
@ExcelProperty(value = "备注")
private String remark;
@ExcelProperty(value = "无数据的指标")
private String types;
}

View File

@@ -6,6 +6,7 @@ import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Objects;
/**
* @author wr
@@ -81,5 +82,19 @@ public class DisPhotovoltaic380Excel implements Serializable {
@ExcelProperty(value = "所属线路PMS名称")
private String lineName;
@ExcelProperty(value = "无数据的指标")
private String types;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DisPhotovoltaic380Excel that = (DisPhotovoltaic380Excel) o;
return Objects.equals(errorMessage, that.errorMessage) && Objects.equals(serialNumber, that.serialNumber) && Objects.equals(orgName, that.orgName) && Objects.equals(county, that.county) && Objects.equals(powerSupply, that.powerSupply) && Objects.equals(generationUserID, that.generationUserID) && Objects.equals(generationUserName, that.generationUserName) && Objects.equals(connectionDate, that.connectionDate) && Objects.equals(address, that.address) && Objects.equals(generationUserType, that.generationUserType) && Objects.equals(wayConsumption, that.wayConsumption) && Objects.equals(contractCapacity, that.contractCapacity) && Objects.equals(Voltage_Level, that.Voltage_Level) && Objects.equals(industryType, that.industryType) && Objects.equals(stageID, that.stageID) && Objects.equals(stageName, that.stageName) && Objects.equals(transformerPMSID, that.transformerPMSID) && Objects.equals(isFusionTerminal, that.isFusionTerminal) && Objects.equals(isAntiarc, that.isAntiarc) && Objects.equals(lineID, that.lineID) && Objects.equals(lineName, that.lineName) && Objects.equals(types, that.types);
}
@Override
public int hashCode() {
return Objects.hash(errorMessage, serialNumber, orgName, county, powerSupply, generationUserID, generationUserName, connectionDate, address, generationUserType, wayConsumption, contractCapacity, Voltage_Level, industryType, stageID, stageName, transformerPMSID, isFusionTerminal, isAntiarc, lineID, lineName, types);
}
}

View File

@@ -1,11 +1,8 @@
package com.njcn.jbsyncdata.service;
import com.njcn.jbsyncdata.pojo.ExcelData;
import java.util.List;
public interface IBusinessService {
void queryTelemetryData(String startTime,String endTime);
void queryTelemetryData(String date);
}

View File

@@ -2,6 +2,7 @@ package com.njcn.jbsyncdata.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.file.FileWriter;
import cn.hutool.core.map.MapUtil;
@@ -63,132 +64,11 @@ public class BusinessServiceImpl implements IBusinessService {
* 3. CommonTelemetry的遥测数据集合telemetryValue为null--不做处理,直接过
* 4. StatisticsData统计数据的实际数值measValue为null-----对应时间、指标的数值设置为0
*/
public void queryTelemetryData2(String startTime, String endTime) {
// 连指标都没有返回的用户
// List<String> noDataUser = new ArrayList<>();
//有指标返回但是指标的遥测整体数据为null
List<String> noDataUserWithMeasType = new ArrayList<>();
RestTemplateUtil restTemplateUtil = new RestTemplateUtil();
TokenResult tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate();
if (null == tokenWithRestTemplate) {
log.error("token信息获取失败");
return;
}
List<String> typeList = MeasTypeEnum.getMeasList();
JSONObject jsonObject = JSONUtil.createObj();
JSONObject jsonObjectSub = JSONUtil.createObj();
jsonObject.set("page", 1);
jsonObject.set("perPage", 10000);
jsonObject.set("startTime", "2023-10-07 00:00:00");
jsonObject.set("endTime", "2023-10-07 11:59:59");
//1公专变2低压用户3光伏
jsonObjectSub.set("consType", "3");
jsonObjectSub.set("astIds", new ArrayList<>());
jsonObjectSub.set("astType", "");
jsonObjectSub.set("psrIds", new ArrayList<>());
jsonObjectSub.set("psrType", "");
jsonObjectSub.set("measPointIds", new ArrayList<>());
jsonObjectSub.set("telemetryTypes", typeList);
//组装好json开始发送请求
Map<String, String> headers = new HashMap<>();
headers.put("x-token", tokenWithRestTemplate.getAccess_token());
//获取所有发电用户的id
List<String> userIds = pmsPowerGenerationUserService.queryAllUserId();
//将发电用户编号按100尺寸分片
List<List<String>> partitionList = ListUtils.partition(userIds, 100);
log.error("总计分了{}片", partitionList.size());
List<String> userIdConcatMeasType;
int count = 0;
//指标类型集合
List</*各值以逗号分隔*/String> influxData;
Map</*表名*/String, List</*各值以逗号分隔*/String>> typeData = new HashMap<>();
StringBuilder tempInfluxData;
ResponseEntity<GeneralResult> response;
List<StatisticsData> statisticsDataList;
PageResult result;
List<CommonTelemetry> records;
String dataIdentify;
MeasTypeEnum measTypeEnumByMeasType;
for (List<String> generationUserIDList : partitionList) {
count++;
log.error("查询第{}片数据", count);
//按批次处理用户编号数据
jsonObjectSub.set("consNos", generationUserIDList);
//将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType
userIdConcatMeasType = new ArrayList<>();
for (String measType : typeList) {
userIdConcatMeasType.addAll(generationUserIDList.stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toList()));
}
JSONArray jsonArray = JSONUtil.createArray();
jsonArray.add(jsonObjectSub);
jsonObject.set("filters", jsonArray);
response = restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, GeneralResult.class);
if (response.getStatusCodeValue() == 200 && Objects.nonNull(response.getBody()) && response.getBody().getStatus().equalsIgnoreCase("000000")) {
result = response.getBody().getResult();
records = result.getRecords();
log.error("查询遥测数据结束,返回数据量:{}", records.size());
if (Objects.isNull(result) || CollectionUtil.isEmpty(records)) {
//日志输出:
log.error("起始时间:{},截止时间:{},无遥测数据;", startTime, endTime);
continue;
}
//处理各个record的数据因用户下可能有多个测量点按指标循环默认采用第一个匹配上的做数据处理
for (CommonTelemetry commonTelemetry : records) { // 最多循环100*16次
dataIdentify = commonTelemetry.getConsNo().concat(StrPool.AT).concat(commonTelemetry.getMeasTypeCode());
if (userIdConcatMeasType.contains(dataIdentify)) {
//首个包含该标识的数据进行处理
measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.getMeasTypeCode());
//统计数据经过测试接口响应json可能不包含该属性
statisticsDataList = commonTelemetry.getTelemetryValue();
if (CollectionUtil.isEmpty(statisticsDataList)) {
//添加进有指标但无遥测数据集合
noDataUserWithMeasType.add(dataIdentify);
continue;
}
influxData = new ArrayList<>();
for (StatisticsData statisticsData : statisticsDataList) { // 匹配上进入循环96次
tempInfluxData = new StringBuilder();
tempInfluxData.append(commonTelemetry.getMeasTypeCode())
.append(StrPool.COMMA)
.append(commonTelemetry.getConsNo())
.append(StrPool.COMMA)
.append(statisticsData.getDataTime())
.append(StrPool.COMMA)
.append(measTypeEnumByMeasType.getFieldName())
.append(StrPool.COMMA)
.append(StrUtil.isBlank(statisticsData.getMeasValue()) ? "0" : statisticsData.getMeasValue());
influxData.add(tempInfluxData.toString());
}
//userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
typeData.put(commonTelemetry.getConsNo().concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData);
//处理完,删除该条记录,减少集合尺寸,提高效率
userIdConcatMeasType.remove(dataIdentify);
}
}
//没有匹配上的就是该用户没有数据
log.error("剩余有{}条标识", userIdConcatMeasType.size());
// noDataUser.addAll(userIdConcatMeasType);
} else {
log.error("查询遥测数据失败!第{}片,结果为:{}", count, response);
}
// System.gc();
}
//最后批量入库
batchInsertData(typeData);
//最后输出没有数据的用户编号
// if (CollectionUtil.isNotEmpty(noDataUser)) {
// noDataUser = noDataUser.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1))
// .distinct()
// .collect(Collectors.toList());
// FileWriter writer = FileWriter.create(new File("/usr/local/demo.txt"));
// writer.writeLines(noDataUser);
// }
// log.error("用户没有数据的长度为:{}", noDataUser.size());
log.error("用户有指标没有数据的长度为:{}", noDataUserWithMeasType.size());
}
@Override
public void queryTelemetryData(String startTime, String endTime) {
public void queryTelemetryData(String date) {
DateTime dateTemp = DateUtil.parse(date, DatePattern.NORM_DATE_FORMAT);
DateTime beginOfDay = DateUtil.beginOfDay(dateTemp);
DateTime endOfDay = DateUtil.endOfDay(dateTemp);
//有指标返回但是指标的遥测整体数据为null
RestTemplateUtil restTemplateUtil = new RestTemplateUtil();
TokenResult tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate();
@@ -201,8 +81,8 @@ public class BusinessServiceImpl implements IBusinessService {
JSONObject jsonObjectSub = JSONUtil.createObj();
jsonObject.set("page", 1);
jsonObject.set("perPage", 10000);
jsonObject.set("startTime", "2023-10-07 00:00:00");
jsonObject.set("endTime", "2023-10-07 11:59:59");
jsonObject.set("startTime", DateUtil.format(beginOfDay, DatePattern.NORM_DATETIME_FORMATTER));
jsonObject.set("endTime", DateUtil.format(endOfDay, DatePattern.NORM_DATETIME_FORMATTER));
//1公专变2低压用户3光伏
jsonObjectSub.set("consType", "3");
jsonObjectSub.set("astIds", new ArrayList<>());
@@ -216,135 +96,135 @@ public class BusinessServiceImpl implements IBusinessService {
headers.put("x-token", tokenWithRestTemplate.getAccess_token());
//获取所有发电用户的id
List<String> userIds = pmsPowerGenerationUserService.queryAllUserId();
//将发电用户编号按100尺寸分片
List<List<String>> partitionList = ListUtils.partition(userIds, 100);
log.error("总计分了{}片", partitionList.size());
int count = 0;
//先获取数据
List<ResponseEntity<String>> responseEntities = new ArrayList<>(2000);
for (List<String> generationUserIDList : partitionList) {
count++;
log.error("查询第{}片数据", count);
//按批次处理用户编号数据
jsonObjectSub.set("consNos", generationUserIDList);
JSONArray jsonArray = JSONUtil.createArray();
jsonArray.add(jsonObjectSub);
jsonObject.set("filters", jsonArray);
//避免中途token失效
if (count % 800 == 0) {
tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate();
headers.put("x-token", tokenWithRestTemplate.getAccess_token());
List<List<String>> singleQueryDataUserId = ListUtils.partition(userIds, 20000);
for (int k = 0; k < singleQueryDataUserId.size(); k++) {
//将发电用户编号按100尺寸分片
List<List<String>> partitionList = ListUtils.partition(singleQueryDataUserId.get(k), 100);
log.error("总计分了{}片", partitionList.size());
int count = 0;
tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate();
headers.put("x-token", tokenWithRestTemplate.getAccess_token());
//先获取数据
List<ResponseEntity<String>> responseEntities = new ArrayList<>(2000);
int kk = k + 1;
for (List<String> generationUserIDList : partitionList) {
count++;
log.error("查询第{}大片,{}小片数据", kk, count);
//按批次处理用户编号数据
jsonObjectSub.set("consNos", generationUserIDList);
JSONArray jsonArray = JSONUtil.createArray();
jsonArray.add(jsonObjectSub);
jsonObject.set("filters", jsonArray);
responseEntities.add(restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, String.class));
}
responseEntities.add(restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, String.class));
}
//开始解析数据
Set<String> userIdConcatMeasType = new HashSet<>();
//将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType
for (String measType : typeList) {
userIdConcatMeasType.addAll(userIds.stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toSet()));
}
List</*各值以逗号分隔*/String> influxData;
Map</*表名*/String, List</*各值以逗号分隔*/String>> typeData = new HashMap<>();
StringBuilder tempInfluxData;
ResponseEntity<String> response;
JSONArray statisticsDataList;
JSONObject result;
JSONObject statisticsData;
JSONObject body;
JSONArray records;
String dataIdentify;
JSONObject commonTelemetry;
MeasTypeEnum measTypeEnumByMeasType;
for (int i = 0; i < partitionList.size(); i++) {
log.error("解析第{}片数据", i);
response = responseEntities.get(i);
body = JSONUtil.parseObj(response.getBody());
if (response.getStatusCodeValue() == 200 && body.get("status", String.class).equalsIgnoreCase("000000")) {
result = JSONUtil.parseObj(body.get("result", String.class));
records = JSONUtil.parseArray(result.get("records", String.class));
log.error("查询遥测数据结束,返回数据量:{}", records.size());
if (CollectionUtil.isEmpty(records)) {
//日志输出:
log.error("起始时间:{},截止时间:{},无遥测数据;", startTime, endTime);
continue;
//开始解析数据
Set<String> userIdConcatMeasType = new HashSet<>();
//将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType
for (String measType : typeList) {
userIdConcatMeasType.addAll(singleQueryDataUserId.get(k).stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toSet()));
}
List</*各值以逗号分隔*/String> influxData;
Map</*表名*/String, List</*各值以逗号分隔*/String>> typeData = new HashMap<>();
StringBuilder tempInfluxData;
ResponseEntity<String> response;
JSONArray statisticsDataList;
JSONObject result;
JSONObject statisticsData;
JSONObject body;
JSONArray records;
String dataIdentify;
JSONObject commonTelemetry;
MeasTypeEnum measTypeEnumByMeasType;
for (int i = 0; i < partitionList.size(); i++) {
log.error("解析第{}片数据", i);
response = responseEntities.get(i);
body = JSONUtil.parseObj(response.getBody());
if (response.getStatusCodeValue() == 200 && body.get("status", String.class).equalsIgnoreCase("000000")) {
result = JSONUtil.parseObj(body.get("result", String.class));
records = JSONUtil.parseArray(result.get("records", String.class));
log.error("查询遥测数据结束,返回数据量:{}", records.size());
if (CollectionUtil.isEmpty(records)) {
//日志输出:
log.error("查询时间:{},无遥测数据;", date);
continue;
}
//处理各个record的数据因用户下可能有多个测量点按指标循环默认采用第一个匹配上的做数据处理
for (Object obj : records) { // 最多循环100*16次
commonTelemetry = JSONUtil.parseObj(obj);
dataIdentify = commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(commonTelemetry.get("measTypeCode", String.class));
if (userIdConcatMeasType.contains(dataIdentify)) {
//首个包含该标识的数据进行处理
measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.get("measTypeCode", String.class));
//统计数据经过测试接口响应json可能不包含该属性
statisticsDataList = commonTelemetry.get("telemetryValue", JSONArray.class);
if (CollectionUtil.isEmpty(statisticsDataList)) {
//添加进有指标但无遥测数据集合
continue;
}
influxData = new ArrayList<>();
for (Object subObj : statisticsDataList) { // 匹配上进入循环96次
statisticsData = JSONUtil.parseObj(subObj);
tempInfluxData = new StringBuilder();
tempInfluxData.append(measTypeEnumByMeasType.getPhaseType())
.append(StrPool.COMMA)
.append(commonTelemetry.get("consNo", String.class))
.append(StrPool.COMMA)
.append(statisticsData.get("dataTime", String.class))
.append(StrPool.COMMA)
.append(measTypeEnumByMeasType.getFieldName())
.append(StrPool.COMMA)
.append(StrUtil.isBlank(statisticsData.get("measValue", String.class)) ? "0" : statisticsData.get("measValue", String.class));
influxData.add(tempInfluxData.toString());
}
//userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
typeData.put(commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData);
//处理完,删除该条记录,减少集合尺寸,提高效率
userIdConcatMeasType.remove(dataIdentify);
}
}
//没有匹配上的就是该用户没有数据
log.error("剩余有{}条标识", userIdConcatMeasType.size());
} else {
log.error("查询遥测数据失败!第{}片,结果为:{}", count, response);
}
//处理各个record的数据因用户下可能有多个测量点按指标循环默认采用第一个匹配上的做数据处理
for (Object obj : records) { // 最多循环100*16次
commonTelemetry = JSONUtil.parseObj(obj);
dataIdentify = commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(commonTelemetry.get("measTypeCode", String.class));
if (userIdConcatMeasType.contains(dataIdentify)) {
//首个包含该标识的数据进行处理
measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.get("measTypeCode", String.class));
//统计数据经过测试接口响应json可能不包含该属性
statisticsDataList = commonTelemetry.get("telemetryValue", JSONArray.class);
if (CollectionUtil.isEmpty(statisticsDataList)) {
//添加进有指标但无遥测数据集合
continue;
}
influxData = new ArrayList<>();
for (Object subObj : statisticsDataList) { // 匹配上进入循环96次
statisticsData = JSONUtil.parseObj(subObj);
tempInfluxData = new StringBuilder();
tempInfluxData.append(measTypeEnumByMeasType.getPhaseType())
.append(StrPool.COMMA)
.append(commonTelemetry.get("consNo", String.class))
.append(StrPool.COMMA)
.append(statisticsData.get("dataTime", String.class))
.append(StrPool.COMMA)
.append(measTypeEnumByMeasType.getFieldName())
.append(StrPool.COMMA)
.append(StrUtil.isBlank(statisticsData.get("measValue", String.class)) ? "0" : statisticsData.get("measValue", String.class));
influxData.add(tempInfluxData.toString());
}
//userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
typeData.put(commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData);
//处理完,删除该条记录,减少集合尺寸,提高效率
userIdConcatMeasType.remove(dataIdentify);
}
//最后输出没有数据的用户编号
/**
* 输出到2个文件lackData.txt、 excalationData.txt
* 注用户号去除160前缀
* 1、所有指标均没有有数据的用户编号
* 2、部分指标没有数据的用户编号并表明是哪些指标
*/
if (CollectionUtil.isNotEmpty(userIdConcatMeasType)) {
Map<String, List<String>> finalMap = userIdConcatMeasType.stream().collect(Collectors.groupingBy(str -> {
String key = str.substring(3);
key = key.substring(0, key.indexOf(StrPool.AT));
return key;
}));
//全部缺失数据的用户
List<String> lackData = new ArrayList<>();
//部分缺失的用户及指标
List<String> excalationData = new ArrayList<>();
Set<String> keyedSet = finalMap.keySet();
for (String key : keyedSet) {
List<String> data = finalMap.get(key);
if (data.size() == typeList.size()) {
lackData.add(key);
} else {
data = data.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)).collect(Collectors.toList());
key = key.concat(StrPool.COMMA).concat(StringUtils.join(data, StrPool.AT));
excalationData.add(key);
}
}
//没有匹配上的就是该用户没有数据
log.error("剩余有{}条标识", userIdConcatMeasType.size());
} else {
log.error("查询遥测数据失败!第{}片,结果为:{}", count, response);
FileWriter lackDataWriter = FileWriter.create(new File("/usr/local/syncData/lackData" + date + k + ".txt"));
lackDataWriter.writeLines(lackData);
FileWriter excalationDataWriter = FileWriter.create(new File("/usr/local/syncData/excalationData" + date + k + ".txt"));
excalationDataWriter.writeLines(excalationData);
}
log.error("用户有指标没有数据的长度为:{}", userIdConcatMeasType.size());
//最后批量入库
batchInsertData(typeData);
}
//最后输出没有数据的用户编号
/**
* 输出到2个文件lackData.txt、 excalationData.txt
* 注用户号去除160前缀
* 1、所有指标均没有有数据的用户编号
* 2、部分指标没有数据的用户编号并表明是哪些指标
*/
if (CollectionUtil.isNotEmpty(userIdConcatMeasType)) {
Map<String, List<String>> finalMap = userIdConcatMeasType.stream().collect(Collectors.groupingBy(str -> {
String key = str.substring(3);
key = key.substring(0, key.indexOf(StrPool.AT));
return key;
}));
//全部缺失数据的用户
List<String> lackData = new ArrayList<>();
//部分缺失的用户及指标
List<String> excalationData = new ArrayList<>();
Set<String> keyedSet = finalMap.keySet();
for (String key : keyedSet) {
List<String> data = finalMap.get(key);
if (data.size() == typeList.size()) {
lackData.add(key);
} else {
data = data.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)).collect(Collectors.toList());
key = key.concat(StrPool.COMMA).concat(StringUtils.join(data, StrPool.AT));
excalationData.add(key);
}
}
FileWriter lackDataWriter = FileWriter.create(new File("/usr/local/lackData.txt"));
lackDataWriter.writeLines(lackData);
FileWriter excalationDataWriter = FileWriter.create(new File("/usr/local/excalationData.txt"));
excalationDataWriter.writeLines(excalationData);
}
log.error("用户有指标没有数据的长度为:{}", userIdConcatMeasType.size());
//最后批量入库
batchInsertData(typeData);
}
@@ -383,11 +263,11 @@ public class BusinessServiceImpl implements IBusinessService {
sqlList.add(batchPoints.lineProtocol());
}
}
List<List<String>> subSqlList = ListUtils.partition(sqlList, 50000);
List<List<String>> subSqlList = ListUtils.partition(sqlList, 20000);
int count = 1;
for (List<String> sql : subSqlList) {
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sql);
log.error("已经入库{}条记录!", count * 50000);
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "autogen", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sql);
log.error("已经入库{}条记录!", count * 20000);
count++;
}

View File

@@ -42,10 +42,10 @@ spring:
max-file-size: 500MB
enabled: true
influx:
url: http://10.118.135.128:8086
url: http://10.118.135.129:8086
user: admin
password: 123456
database: ym_testbk
database: pqsbase
mapper-location: com.njcn.jbsyncdata.imapper