diff --git a/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java b/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java index c82619d..54c567c 100644 --- a/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java +++ b/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java @@ -1,11 +1,13 @@ package com.njcn.jbsyncdata.controller; +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.io.file.FileReader; +import cn.hutool.core.text.StrPool; import cn.hutool.core.util.StrUtil; import com.alibaba.excel.EasyExcel; -import com.njcn.jbsyncdata.enums.MeasTypeEnum; +import com.alibaba.excel.support.ExcelTypeEnum; import com.njcn.jbsyncdata.pojo.DisPhotovoltaic10Excel; import com.njcn.jbsyncdata.pojo.DisPhotovoltaic380Excel; -import com.njcn.jbsyncdata.pojo.ExcelData; import com.njcn.jbsyncdata.service.DisPhotovoltaicService; import com.njcn.jbsyncdata.service.IBusinessService; import com.njcn.jbsyncdata.util.StreamUtil; @@ -20,10 +22,8 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartFile; import javax.servlet.http.HttpServletResponse; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.io.IOException; +import java.util.*; import java.util.stream.Collectors; /** @@ -43,9 +43,9 @@ public class DisPhotovoltaicController { @ApiOperation(value = "查询所有用户的遥测数据") @PostMapping("/queryTelemetryData") - public void queryTelemetryData(String startTime, String endTime) { + public void queryTelemetryData(String date) { try { - businessService.queryTelemetryData(startTime, endTime); + businessService.queryTelemetryData(date); } catch (Exception exception) { exception.printStackTrace(); } @@ -84,6 +84,107 @@ public class DisPhotovoltaicController { } + public static void main(String[] args) throws IOException { + //读取10kV的数据 + List list10kV = EasyExcel.read("D:\\temp\\基础表.xlsx") + .excelType(ExcelTypeEnum.XLSX) + .head(DisPhotovoltaic10Excel.class) + .headRowNumber(2) + .sheet(0).doReadSync(); + list10kV = list10kV.stream() + .filter(t -> StrUtil.isNotBlank(t.getGenerationUserID())) + .filter(t -> StrUtil.isNotBlank(t.getStageID())) + .filter(StreamUtil.distinctByKey(DisPhotovoltaic10Excel::getGenerationUserID)) + .collect(Collectors.toList()); + //读取380V的数据 + List list380v = EasyExcel.read("D:\\temp\\基础表.xlsx") + .excelType(ExcelTypeEnum.XLSX) + .head(DisPhotovoltaic380Excel.class) + .headRowNumber(2) + .sheet(1).doReadSync(); + list380v = list380v.stream() + .filter(t -> StrUtil.isNotBlank(t.getGenerationUserID())) + .filter(t -> StrUtil.isNotBlank(t.getStageID())) + .filter(StreamUtil.distinctByKey(DisPhotovoltaic380Excel::getGenerationUserID)) + .collect(Collectors.toList()); + //读取所有没数据的用户号 + + FileReader fileReader = new FileReader("D:\\temp\\all.txt"); + List noData = fileReader.readLines(); + Set noDataSet = new HashSet<>(noData); + FileReader fileReader2 = new FileReader("D:\\temp\\part.txt"); + List noPartData = fileReader2.readLines(); + Set noPartDataSet = new HashSet<>(noPartData); + + long millis = System.currentTimeMillis(); + //梳理10kV全部没有数据的 + List collect = list10kV.stream().filter(t -> noDataSet.contains(t.getGenerationUserID())).collect(Collectors.toList()); + EasyExcel.write("D:\\temp\\10kV全部没有数据的.xlsx", DisPhotovoltaic10Excel.class).sheet("10kV全部没有数据的").doWrite(collect); + long millis1 = System.currentTimeMillis(); + System.out.println("10kV全部没有数据的耗时:" + (millis1 - millis)); + //10kV部分没有数据的 + Map> noPartDataMap = noPartDataSet.stream().collect(Collectors.groupingBy(t -> t.substring(0, t.indexOf(StrPool.COMMA)))); + Map> all10kVMap = list10kV.stream().collect(Collectors.groupingBy(DisPhotovoltaic10Excel::getGenerationUserID)); + Set keyedSet1 = all10kVMap.keySet(); + List final10kVUserData = new LinkedList<>(); + String info; + for (String userId : keyedSet1) { + List infoList = noPartDataMap.get(userId); + if (CollectionUtil.isNotEmpty(infoList)) { + info = infoList.get(0); + DisPhotovoltaic10Excel disPhotovoltaic10Excel = all10kVMap.get(userId).get(0); + info = info.substring(info.indexOf(StrPool.COMMA) + 1); + info = info.replaceAll(StrPool.AT, "||") + .replaceAll("A_phsA", "A相电流") + .replaceAll("A_phsB", "B相电流") + .replaceAll("A_phsC", "C相电流") + .replaceAll("PhV_phsA", "A相电压") + .replaceAll("PhV_phsB", "B相电压") + .replaceAll("PhV_phsC", "C相电压") + .replaceAll("TotW", "有功") + .replaceAll("TotVar", "无功"); + disPhotovoltaic10Excel.setTypes(info); + final10kVUserData.add(disPhotovoltaic10Excel); + } + } + EasyExcel.write("D:\\temp\\10kV部分没有数据的.xlsx", DisPhotovoltaic10Excel.class).sheet("10kV部分没有数据的").doWrite(final10kVUserData); + long millis2 = System.currentTimeMillis(); + System.out.println("10kV部分没有数据的:" + (millis2 - millis1)); + //梳理380V全部没有数据的 + List collect4 = list380v.stream().filter(t -> noDataSet.contains(t.getGenerationUserID())).collect(Collectors.toList()); + EasyExcel.write("D:\\temp\\380V全部没有数据的.xlsx", DisPhotovoltaic380Excel.class).sheet("380V全部没有数据的").doWrite(collect4); + long millis3 = System.currentTimeMillis(); + System.out.println("380V全部没有数据的:" + (millis3 - millis2)); + //380V部分没有数据的 + Map> all380VMap = list380v.stream().collect(Collectors.groupingBy(DisPhotovoltaic380Excel::getGenerationUserID)); + Set keyedSet = noPartDataMap.keySet(); + List final380VUserData = new LinkedList<>(); + List disPhotovoltaic380ExcelList; + DisPhotovoltaic380Excel disPhotovoltaic380Excel; + for (String userId : keyedSet) { + disPhotovoltaic380ExcelList = all380VMap.get(userId); + if (CollectionUtil.isNotEmpty(disPhotovoltaic380ExcelList)) { + disPhotovoltaic380Excel = disPhotovoltaic380ExcelList.get(0); + info = noPartDataMap.get(userId).get(0); + info = info.substring(info.indexOf(StrPool.COMMA) + 1); + info = info.replaceAll(StrPool.AT, "||") + .replaceAll("A_phsA", "A相电流") + .replaceAll("A_phsB", "B相电流") + .replaceAll("A_phsC", "C相电流") + .replaceAll("PhV_phsA", "A相电压") + .replaceAll("PhV_phsB", "B相电压") + .replaceAll("PhV_phsC", "C相电压") + .replaceAll("TotW", "有功") + .replaceAll("TotVar", "无功"); + disPhotovoltaic380Excel.setTypes(info); + final380VUserData.add(disPhotovoltaic380Excel); + } + } + EasyExcel.write("D:\\temp\\380V部分没有数据的.xlsx", DisPhotovoltaic380Excel.class).sheet("380V部分没有数据的").doWrite(final380VUserData); + long millis4 = System.currentTimeMillis(); + System.out.println("380V部分没有数据的:" + (millis4 - millis3)); + } + @ApiOperation(value = "将用户数据导入到配网表中") @PostMapping("/insertDistributionMonitor") public String import380KV() throws Exception { diff --git a/src/main/java/com/njcn/jbsyncdata/pojo/DisPhotovoltaic10Excel.java b/src/main/java/com/njcn/jbsyncdata/pojo/DisPhotovoltaic10Excel.java index 45f04f2..0bf8b71 100644 --- a/src/main/java/com/njcn/jbsyncdata/pojo/DisPhotovoltaic10Excel.java +++ b/src/main/java/com/njcn/jbsyncdata/pojo/DisPhotovoltaic10Excel.java @@ -78,5 +78,8 @@ public class DisPhotovoltaic10Excel implements Serializable { @ExcelProperty(value = "备注") private String remark; + @ExcelProperty(value = "无数据的指标") + private String types; + } diff --git a/src/main/java/com/njcn/jbsyncdata/pojo/DisPhotovoltaic380Excel.java b/src/main/java/com/njcn/jbsyncdata/pojo/DisPhotovoltaic380Excel.java index f3571b9..2eeb26f 100644 --- a/src/main/java/com/njcn/jbsyncdata/pojo/DisPhotovoltaic380Excel.java +++ b/src/main/java/com/njcn/jbsyncdata/pojo/DisPhotovoltaic380Excel.java @@ -6,6 +6,7 @@ import lombok.Data; import java.io.Serializable; import java.math.BigDecimal; +import java.util.Objects; /** * @author wr @@ -81,5 +82,19 @@ public class DisPhotovoltaic380Excel implements Serializable { @ExcelProperty(value = "所属线路PMS名称") private String lineName; + @ExcelProperty(value = "无数据的指标") + private String types; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DisPhotovoltaic380Excel that = (DisPhotovoltaic380Excel) o; + return Objects.equals(errorMessage, that.errorMessage) && Objects.equals(serialNumber, that.serialNumber) && Objects.equals(orgName, that.orgName) && Objects.equals(county, that.county) && Objects.equals(powerSupply, that.powerSupply) && Objects.equals(generationUserID, that.generationUserID) && Objects.equals(generationUserName, that.generationUserName) && Objects.equals(connectionDate, that.connectionDate) && Objects.equals(address, that.address) && Objects.equals(generationUserType, that.generationUserType) && Objects.equals(wayConsumption, that.wayConsumption) && Objects.equals(contractCapacity, that.contractCapacity) && Objects.equals(Voltage_Level, that.Voltage_Level) && Objects.equals(industryType, that.industryType) && Objects.equals(stageID, that.stageID) && Objects.equals(stageName, that.stageName) && Objects.equals(transformerPMSID, that.transformerPMSID) && Objects.equals(isFusionTerminal, that.isFusionTerminal) && Objects.equals(isAntiarc, that.isAntiarc) && Objects.equals(lineID, that.lineID) && Objects.equals(lineName, that.lineName) && Objects.equals(types, that.types); + } + + @Override + public int hashCode() { + return Objects.hash(errorMessage, serialNumber, orgName, county, powerSupply, generationUserID, generationUserName, connectionDate, address, generationUserType, wayConsumption, contractCapacity, Voltage_Level, industryType, stageID, stageName, transformerPMSID, isFusionTerminal, isAntiarc, lineID, lineName, types); + } } diff --git a/src/main/java/com/njcn/jbsyncdata/service/IBusinessService.java b/src/main/java/com/njcn/jbsyncdata/service/IBusinessService.java index 8b0ad4a..b8d6fd7 100644 --- a/src/main/java/com/njcn/jbsyncdata/service/IBusinessService.java +++ b/src/main/java/com/njcn/jbsyncdata/service/IBusinessService.java @@ -1,11 +1,8 @@ package com.njcn.jbsyncdata.service; -import com.njcn.jbsyncdata.pojo.ExcelData; - -import java.util.List; public interface IBusinessService { - void queryTelemetryData(String startTime,String endTime); + void queryTelemetryData(String date); } diff --git a/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java b/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java index e98e526..e428f67 100644 --- a/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java +++ b/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java @@ -2,6 +2,7 @@ package com.njcn.jbsyncdata.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; import cn.hutool.core.io.file.FileWriter; import cn.hutool.core.map.MapUtil; @@ -63,132 +64,11 @@ public class BusinessServiceImpl implements IBusinessService { * 3. CommonTelemetry的遥测数据集合telemetryValue为null--不做处理,直接过 * 4. StatisticsData统计数据的实际数值measValue为null-----对应时间、指标的数值设置为0 */ - public void queryTelemetryData2(String startTime, String endTime) { -// 连指标都没有返回的用户 -// List noDataUser = new ArrayList<>(); - //有指标返回,但是指标的遥测整体数据为null - List noDataUserWithMeasType = new ArrayList<>(); - RestTemplateUtil restTemplateUtil = new RestTemplateUtil(); - TokenResult tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); - if (null == tokenWithRestTemplate) { - log.error("token信息获取失败"); - return; - } - List typeList = MeasTypeEnum.getMeasList(); - JSONObject jsonObject = JSONUtil.createObj(); - JSONObject jsonObjectSub = JSONUtil.createObj(); - jsonObject.set("page", 1); - jsonObject.set("perPage", 10000); - jsonObject.set("startTime", "2023-10-07 00:00:00"); - jsonObject.set("endTime", "2023-10-07 11:59:59"); - //1公专变2低压用户3光伏 - jsonObjectSub.set("consType", "3"); - jsonObjectSub.set("astIds", new ArrayList<>()); - jsonObjectSub.set("astType", ""); - jsonObjectSub.set("psrIds", new ArrayList<>()); - jsonObjectSub.set("psrType", ""); - jsonObjectSub.set("measPointIds", new ArrayList<>()); - jsonObjectSub.set("telemetryTypes", typeList); - //组装好json开始发送请求 - Map headers = new HashMap<>(); - headers.put("x-token", tokenWithRestTemplate.getAccess_token()); - //获取所有发电用户的id - List userIds = pmsPowerGenerationUserService.queryAllUserId(); - //将发电用户编号按100尺寸分片 - List> partitionList = ListUtils.partition(userIds, 100); - log.error("总计分了{}片", partitionList.size()); - List userIdConcatMeasType; - int count = 0; - //指标类型集合 - List influxData; - Map> typeData = new HashMap<>(); - StringBuilder tempInfluxData; - ResponseEntity response; - List statisticsDataList; - PageResult result; - List records; - String dataIdentify; - MeasTypeEnum measTypeEnumByMeasType; - for (List generationUserIDList : partitionList) { - count++; - log.error("查询第{}片数据", count); - //按批次处理用户编号数据 - jsonObjectSub.set("consNos", generationUserIDList); - //将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType - userIdConcatMeasType = new ArrayList<>(); - for (String measType : typeList) { - userIdConcatMeasType.addAll(generationUserIDList.stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toList())); - } - JSONArray jsonArray = JSONUtil.createArray(); - jsonArray.add(jsonObjectSub); - jsonObject.set("filters", jsonArray); - response = restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, GeneralResult.class); - if (response.getStatusCodeValue() == 200 && Objects.nonNull(response.getBody()) && response.getBody().getStatus().equalsIgnoreCase("000000")) { - result = response.getBody().getResult(); - records = result.getRecords(); - log.error("查询遥测数据结束,返回数据量:{}", records.size()); - if (Objects.isNull(result) || CollectionUtil.isEmpty(records)) { - //日志输出: - log.error("起始时间:{},截止时间:{},无遥测数据;", startTime, endTime); - continue; - } - //处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理 - for (CommonTelemetry commonTelemetry : records) { // 最多循环100*16次 - dataIdentify = commonTelemetry.getConsNo().concat(StrPool.AT).concat(commonTelemetry.getMeasTypeCode()); - if (userIdConcatMeasType.contains(dataIdentify)) { - //首个包含该标识的数据进行处理 - measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.getMeasTypeCode()); - //统计数据,经过测试,接口响应json可能不包含该属性 - statisticsDataList = commonTelemetry.getTelemetryValue(); - if (CollectionUtil.isEmpty(statisticsDataList)) { - //添加进有指标但无遥测数据集合 - noDataUserWithMeasType.add(dataIdentify); - continue; - } - influxData = new ArrayList<>(); - for (StatisticsData statisticsData : statisticsDataList) { // 匹配上进入,循环96次 - tempInfluxData = new StringBuilder(); - tempInfluxData.append(commonTelemetry.getMeasTypeCode()) - .append(StrPool.COMMA) - .append(commonTelemetry.getConsNo()) - .append(StrPool.COMMA) - .append(statisticsData.getDataTime()) - .append(StrPool.COMMA) - .append(measTypeEnumByMeasType.getFieldName()) - .append(StrPool.COMMA) - .append(StrUtil.isBlank(statisticsData.getMeasValue()) ? "0" : statisticsData.getMeasValue()); - influxData.add(tempInfluxData.toString()); - } - //userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖; - typeData.put(commonTelemetry.getConsNo().concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData); - //处理完,删除该条记录,减少集合尺寸,提高效率 - userIdConcatMeasType.remove(dataIdentify); - } - } - //没有匹配上的就是该用户没有数据 - log.error("剩余有{}条标识", userIdConcatMeasType.size()); -// noDataUser.addAll(userIdConcatMeasType); - } else { - log.error("查询遥测数据失败!第{}片,结果为:{}", count, response); - } -// System.gc(); - } - //最后批量入库 - batchInsertData(typeData); - //最后输出没有数据的用户编号 -// if (CollectionUtil.isNotEmpty(noDataUser)) { -// noDataUser = noDataUser.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)) -// .distinct() -// .collect(Collectors.toList()); -// FileWriter writer = FileWriter.create(new File("/usr/local/demo.txt")); -// writer.writeLines(noDataUser); -// } -// log.error("用户没有数据的长度为:{}", noDataUser.size()); - log.error("用户有指标没有数据的长度为:{}", noDataUserWithMeasType.size()); - } - @Override - public void queryTelemetryData(String startTime, String endTime) { + public void queryTelemetryData(String date) { + DateTime dateTemp = DateUtil.parse(date, DatePattern.NORM_DATE_FORMAT); + DateTime beginOfDay = DateUtil.beginOfDay(dateTemp); + DateTime endOfDay = DateUtil.endOfDay(dateTemp); //有指标返回,但是指标的遥测整体数据为null RestTemplateUtil restTemplateUtil = new RestTemplateUtil(); TokenResult tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); @@ -201,8 +81,8 @@ public class BusinessServiceImpl implements IBusinessService { JSONObject jsonObjectSub = JSONUtil.createObj(); jsonObject.set("page", 1); jsonObject.set("perPage", 10000); - jsonObject.set("startTime", "2023-10-07 00:00:00"); - jsonObject.set("endTime", "2023-10-07 11:59:59"); + jsonObject.set("startTime", DateUtil.format(beginOfDay, DatePattern.NORM_DATETIME_FORMATTER)); + jsonObject.set("endTime", DateUtil.format(endOfDay, DatePattern.NORM_DATETIME_FORMATTER)); //1公专变2低压用户3光伏 jsonObjectSub.set("consType", "3"); jsonObjectSub.set("astIds", new ArrayList<>()); @@ -216,135 +96,135 @@ public class BusinessServiceImpl implements IBusinessService { headers.put("x-token", tokenWithRestTemplate.getAccess_token()); //获取所有发电用户的id List userIds = pmsPowerGenerationUserService.queryAllUserId(); - //将发电用户编号按100尺寸分片 - List> partitionList = ListUtils.partition(userIds, 100); - log.error("总计分了{}片", partitionList.size()); - - int count = 0; - //先获取数据 - List> responseEntities = new ArrayList<>(2000); - for (List generationUserIDList : partitionList) { - count++; - log.error("查询第{}片数据", count); - //按批次处理用户编号数据 - jsonObjectSub.set("consNos", generationUserIDList); - JSONArray jsonArray = JSONUtil.createArray(); - jsonArray.add(jsonObjectSub); - jsonObject.set("filters", jsonArray); - //避免中途token失效 - if (count % 800 == 0) { - tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); - headers.put("x-token", tokenWithRestTemplate.getAccess_token()); + List> singleQueryDataUserId = ListUtils.partition(userIds, 20000); + for (int k = 0; k < singleQueryDataUserId.size(); k++) { + //将发电用户编号按100尺寸分片 + List> partitionList = ListUtils.partition(singleQueryDataUserId.get(k), 100); + log.error("总计分了{}片", partitionList.size()); + int count = 0; + tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); + headers.put("x-token", tokenWithRestTemplate.getAccess_token()); + //先获取数据 + List> responseEntities = new ArrayList<>(2000); + int kk = k + 1; + for (List generationUserIDList : partitionList) { + count++; + log.error("查询第{}大片,{}小片数据", kk, count); + //按批次处理用户编号数据 + jsonObjectSub.set("consNos", generationUserIDList); + JSONArray jsonArray = JSONUtil.createArray(); + jsonArray.add(jsonObjectSub); + jsonObject.set("filters", jsonArray); + responseEntities.add(restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, String.class)); } - responseEntities.add(restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, String.class)); - } - //开始解析数据 - Set userIdConcatMeasType = new HashSet<>(); - //将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType - for (String measType : typeList) { - userIdConcatMeasType.addAll(userIds.stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toSet())); - } - List influxData; - Map> typeData = new HashMap<>(); - StringBuilder tempInfluxData; - ResponseEntity response; - JSONArray statisticsDataList; - JSONObject result; - JSONObject statisticsData; - JSONObject body; - JSONArray records; - String dataIdentify; - JSONObject commonTelemetry; - MeasTypeEnum measTypeEnumByMeasType; - for (int i = 0; i < partitionList.size(); i++) { - log.error("解析第{}片数据", i); - response = responseEntities.get(i); - body = JSONUtil.parseObj(response.getBody()); - if (response.getStatusCodeValue() == 200 && body.get("status", String.class).equalsIgnoreCase("000000")) { - result = JSONUtil.parseObj(body.get("result", String.class)); - records = JSONUtil.parseArray(result.get("records", String.class)); - log.error("查询遥测数据结束,返回数据量:{}", records.size()); - if (CollectionUtil.isEmpty(records)) { - //日志输出: - log.error("起始时间:{},截止时间:{},无遥测数据;", startTime, endTime); - continue; + //开始解析数据 + Set userIdConcatMeasType = new HashSet<>(); + //将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType + for (String measType : typeList) { + userIdConcatMeasType.addAll(singleQueryDataUserId.get(k).stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toSet())); + } + List influxData; + Map> typeData = new HashMap<>(); + StringBuilder tempInfluxData; + ResponseEntity response; + JSONArray statisticsDataList; + JSONObject result; + JSONObject statisticsData; + JSONObject body; + JSONArray records; + String dataIdentify; + JSONObject commonTelemetry; + MeasTypeEnum measTypeEnumByMeasType; + for (int i = 0; i < partitionList.size(); i++) { + log.error("解析第{}片数据", i); + response = responseEntities.get(i); + body = JSONUtil.parseObj(response.getBody()); + if (response.getStatusCodeValue() == 200 && body.get("status", String.class).equalsIgnoreCase("000000")) { + result = JSONUtil.parseObj(body.get("result", String.class)); + records = JSONUtil.parseArray(result.get("records", String.class)); + log.error("查询遥测数据结束,返回数据量:{}", records.size()); + if (CollectionUtil.isEmpty(records)) { + //日志输出: + log.error("查询时间:{},无遥测数据;", date); + continue; + } + //处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理 + for (Object obj : records) { // 最多循环100*16次 + commonTelemetry = JSONUtil.parseObj(obj); + dataIdentify = commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(commonTelemetry.get("measTypeCode", String.class)); + if (userIdConcatMeasType.contains(dataIdentify)) { + //首个包含该标识的数据进行处理 + measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.get("measTypeCode", String.class)); + //统计数据,经过测试,接口响应json可能不包含该属性 + statisticsDataList = commonTelemetry.get("telemetryValue", JSONArray.class); + if (CollectionUtil.isEmpty(statisticsDataList)) { + //添加进有指标但无遥测数据集合 + continue; + } + influxData = new ArrayList<>(); + for (Object subObj : statisticsDataList) { // 匹配上进入,循环96次 + statisticsData = JSONUtil.parseObj(subObj); + tempInfluxData = new StringBuilder(); + tempInfluxData.append(measTypeEnumByMeasType.getPhaseType()) + .append(StrPool.COMMA) + .append(commonTelemetry.get("consNo", String.class)) + .append(StrPool.COMMA) + .append(statisticsData.get("dataTime", String.class)) + .append(StrPool.COMMA) + .append(measTypeEnumByMeasType.getFieldName()) + .append(StrPool.COMMA) + .append(StrUtil.isBlank(statisticsData.get("measValue", String.class)) ? "0" : statisticsData.get("measValue", String.class)); + influxData.add(tempInfluxData.toString()); + } + //userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖; + typeData.put(commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData); + //处理完,删除该条记录,减少集合尺寸,提高效率 + userIdConcatMeasType.remove(dataIdentify); + } + } + //没有匹配上的就是该用户没有数据 + log.error("剩余有{}条标识", userIdConcatMeasType.size()); + } else { + log.error("查询遥测数据失败!第{}片,结果为:{}", count, response); } - //处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理 - for (Object obj : records) { // 最多循环100*16次 - commonTelemetry = JSONUtil.parseObj(obj); - dataIdentify = commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(commonTelemetry.get("measTypeCode", String.class)); - if (userIdConcatMeasType.contains(dataIdentify)) { - //首个包含该标识的数据进行处理 - measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.get("measTypeCode", String.class)); - //统计数据,经过测试,接口响应json可能不包含该属性 - statisticsDataList = commonTelemetry.get("telemetryValue", JSONArray.class); - if (CollectionUtil.isEmpty(statisticsDataList)) { - //添加进有指标但无遥测数据集合 - continue; - } - influxData = new ArrayList<>(); - for (Object subObj : statisticsDataList) { // 匹配上进入,循环96次 - statisticsData = JSONUtil.parseObj(subObj); - tempInfluxData = new StringBuilder(); - tempInfluxData.append(measTypeEnumByMeasType.getPhaseType()) - .append(StrPool.COMMA) - .append(commonTelemetry.get("consNo", String.class)) - .append(StrPool.COMMA) - .append(statisticsData.get("dataTime", String.class)) - .append(StrPool.COMMA) - .append(measTypeEnumByMeasType.getFieldName()) - .append(StrPool.COMMA) - .append(StrUtil.isBlank(statisticsData.get("measValue", String.class)) ? "0" : statisticsData.get("measValue", String.class)); - influxData.add(tempInfluxData.toString()); - } - //userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖; - typeData.put(commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData); - //处理完,删除该条记录,减少集合尺寸,提高效率 - userIdConcatMeasType.remove(dataIdentify); + } + //最后输出没有数据的用户编号 + /** + * 输出到2个文件,lackData.txt、 excalationData.txt + * 注:用户号去除160前缀 + * 1、所有指标均没有有数据的用户编号 + * 2、部分指标没有数据的用户编号,并表明是哪些指标 + */ + if (CollectionUtil.isNotEmpty(userIdConcatMeasType)) { + Map> finalMap = userIdConcatMeasType.stream().collect(Collectors.groupingBy(str -> { + String key = str.substring(3); + key = key.substring(0, key.indexOf(StrPool.AT)); + return key; + })); + //全部缺失数据的用户 + List lackData = new ArrayList<>(); + //部分缺失的用户及指标 + List excalationData = new ArrayList<>(); + Set keyedSet = finalMap.keySet(); + for (String key : keyedSet) { + List data = finalMap.get(key); + if (data.size() == typeList.size()) { + lackData.add(key); + } else { + data = data.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)).collect(Collectors.toList()); + key = key.concat(StrPool.COMMA).concat(StringUtils.join(data, StrPool.AT)); + excalationData.add(key); } } - //没有匹配上的就是该用户没有数据 - log.error("剩余有{}条标识", userIdConcatMeasType.size()); - } else { - log.error("查询遥测数据失败!第{}片,结果为:{}", count, response); + FileWriter lackDataWriter = FileWriter.create(new File("/usr/local/syncData/lackData" + date + k + ".txt")); + lackDataWriter.writeLines(lackData); + FileWriter excalationDataWriter = FileWriter.create(new File("/usr/local/syncData/excalationData" + date + k + ".txt")); + excalationDataWriter.writeLines(excalationData); } + log.error("用户有指标没有数据的长度为:{}", userIdConcatMeasType.size()); + //最后批量入库 + batchInsertData(typeData); } - //最后输出没有数据的用户编号 - /** - * 输出到2个文件,lackData.txt、 excalationData.txt - * 注:用户号去除160前缀 - * 1、所有指标均没有有数据的用户编号 - * 2、部分指标没有数据的用户编号,并表明是哪些指标 - */ - if (CollectionUtil.isNotEmpty(userIdConcatMeasType)) { - Map> finalMap = userIdConcatMeasType.stream().collect(Collectors.groupingBy(str -> { - String key = str.substring(3); - key = key.substring(0, key.indexOf(StrPool.AT)); - return key; - })); - //全部缺失数据的用户 - List lackData = new ArrayList<>(); - //部分缺失的用户及指标 - List excalationData = new ArrayList<>(); - Set keyedSet = finalMap.keySet(); - for (String key : keyedSet) { - List data = finalMap.get(key); - if (data.size() == typeList.size()) { - lackData.add(key); - } else { - data = data.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)).collect(Collectors.toList()); - key = key.concat(StrPool.COMMA).concat(StringUtils.join(data, StrPool.AT)); - excalationData.add(key); - } - } - FileWriter lackDataWriter = FileWriter.create(new File("/usr/local/lackData.txt")); - lackDataWriter.writeLines(lackData); - FileWriter excalationDataWriter = FileWriter.create(new File("/usr/local/excalationData.txt")); - excalationDataWriter.writeLines(excalationData); - } - log.error("用户有指标没有数据的长度为:{}", userIdConcatMeasType.size()); - //最后批量入库 - batchInsertData(typeData); } @@ -383,11 +263,11 @@ public class BusinessServiceImpl implements IBusinessService { sqlList.add(batchPoints.lineProtocol()); } } - List> subSqlList = ListUtils.partition(sqlList, 50000); + List> subSqlList = ListUtils.partition(sqlList, 20000); int count = 1; for (List sql : subSqlList) { - influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sql); - log.error("已经入库{}条记录!", count * 50000); + influxDbUtils.batchInsert(influxDbUtils.getDbName(), "autogen", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sql); + log.error("已经入库{}条记录!", count * 20000); count++; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index dbce790..bec9f93 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -42,10 +42,10 @@ spring: max-file-size: 500MB enabled: true influx: - url: http://10.118.135.128:8086 + url: http://10.118.135.129:8086 user: admin password: 123456 - database: ym_testbk + database: pqsbase mapper-location: com.njcn.jbsyncdata.imapper