diff --git a/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java b/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java index 8ce9ed2..c82619d 100644 --- a/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java +++ b/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java @@ -2,6 +2,7 @@ package com.njcn.jbsyncdata.controller; import cn.hutool.core.util.StrUtil; import com.alibaba.excel.EasyExcel; +import com.njcn.jbsyncdata.enums.MeasTypeEnum; import com.njcn.jbsyncdata.pojo.DisPhotovoltaic10Excel; import com.njcn.jbsyncdata.pojo.DisPhotovoltaic380Excel; import com.njcn.jbsyncdata.pojo.ExcelData; @@ -19,7 +20,10 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartFile; import javax.servlet.http.HttpServletResponse; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -40,7 +44,11 @@ public class DisPhotovoltaicController { @ApiOperation(value = "查询所有用户的遥测数据") @PostMapping("/queryTelemetryData") public void queryTelemetryData(String startTime, String endTime) { - businessService.queryTelemetryData(startTime, endTime); + try { + businessService.queryTelemetryData(startTime, endTime); + } catch (Exception exception) { + exception.printStackTrace(); + } } @ApiOperation(value = "导入10kv分布式光伏接入情况", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) @@ -80,9 +88,10 @@ public class DisPhotovoltaicController { @PostMapping("/insertDistributionMonitor") public String import380KV() throws Exception { Boolean aBoolean = disPhotovoltaicService.savePmsDistributionMonitor(); - if(aBoolean){ + if (aBoolean) { return "数据导入成功"; } return "数据导入失败"; } + } diff --git a/src/main/java/com/njcn/jbsyncdata/controller/DownloadController.java b/src/main/java/com/njcn/jbsyncdata/controller/DownloadController.java new file mode 100644 index 0000000..0f91d7f --- /dev/null +++ b/src/main/java/com/njcn/jbsyncdata/controller/DownloadController.java @@ -0,0 +1,50 @@ +package com.njcn.jbsyncdata.controller; + + +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.util.StrUtil; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.FileCopyUtils; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import javax.servlet.http.HttpServletResponse; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.net.URLEncoder; + +@RestController +@Slf4j +@Api(tags = "下载资源入口") +@RequestMapping("/download") +@RequiredArgsConstructor +public class DownloadController { + + @ApiOperation(value = "根据资源路径下载文件") + @GetMapping("/byPath") + public void download(@RequestParam(value = "filePath", required = false) String filePath, HttpServletResponse response) throws Exception { + //路径为空不允许保存 + if (StrUtil.isEmpty(filePath)) { + return; + } + //获取输入流对象(用于读文件) + BufferedInputStream bis = new BufferedInputStream(FileUtil.getInputStream(filePath)); + //获取文件后缀 + String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); + response.setContentType("application/octet-stream"); + //设置响应头,attachment表示以附件的形式下载,inline表示在线打开 + response.setHeader("content-disposition", "attachment;fileName=" + URLEncoder.encode(fileName, "UTF-8")); + //获取输出流对象(用于写文件) + BufferedOutputStream bos = new BufferedOutputStream(response.getOutputStream()); + //下载文件,使用spring框架中的FileCopyUtils工具(内部自动关闭流,释放资源) + FileCopyUtils.copy(bis, bos); + System.out.println("=========================文件下载成功=========================="); + } + + +} diff --git a/src/main/java/com/njcn/jbsyncdata/pojo/result/TokenResult.java b/src/main/java/com/njcn/jbsyncdata/pojo/result/TokenResult.java index afdfbce..5396a20 100644 --- a/src/main/java/com/njcn/jbsyncdata/pojo/result/TokenResult.java +++ b/src/main/java/com/njcn/jbsyncdata/pojo/result/TokenResult.java @@ -1,8 +1,14 @@ package com.njcn.jbsyncdata.pojo.result; +import cn.hutool.core.io.file.FileWriter; +import cn.hutool.core.text.StrPool; import lombok.Data; +import org.apache.commons.lang3.StringUtils; +import java.io.File; import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; /** @@ -22,5 +28,4 @@ public class TokenResult implements Serializable { private String refresh_token; - } diff --git a/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java b/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java index 2511589..e98e526 100644 --- a/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java +++ b/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java @@ -4,8 +4,10 @@ import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateUtil; import cn.hutool.core.io.file.FileWriter; +import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.StrPool; import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.njcn.influx.utils.InfluxDbUtils; @@ -18,6 +20,9 @@ import com.njcn.jbsyncdata.service.IPmsPowerGenerationUserService; import com.njcn.jbsyncdata.util.RestTemplateUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.ListUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.collections4.SetUtils; +import org.apache.commons.lang3.StringUtils; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; @@ -49,7 +54,7 @@ public class BusinessServiceImpl implements IBusinessService { * 问题一:一个发电客户编号同指标返回的数据会有多个,但是目前看到最多2个测量点数据。 * 解决方案:匹配第一条,丢弃后续数据。 * 问题二:一个客户编号最多2个测量点,一次查8个指标,即返回16条数据,总计大约16万个用户编号,如何高效查询并同步入库 - * 解决方案:暂且定500个客户编号,将每页数据量调整为 500 * 20 = 1W的size,避免存在匹配客户编号数据时,遥测数据不在当前页。 + * 解决方案:暂且定100个客户编号,将每页数据量调整为 100 * 20 = 2000的size,避免存在匹配客户编号数据时,遥测数据不在当前页。 * 问题三:时间区间如何控制? * 解决方案:目前暂定通过定时任务,如每天凌晨2点查询前天的数据入库。 * 问题四:根据客户编号&指标查询数据会出现几种数据为空的情况 @@ -58,141 +63,333 @@ public class BusinessServiceImpl implements IBusinessService { * 3. CommonTelemetry的遥测数据集合telemetryValue为null--不做处理,直接过 * 4. StatisticsData统计数据的实际数值measValue为null-----对应时间、指标的数值设置为0 */ - @Override - public void queryTelemetryData(String startTime, String endTime) { - List noDataUser = new ArrayList<>(); + public void queryTelemetryData2(String startTime, String endTime) { +// 连指标都没有返回的用户 +// List noDataUser = new ArrayList<>(); + //有指标返回,但是指标的遥测整体数据为null + List noDataUserWithMeasType = new ArrayList<>(); RestTemplateUtil restTemplateUtil = new RestTemplateUtil(); TokenResult tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); if (null == tokenWithRestTemplate) { log.error("token信息获取失败"); return; } - JSONObject jsonObject; - JSONObject jsonObjectSub; + List typeList = MeasTypeEnum.getMeasList(); + JSONObject jsonObject = JSONUtil.createObj(); + JSONObject jsonObjectSub = JSONUtil.createObj(); + jsonObject.set("page", 1); + jsonObject.set("perPage", 10000); + jsonObject.set("startTime", "2023-10-07 00:00:00"); + jsonObject.set("endTime", "2023-10-07 11:59:59"); + //1公专变2低压用户3光伏 + jsonObjectSub.set("consType", "3"); + jsonObjectSub.set("astIds", new ArrayList<>()); + jsonObjectSub.set("astType", ""); + jsonObjectSub.set("psrIds", new ArrayList<>()); + jsonObjectSub.set("psrType", ""); + jsonObjectSub.set("measPointIds", new ArrayList<>()); + jsonObjectSub.set("telemetryTypes", typeList); + //组装好json开始发送请求 + Map headers = new HashMap<>(); + headers.put("x-token", tokenWithRestTemplate.getAccess_token()); //获取所有发电用户的id List userIds = pmsPowerGenerationUserService.queryAllUserId(); - //将发电用户编号按500尺寸分片 - List> partitionList = ListUtils.partition(userIds, 500); - for (List userId : partitionList) { - Map>> typeData = new HashMap<>(); - jsonObject = JSONUtil.createObj(); - jsonObjectSub = JSONUtil.createObj(); - jsonObject.set("page", 1); - jsonObject.set("perPage", 10000); - jsonObject.set("startTime", startTime); - jsonObject.set("endTime", endTime); + //将发电用户编号按100尺寸分片 + List> partitionList = ListUtils.partition(userIds, 100); + log.error("总计分了{}片", partitionList.size()); + List userIdConcatMeasType; + int count = 0; + //指标类型集合 + List influxData; + Map> typeData = new HashMap<>(); + StringBuilder tempInfluxData; + ResponseEntity response; + List statisticsDataList; + PageResult result; + List records; + String dataIdentify; + MeasTypeEnum measTypeEnumByMeasType; + for (List generationUserIDList : partitionList) { + count++; + log.error("查询第{}片数据", count); //按批次处理用户编号数据 - List generationUserIDList = userId.stream().map("160"::concat).collect(Collectors.toList()); jsonObjectSub.set("consNos", generationUserIDList); - //1公专变2低压用户3光伏 - jsonObjectSub.set("consType", 3); - jsonObjectSub.set("astIds", new ArrayList<>()); - jsonObjectSub.set("astType", ""); - jsonObjectSub.set("psrIds", new ArrayList<>()); - jsonObjectSub.set("psrType", ""); - jsonObjectSub.set("measPointIds", new ArrayList<>()); - //指标类型集合 - List typeList = MeasTypeEnum.getMeasList(); - jsonObjectSub.set("telemetryTypes", typeList); //将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType - List userIdConcatMeasType = new ArrayList<>(); + userIdConcatMeasType = new ArrayList<>(); for (String measType : typeList) { - List temp = generationUserIDList.stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toList()); - userIdConcatMeasType.addAll(temp); + userIdConcatMeasType.addAll(generationUserIDList.stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toList())); } - jsonObject.set("filter", jsonObjectSub); - //组装好json开始发送请求 - Map headers = new HashMap<>(); - headers.put("x-token", tokenWithRestTemplate.getAccess_token()); - ResponseEntity response = restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, GeneralResult.class); - if (response.getStatusCodeValue() == 200 && response.getBody().getStatus().equalsIgnoreCase("000000")) { - GeneralResult generalResult = response.getBody(); - PageResult result = generalResult.getResult(); - List records = result.getRecords(); - if (Objects.isNull(result) || CollectionUtil.isEmpty(result.getRecords())) { + JSONArray jsonArray = JSONUtil.createArray(); + jsonArray.add(jsonObjectSub); + jsonObject.set("filters", jsonArray); + response = restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, GeneralResult.class); + if (response.getStatusCodeValue() == 200 && Objects.nonNull(response.getBody()) && response.getBody().getStatus().equalsIgnoreCase("000000")) { + result = response.getBody().getResult(); + records = result.getRecords(); + log.error("查询遥测数据结束,返回数据量:{}", records.size()); + if (Objects.isNull(result) || CollectionUtil.isEmpty(records)) { //日志输出: log.error("起始时间:{},截止时间:{},无遥测数据;", startTime, endTime); continue; } - //处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理 - for (CommonTelemetry commonTelemetry : records) { // 最多循环500*16次 - String dataIdentify = commonTelemetry.getConsNo().concat(StrPool.AT).concat(commonTelemetry.getMeasTypeCode()); + for (CommonTelemetry commonTelemetry : records) { // 最多循环100*16次 + dataIdentify = commonTelemetry.getConsNo().concat(StrPool.AT).concat(commonTelemetry.getMeasTypeCode()); if (userIdConcatMeasType.contains(dataIdentify)) { //首个包含该标识的数据进行处理 - MeasTypeEnum measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.getMeasTypeCode()); - List statisticsDataList = commonTelemetry.getTelemetryValue(); - List> influxData = new ArrayList<>(); - for (StatisticsData statisticsData : statisticsDataList) { // 匹配上进入,循环96次 - Map tempInfluxData = new HashMap<>(); - tempInfluxData.put("phasic_type", measTypeEnumByMeasType.getPhaseType()); - tempInfluxData.put("line_id", commonTelemetry.getConsNo()); - tempInfluxData.put("quality_flag", "0"); - tempInfluxData.put("value_type", "AVG"); - tempInfluxData.put("time", statisticsData.getDataTime()); - //为空则赋值为0,表中其他均为0 - tempInfluxData.put(measTypeEnumByMeasType.getFieldName(), StrUtil.isBlank(statisticsData.getMeasValue()) ? "0" : statisticsData.getMeasValue()); - influxData.add(tempInfluxData); + measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.getMeasTypeCode()); + //统计数据,经过测试,接口响应json可能不包含该属性 + statisticsDataList = commonTelemetry.getTelemetryValue(); + if (CollectionUtil.isEmpty(statisticsDataList)) { + //添加进有指标但无遥测数据集合 + noDataUserWithMeasType.add(dataIdentify); + continue; } - //measType@tableName:存在多个指标存储表名一致,避免数据覆盖; - typeData.put(measTypeEnumByMeasType.getMeasType().concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData); + influxData = new ArrayList<>(); + for (StatisticsData statisticsData : statisticsDataList) { // 匹配上进入,循环96次 + tempInfluxData = new StringBuilder(); + tempInfluxData.append(commonTelemetry.getMeasTypeCode()) + .append(StrPool.COMMA) + .append(commonTelemetry.getConsNo()) + .append(StrPool.COMMA) + .append(statisticsData.getDataTime()) + .append(StrPool.COMMA) + .append(measTypeEnumByMeasType.getFieldName()) + .append(StrPool.COMMA) + .append(StrUtil.isBlank(statisticsData.getMeasValue()) ? "0" : statisticsData.getMeasValue()); + influxData.add(tempInfluxData.toString()); + } + //userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖; + typeData.put(commonTelemetry.getConsNo().concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData); //处理完,删除该条记录,减少集合尺寸,提高效率 userIdConcatMeasType.remove(dataIdentify); - break; } } - //没有匹配上的就是该用户没有数据 - noDataUser.addAll(userIdConcatMeasType); - + log.error("剩余有{}条标识", userIdConcatMeasType.size()); +// noDataUser.addAll(userIdConcatMeasType); + } else { + log.error("查询遥测数据失败!第{}片,结果为:{}", count, response); + } +// System.gc(); + } + //最后批量入库 + batchInsertData(typeData); + //最后输出没有数据的用户编号 +// if (CollectionUtil.isNotEmpty(noDataUser)) { +// noDataUser = noDataUser.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)) +// .distinct() +// .collect(Collectors.toList()); +// FileWriter writer = FileWriter.create(new File("/usr/local/demo.txt")); +// writer.writeLines(noDataUser); +// } +// log.error("用户没有数据的长度为:{}", noDataUser.size()); + log.error("用户有指标没有数据的长度为:{}", noDataUserWithMeasType.size()); + } + + @Override + public void queryTelemetryData(String startTime, String endTime) { + //有指标返回,但是指标的遥测整体数据为null + RestTemplateUtil restTemplateUtil = new RestTemplateUtil(); + TokenResult tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); + if (null == tokenWithRestTemplate) { + log.error("token信息获取失败"); + return; + } + List typeList = MeasTypeEnum.getMeasList(); + JSONObject jsonObject = JSONUtil.createObj(); + JSONObject jsonObjectSub = JSONUtil.createObj(); + jsonObject.set("page", 1); + jsonObject.set("perPage", 10000); + jsonObject.set("startTime", "2023-10-07 00:00:00"); + jsonObject.set("endTime", "2023-10-07 11:59:59"); + //1公专变2低压用户3光伏 + jsonObjectSub.set("consType", "3"); + jsonObjectSub.set("astIds", new ArrayList<>()); + jsonObjectSub.set("astType", ""); + jsonObjectSub.set("psrIds", new ArrayList<>()); + jsonObjectSub.set("psrType", ""); + jsonObjectSub.set("measPointIds", new ArrayList<>()); + jsonObjectSub.set("telemetryTypes", typeList); + //组装好json开始发送请求 + Map headers = new HashMap<>(); + headers.put("x-token", tokenWithRestTemplate.getAccess_token()); + //获取所有发电用户的id + List userIds = pmsPowerGenerationUserService.queryAllUserId(); + //将发电用户编号按100尺寸分片 + List> partitionList = ListUtils.partition(userIds, 100); + log.error("总计分了{}片", partitionList.size()); + + int count = 0; + //先获取数据 + List> responseEntities = new ArrayList<>(2000); + for (List generationUserIDList : partitionList) { + count++; + log.error("查询第{}片数据", count); + //按批次处理用户编号数据 + jsonObjectSub.set("consNos", generationUserIDList); + JSONArray jsonArray = JSONUtil.createArray(); + jsonArray.add(jsonObjectSub); + jsonObject.set("filters", jsonArray); + //避免中途token失效 + if (count % 800 == 0) { + tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); + headers.put("x-token", tokenWithRestTemplate.getAccess_token()); + } + responseEntities.add(restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, String.class)); + } + //开始解析数据 + Set userIdConcatMeasType = new HashSet<>(); + //将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType + for (String measType : typeList) { + userIdConcatMeasType.addAll(userIds.stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toSet())); + } + List influxData; + Map> typeData = new HashMap<>(); + StringBuilder tempInfluxData; + ResponseEntity response; + JSONArray statisticsDataList; + JSONObject result; + JSONObject statisticsData; + JSONObject body; + JSONArray records; + String dataIdentify; + JSONObject commonTelemetry; + MeasTypeEnum measTypeEnumByMeasType; + for (int i = 0; i < partitionList.size(); i++) { + log.error("解析第{}片数据", i); + response = responseEntities.get(i); + body = JSONUtil.parseObj(response.getBody()); + if (response.getStatusCodeValue() == 200 && body.get("status", String.class).equalsIgnoreCase("000000")) { + result = JSONUtil.parseObj(body.get("result", String.class)); + records = JSONUtil.parseArray(result.get("records", String.class)); + log.error("查询遥测数据结束,返回数据量:{}", records.size()); + if (CollectionUtil.isEmpty(records)) { + //日志输出: + log.error("起始时间:{},截止时间:{},无遥测数据;", startTime, endTime); + continue; + } + //处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理 + for (Object obj : records) { // 最多循环100*16次 + commonTelemetry = JSONUtil.parseObj(obj); + dataIdentify = commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(commonTelemetry.get("measTypeCode", String.class)); + if (userIdConcatMeasType.contains(dataIdentify)) { + //首个包含该标识的数据进行处理 + measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.get("measTypeCode", String.class)); + //统计数据,经过测试,接口响应json可能不包含该属性 + statisticsDataList = commonTelemetry.get("telemetryValue", JSONArray.class); + if (CollectionUtil.isEmpty(statisticsDataList)) { + //添加进有指标但无遥测数据集合 + continue; + } + influxData = new ArrayList<>(); + for (Object subObj : statisticsDataList) { // 匹配上进入,循环96次 + statisticsData = JSONUtil.parseObj(subObj); + tempInfluxData = new StringBuilder(); + tempInfluxData.append(measTypeEnumByMeasType.getPhaseType()) + .append(StrPool.COMMA) + .append(commonTelemetry.get("consNo", String.class)) + .append(StrPool.COMMA) + .append(statisticsData.get("dataTime", String.class)) + .append(StrPool.COMMA) + .append(measTypeEnumByMeasType.getFieldName()) + .append(StrPool.COMMA) + .append(StrUtil.isBlank(statisticsData.get("measValue", String.class)) ? "0" : statisticsData.get("measValue", String.class)); + influxData.add(tempInfluxData.toString()); + } + //userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖; + typeData.put(commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData); + //处理完,删除该条记录,减少集合尺寸,提高效率 + userIdConcatMeasType.remove(dataIdentify); + } + } + //没有匹配上的就是该用户没有数据 + log.error("剩余有{}条标识", userIdConcatMeasType.size()); + } else { + log.error("查询遥测数据失败!第{}片,结果为:{}", count, response); } - //每片数据获取完毕后,将数据处理入influxdb库 - batchInsertData(typeData); } //最后输出没有数据的用户编号 - noDataUser = noDataUser.stream().map(t -> t.substring(t.indexOf(StrPool.AT+1))) - .distinct() - .collect(Collectors.toList()); - FileWriter writer = FileWriter.create(new File("/usr/local/" + startTime + "-" + endTime + ".txt")); - File file = writer.writeLines(noDataUser); + /** + * 输出到2个文件,lackData.txt、 excalationData.txt + * 注:用户号去除160前缀 + * 1、所有指标均没有有数据的用户编号 + * 2、部分指标没有数据的用户编号,并表明是哪些指标 + */ + if (CollectionUtil.isNotEmpty(userIdConcatMeasType)) { + Map> finalMap = userIdConcatMeasType.stream().collect(Collectors.groupingBy(str -> { + String key = str.substring(3); + key = key.substring(0, key.indexOf(StrPool.AT)); + return key; + })); + //全部缺失数据的用户 + List lackData = new ArrayList<>(); + //部分缺失的用户及指标 + List excalationData = new ArrayList<>(); + Set keyedSet = finalMap.keySet(); + for (String key : keyedSet) { + List data = finalMap.get(key); + if (data.size() == typeList.size()) { + lackData.add(key); + } else { + data = data.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)).collect(Collectors.toList()); + key = key.concat(StrPool.COMMA).concat(StringUtils.join(data, StrPool.AT)); + excalationData.add(key); + } + } + FileWriter lackDataWriter = FileWriter.create(new File("/usr/local/lackData.txt")); + lackDataWriter.writeLines(lackData); + FileWriter excalationDataWriter = FileWriter.create(new File("/usr/local/excalationData.txt")); + excalationDataWriter.writeLines(excalationData); + } + log.error("用户有指标没有数据的长度为:{}", userIdConcatMeasType.size()); + //最后批量入库 + batchInsertData(typeData); } + /** * 批量入库influxDB + * * @param typeData 远程根据用户编号获取的数据 Map>> typeData = new HashMap<>(); */ - private void batchInsertData(Map>> typeData) { + private void batchInsertData(Map> typeData) { + log.error("总计有{}条记录入库,以5000作为基数分片插入influxdb", typeData.size()); List sqlList = new ArrayList<>(); Set tableNames = typeData.keySet(); + String[] datas; + Map tags; + Map fields; + Point point; + BatchPoints batchPoints; for (String tableName : tableNames) { - List> data = typeData.get(tableName); - tableName = tableName.substring(tableName.indexOf(StrPool.AT) + 1); - for (Map datum : data) { + List data = typeData.get(tableName); + tableName = tableName.substring(tableName.lastIndexOf(StrPool.AT) + 1); + for (String datum : data) { + datas = datum.split(StrPool.COMMA); //tag数据 - Map tags = new HashMap<>(); - tags.put("phasic_type", datum.get("phasic_type")); - datum.remove("phasic_type"); - tags.put("line_id", datum.get("line_id")); - datum.remove("line_id"); - tags.put("quality_flag", datum.get("quality_flag")); - datum.remove("quality_flag"); - tags.put("value_type", datum.get("value_type")); - datum.remove("value_type"); - String time = datum.get("time"); - datum.remove("time"); + tags = new HashMap<>(); + tags.put("phasic_type", datas[0]); + tags.put("line_id", datas[1]); + tags.put("quality_flag", "0"); + tags.put("value_type", "AVG"); + String time = datas[2]; //tag数据删完后,剩余均是filed数据,因filed属性名不固定,无法指定获取,直接循环 - Map fields = new HashMap<>(); - Set fieldNames = datum.keySet(); - for (String fieldName : fieldNames) { - fields.put(fieldName, Double.parseDouble(datum.get(fieldName))); - } - Point point = influxDbUtils.pointBuilder(tableName, DateUtil.parse(time, DatePattern.NORM_DATETIME_FORMATTER).getTime() + 8 * 3600 * 1000, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + fields = new HashMap<>(); + fields.put(datas[3], datas[4]); + point = influxDbUtils.pointBuilder(tableName, DateUtil.parse(time, DatePattern.NORM_DATETIME_FORMATTER).getTime(), TimeUnit.MILLISECONDS, tags, fields); + batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); sqlList.add(batchPoints.lineProtocol()); - } } - influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sqlList); + List> subSqlList = ListUtils.partition(sqlList, 50000); + int count = 1; + for (List sql : subSqlList) { + influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sql); + log.error("已经入库{}条记录!", count * 50000); + count++; + } + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 82d8942..dbce790 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,17 +4,17 @@ server: max-swallow-size: 100MB #重要的一行,修改tomcat的吞吐量 spring: application: - name: hbsyncdata + name: jbsyncdata #数据库内容配置 datasource: druid: driver-class-name: com.mysql.cj.jdbc.Driver -# url: jdbc:mysql://101.132.25.239:13306/pqsinfo?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT&rewriteBatchedStatements=true -# username: root -# password: njcnpqs - url: jdbc:mysql://192.168.1.18:13306/pqsinfo_pms_jb?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=CTT + url: jdbc:mysql://10.118.135.128:13306/pqsinfo_pms_jb?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT&rewriteBatchedStatements=true username: root password: njcnpqs +# url: jdbc:mysql://101.132.73.63:13306/pqsinfo_pms_jb?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=CTT +# username: root +# password: '*#Bg20230711' #初始化建立物理连接的个数、最小、最大连接数 initial-size: 5 min-idle: 5 @@ -42,10 +42,10 @@ spring: max-file-size: 500MB enabled: true influx: - url: http://192.168.1.18:8086 + url: http://10.118.135.128:8086 user: admin password: 123456 - database: pqsbase_sjzx + database: ym_testbk mapper-location: com.njcn.jbsyncdata.imapper @@ -57,9 +57,9 @@ mybatis-plus: #驼峰命名 map-underscore-to-camel-case: true #配置sql日志输出 - log-impl: org.apache.ibatis.logging.stdout.StdOutImpl +# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #关闭日志输出 - #log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl + log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl global-config: db-config: #指定主键生成策略 @@ -69,6 +69,6 @@ log: jibei: client_id: bad079495dc111ee987b0a580a080620 - client_secret: OxXlgFs9HH105L3cOg8ogYoFRFs8sKITJhVocyOprxoWSadcX0we2wffjyTUYGsK + client_secret: OxXIgFs9HHI05L3cOg8ogYoFRFs8sKlTJhVocyOprxoWSadcX0we2wffjyTUYGsK grant_type: credentials - url: http://25.42.182.119.32001 \ No newline at end of file + url: http://25.42.182.119:32001 \ No newline at end of file