diff --git a/pom.xml b/pom.xml index 1aa6431..e3fe461 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ 8 8 2.8.0 - 5.7.9 + 5.8.11 2.22 @@ -207,6 +207,13 @@ 1.0.0 + + org.apache.commons + commons-collections4 + 4.4 + + + diff --git a/src/main/java/com/njcn/jbsyncdata/component/TokenComponent.java b/src/main/java/com/njcn/jbsyncdata/component/TokenComponent.java index 31e3849..abdb5ad 100644 --- a/src/main/java/com/njcn/jbsyncdata/component/TokenComponent.java +++ b/src/main/java/com/njcn/jbsyncdata/component/TokenComponent.java @@ -39,7 +39,7 @@ public class TokenComponent { public TokenResult getTokenWithRestTemplate() { RestTemplateUtil restTemplateUtil = new RestTemplateUtil(); ResponseEntity userEntity = restTemplateUtil.post(url.concat("/psr-auth/oauth/accessToken?client_id=" + clientId + "&client_secret=" + clientSecret + "&grant_type=" + grantType), TokenResult.class); - //返回状态不正常返回空 + //状态不正常,返回空 log.info("getTokenWithRestTemplate获取token结束,结果为:{}", userEntity); if (userEntity.getStatusCodeValue() == 200 && userEntity.getBody().getStatus().equalsIgnoreCase("000000")) { return userEntity.getBody(); diff --git a/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java b/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java index e3f7340..af30987 100644 --- a/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java +++ b/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java @@ -39,7 +39,7 @@ public class DisPhotovoltaicController { @ApiOperation(value = "获取10kv分布式光伏接入情况") @PostMapping("/import10") - public void importTakeOrder(MultipartFile file) throws Exception { + public void importTakeOrder(MultipartFile file, String startTime, String endTime) throws Exception { List list = EasyExcel.read(file.getInputStream()) .head(ExcelData.class) .headRowNumber(2) @@ -49,14 +49,14 @@ public class DisPhotovoltaicController { .filter(t -> StrUtil.isNotBlank(t.getGenerationUserID())) .filter(StreamUtil.distinctByKey(ExcelData::getGenerationUserID)) .collect(Collectors.toList()); - businessService.testInterfaceByUserId(list); + businessService.testInterfaceByUserId(list, startTime, endTime); System.out.println(); } @ApiOperation(value = "获取380kv分布式光伏接入情况") @PostMapping("/import380") - public void import380(MultipartFile file) throws Exception { + public void import380(MultipartFile file, String startTime, String endTime) throws Exception { List list = EasyExcel.read(file.getInputStream()) .head(ExcelData.class) .headRowNumber(2) @@ -66,7 +66,7 @@ public class DisPhotovoltaicController { .filter(t -> StrUtil.isNotBlank(t.getGenerationUserID())) .filter(StreamUtil.distinctByKey(ExcelData::getGenerationUserID)) .collect(Collectors.toList()); - businessService.testInterfaceByUserId(list); + businessService.testInterfaceByUserId(list, startTime, endTime); System.out.println(); } @@ -86,6 +86,7 @@ public class DisPhotovoltaicController { disPhotovoltaicService.SavaPmsPowerGenerationUser10KV(list, response); } + @ApiOperation(value = "导入380kv分布式光伏接入情况", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) @PostMapping("/import380KV") public void import380KV(MultipartFile file, HttpServletResponse response) throws Exception { diff --git a/src/main/java/com/njcn/jbsyncdata/service/IBusinessService.java b/src/main/java/com/njcn/jbsyncdata/service/IBusinessService.java index 37d4d40..ac8dba3 100644 --- a/src/main/java/com/njcn/jbsyncdata/service/IBusinessService.java +++ b/src/main/java/com/njcn/jbsyncdata/service/IBusinessService.java @@ -5,7 +5,7 @@ import com.njcn.jbsyncdata.pojo.ExcelData; import java.util.List; public interface IBusinessService { - void testInterface(List list); - void testInterfaceByUserId(List list); + void testInterfaceByUserId(List list,String startTime,String endTime); + } diff --git a/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java b/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java index 29ba8c6..2f35d84 100644 --- a/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java +++ b/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java @@ -2,13 +2,12 @@ package com.njcn.jbsyncdata.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DatePattern; -import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; -import cn.hutool.core.io.file.FileReader; import cn.hutool.core.text.StrPool; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; +import com.njcn.influx.utils.InfluxDbUtils; import com.njcn.jbsyncdata.component.TokenComponent; import com.njcn.jbsyncdata.enums.MeasTypeEnum; import com.njcn.jbsyncdata.pojo.ExcelData; @@ -16,11 +15,16 @@ import com.njcn.jbsyncdata.pojo.result.*; import com.njcn.jbsyncdata.service.IBusinessService; import com.njcn.jbsyncdata.util.RestTemplateUtil; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.ListUtils; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -28,43 +32,31 @@ import java.util.stream.Stream; @Service public class BusinessServiceImpl implements IBusinessService { + @Resource private TokenComponent tokenComponent; - @Override - public void testInterface(List list) { - RestTemplateUtil restTemplateUtil = new RestTemplateUtil(); - TokenResult tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); - if (null == tokenWithRestTemplate) { - log.error("token信息没有获取到"); - return; - } - JSONObject jsonObject; - JSONObject jsonObjectSub; - for (ExcelData excelData : list) { - jsonObject = JSONUtil.createObj(); - jsonObjectSub = JSONUtil.createObj(); - jsonObject.set("page", 1); - jsonObject.set("perPage", 50); - List psrIds = Stream.of(excelData.getStageID()).collect(Collectors.toList()); - jsonObjectSub.set("psrIds", psrIds); - jsonObjectSub.set("psrType", "0401004"); - jsonObjectSub.set("astIds", new ArrayList<>()); - jsonObjectSub.set("astType", ""); - jsonObjectSub.set("termIds", new ArrayList<>()); - jsonObjectSub.set("termType", ""); - jsonObjectSub.set("measPointIds", new ArrayList<>()); - jsonObject.set("filter", jsonObjectSub); - //组装好json开始发送请求 - Map headers = new HashMap<>(); - headers.put("x-token", tokenWithRestTemplate.getAccess_token()); - ResponseEntity response = restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/measPoint/commonQuery"), headers, jsonObject, String.class); - log.error("请求接口,台区号为:{},结果为:{}", excelData.getStageID(), response); - } - } + @Resource + private InfluxDbUtils influxDbUtils; + /** + * 此方法通过发电客户编号查询数据,该方法存在以下问题 + * 问题一:一个发电客户编号同指标返回的数据会有多个,但是目前看到最多2个测量点数据。 + * 解决方案:匹配第一条,丢弃后续数据。 + * 问题二:一个客户编号最多2个测量点,一次查8个指标,即返回16条数据,总计大约16万个用户编号,如何高效查询并同步入库 + * 解决方案:暂且定500个客户编号,将每页数据量调整为 500 * 20 = 1W的size,避免存在匹配客户编号数据时,遥测数据不在当前页。 + * 问题三:时间区间如何控制? + * 解决方案:目前暂定通过定时任务,如每天凌晨2点查询前天的数据入库。 + * 问题四:根据客户编号&指标查询数据会出现几种数据为空的情况 + * 现象: 1. 首先GeneralResult的result属性直接为null------------不做处理,直接过 + * 2. PageResult的records属性为null---------------------不做处理,直接过 + * 3. CommonTelemetry的遥测数据集合telemetryValue为null--不做处理,直接过 + * 4. StatisticsData统计数据的实际数值measValue为null-----对应时间、指标的数值设置为0 + * + * @param excelDataList 客户编号集合 + */ @Override - public void testInterfaceByUserId(List list) { + public void testInterfaceByUserId(List excelDataList, String startTime, String endTime) { RestTemplateUtil restTemplateUtil = new RestTemplateUtil(); TokenResult tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); if (null == tokenWithRestTemplate) { @@ -73,15 +65,21 @@ public class BusinessServiceImpl implements IBusinessService { } JSONObject jsonObject; JSONObject jsonObjectSub; - for (ExcelData excelData : list) { + //将发电客户编号按500尺寸分片 + List> partitionList = ListUtils.partition(excelDataList, 500); + for (List excelData : partitionList) { + Map>> typeData = new HashMap<>(); + //按批次处理客户编号数据 jsonObject = JSONUtil.createObj(); jsonObjectSub = JSONUtil.createObj(); jsonObject.set("page", 1); - jsonObject.set("perPage", 50); - jsonObject.set("startTime", "2023-10-07 00:00:00"); - jsonObject.set("endTime", "2023-10-07 23:59:59"); - List userId = Stream.of("160".concat(excelData.getGenerationUserID())).collect(Collectors.toList()); - jsonObjectSub.set("consNos", userId); + jsonObject.set("perPage", 10000); + jsonObject.set("startTime", startTime); + jsonObject.set("endTime", endTime); + //处理客户编号 + List generationUserIDList = excelData.stream().map(t -> "160".concat(t.getGenerationUserID())).collect(Collectors.toList()); + jsonObjectSub.set("consNos", generationUserIDList); + //1公专变2低压用户3光伏 jsonObjectSub.set("consType", 3); jsonObjectSub.set("astIds", new ArrayList<>()); jsonObjectSub.set("astType", ""); @@ -97,79 +95,91 @@ public class BusinessServiceImpl implements IBusinessService { headers.put("x-token", tokenWithRestTemplate.getAccess_token()); ResponseEntity response = restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, GeneralResult.class); if (response.getStatusCodeValue() == 200 && response.getBody().getStatus().equalsIgnoreCase("000000")) { - PageResult result = response.getBody().getResult(); + GeneralResult generalResult = response.getBody(); + PageResult result = generalResult.getResult(); List records = result.getRecords(); if (Objects.isNull(result) || CollectionUtil.isEmpty(result.getRecords())) { //日志输出: - log.error("用户编号为:{},无遥测数据;", excelData.getGenerationUserID()); + log.error("起始时间:{},截止时间:{},无遥测数据;", startTime, endTime); continue; } + //将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType + List userIdConcatMeasType = new ArrayList<>(); + for (String measType : typeList) { + List temp = generationUserIDList.stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toList()); + userIdConcatMeasType.addAll(temp); + } //处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理 - Map>> typeData = new HashMap<>(); - for (String type : typeList) { - MeasTypeEnum measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(type); - for (CommonTelemetry commonTelemetry : records) { - if (type.equalsIgnoreCase(commonTelemetry.getMeasTypeCode())) { - List statisticsDataList = commonTelemetry.getTelemetryValue(); - List> influxData = new ArrayList<>(); - for (StatisticsData statisticsData : statisticsDataList) { - Map tempInfluxData = new HashMap<>(); - tempInfluxData.put("phasic_type", measTypeEnumByMeasType.getPhaseType()); - tempInfluxData.put("line_id", "160".concat(excelData.getGenerationUserID())); - tempInfluxData.put("quality_flag", "0"); - tempInfluxData.put("value_type", "AVG"); - tempInfluxData.put("time", statisticsData.getDataTime()); - //为空则赋值为0,表中其他均为0 - tempInfluxData.put(measTypeEnumByMeasType.getFieldName(), StrUtil.isBlank(statisticsData.getMeasValue()) ? "0" : statisticsData.getMeasValue()); - influxData.add(tempInfluxData); - } - //measType@tableName:存在多个指标存储表名一致,避免数据覆盖; - typeData.put(measTypeEnumByMeasType.getMeasType().concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData); - break; + for (CommonTelemetry commonTelemetry : records) { + String dataIdentify = commonTelemetry.getConsNo().concat(StrPool.AT).concat(commonTelemetry.getMeasTypeCode()); + if (userIdConcatMeasType.contains(dataIdentify)) { + //首个包含该标识的数据进行处理 + MeasTypeEnum measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.getMeasTypeCode()); + List statisticsDataList = commonTelemetry.getTelemetryValue(); + List> influxData = new ArrayList<>(); + for (StatisticsData statisticsData : statisticsDataList) { // 匹配上进入,循环96次 + Map tempInfluxData = new HashMap<>(); + tempInfluxData.put("phasic_type", measTypeEnumByMeasType.getPhaseType()); + tempInfluxData.put("line_id", commonTelemetry.getConsNo()); + tempInfluxData.put("quality_flag", "0"); + tempInfluxData.put("value_type", "AVG"); + tempInfluxData.put("time", statisticsData.getDataTime()); + //为空则赋值为0,表中其他均为0 + tempInfluxData.put(measTypeEnumByMeasType.getFieldName(), StrUtil.isBlank(statisticsData.getMeasValue()) ? "0" : statisticsData.getMeasValue()); + influxData.add(tempInfluxData); } + //measType@tableName:存在多个指标存储表名一致,避免数据覆盖; + typeData.put(measTypeEnumByMeasType.getMeasType().concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData); + //处理完,删除该条记录,减少集合尺寸,提高效率 + userIdConcatMeasType.remove(dataIdentify); + break; } } } - - - log.error("请求接口,台区号为:{},结果为:{}", excelData.getStageID(), response); + //每片数据获取完毕后,将数据处理入influxdb库 + batchInsertData(typeData); } + + } - - public static void main(String[] args) { - String path = "C:\\Users\\83944\\Desktop\\test\\test.txt"; - FileReader fileReader = new FileReader(path); - String jsonStr = fileReader.readString(); - GeneralResult result = JSONUtil.toBean(jsonStr, GeneralResult.class); - List records = result.getResult().getRecords(); - //处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理 - Map>> typeData = new HashMap<>(); - List typeList = Stream.of("PhV_phsA", "PhV_phsB", "PhV_phsC").collect(Collectors.toList()); - for (String type : typeList) { - MeasTypeEnum measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(type); - for (CommonTelemetry commonTelemetry : records) { - if (type.equalsIgnoreCase(commonTelemetry.getMeasTypeCode())) { - List statisticsDataList = commonTelemetry.getTelemetryValue(); - List> influxData = new ArrayList<>(); - for (StatisticsData statisticsData : statisticsDataList) { - Map tempInfluxData = new HashMap<>(); - tempInfluxData.put("phasic_type", measTypeEnumByMeasType.getPhaseType()); - tempInfluxData.put("line_id", "1602514341899"); - tempInfluxData.put("quality_flag", "0"); - tempInfluxData.put("value_type", "AVG"); - tempInfluxData.put("time", statisticsData.getDataTime()); - //为空则赋值为0,表中其他均为0 - tempInfluxData.put(measTypeEnumByMeasType.getFieldName(), StrUtil.isBlank(statisticsData.getMeasValue()) ? "0" : statisticsData.getMeasValue()); - influxData.add(tempInfluxData); - } - typeData.put(measTypeEnumByMeasType.getMeasType().concat("@").concat(measTypeEnumByMeasType.getTableName()), influxData); - break; + /** + * 批量入库influxDB + * @param typeData 远程根据用户编号获取的数据 Map>> typeData = new HashMap<>(); + */ + private void batchInsertData(Map>> typeData) { + List sqlList = new ArrayList<>(); + Set tableNames = typeData.keySet(); + for (String tableName : tableNames) { + List> data = typeData.get(tableName); + tableName = tableName.substring(tableName.indexOf(StrPool.AT) + 1); + //需要转换的实体类class类 + for (Map datum : data) { + //tag数据 + Map tags = new HashMap<>(); + tags.put("phasic_type", datum.get("phasic_type")); + datum.remove("phasic_type"); + tags.put("line_id", datum.get("line_id")); + datum.remove("line_id"); + tags.put("quality_flag", datum.get("quality_flag")); + datum.remove("quality_flag"); + tags.put("value_type", datum.get("value_type")); + datum.remove("value_type"); + String time = datum.get("time"); + datum.remove("time"); + //tag数据删完后,剩余均是filed数据,因filed属性名不固定,无法指定获取,直接循环 + Map fields = new HashMap<>(); + Set fieldNames = datum.keySet(); + for (String fieldName : fieldNames) { + fields.put(fieldName, Double.parseDouble(datum.get(fieldName))); } + Point point = influxDbUtils.pointBuilder(tableName, DateUtil.parse(time, DatePattern.NORM_DATETIME_FORMATTER).getTime() + 8 * 3600 * 1000, TimeUnit.MILLISECONDS, tags, fields); + BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + batchPoints.point(point); + sqlList.add(batchPoints.lineProtocol()); + } } - - System.out.println(11); - + influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sqlList); } }