From fc5a1cc78be91fca3b2aa639adc54deb2b9b1587 Mon Sep 17 00:00:00 2001 From: wr <1754607820@qq.com> Date: Mon, 30 Oct 2023 08:45:24 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=95=B0=E6=8D=AE=E6=8F=92=E5=85=A5=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E5=A4=9A=E7=BA=BF=E7=A8=8B=202.=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jbsyncdata/JbSyncdataApplication.java | 4 + .../jbsyncdata/config/AsyncConfiguration.java | 47 ++++ .../njcn/jbsyncdata/config/GeneralInfo.java | 33 +++ .../controller/DisPhotovoltaicController.java | 18 +- .../njcn/jbsyncdata/enums/MeasTypeEnum.java | 3 + .../service/impl/BusinessServiceImpl.java | 204 +-------------- .../impl/DisPhotovoltaicServiceImpl.java | 24 +- .../njcn/jbsyncdata/util/DataProcessing.java | 239 ++++++++++++++++++ src/main/resources/application.yml | 14 +- 9 files changed, 367 insertions(+), 219 deletions(-) create mode 100644 src/main/java/com/njcn/jbsyncdata/config/AsyncConfiguration.java create mode 100644 src/main/java/com/njcn/jbsyncdata/config/GeneralInfo.java create mode 100644 src/main/java/com/njcn/jbsyncdata/util/DataProcessing.java diff --git a/src/main/java/com/njcn/jbsyncdata/JbSyncdataApplication.java b/src/main/java/com/njcn/jbsyncdata/JbSyncdataApplication.java index b96346c..cedeac3 100644 --- a/src/main/java/com/njcn/jbsyncdata/JbSyncdataApplication.java +++ b/src/main/java/com/njcn/jbsyncdata/JbSyncdataApplication.java @@ -3,9 +3,13 @@ package com.njcn.jbsyncdata; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; @MapperScan("com.njcn.**.mapper") @SpringBootApplication(scanBasePackages = "com.njcn") +@EnableScheduling +@EnableAsync public class JbSyncdataApplication { public static void main(String[] args) { diff --git a/src/main/java/com/njcn/jbsyncdata/config/AsyncConfiguration.java b/src/main/java/com/njcn/jbsyncdata/config/AsyncConfiguration.java new file mode 100644 index 0000000..19158eb --- /dev/null +++ b/src/main/java/com/njcn/jbsyncdata/config/AsyncConfiguration.java @@ -0,0 +1,47 @@ +package com.njcn.jbsyncdata.config; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2022年03月11日 09:32 + */ +@Data +@Order(100) +@Configuration +@EnableAsync +@AllArgsConstructor +public class AsyncConfiguration { + + private final GeneralInfo generalInfo; + + @Bean("asyncExecutor") + public Executor asyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数:线程池创建时候初始化的线程数 + executor.setCorePoolSize(generalInfo.getCorePoolSize()); + // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程 + executor.setMaxPoolSize(generalInfo.getMaxPoolSize()); + // 缓冲队列:用来缓冲执行任务的队列 + executor.setQueueCapacity(generalInfo.getQueueCapacity()); + // 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁 + executor.setKeepAliveSeconds(generalInfo.getKeepAliveSeconds()); + // 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池 + executor.setThreadNamePrefix(generalInfo.getMicroServiceName()); + // 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程) + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + +} diff --git a/src/main/java/com/njcn/jbsyncdata/config/GeneralInfo.java b/src/main/java/com/njcn/jbsyncdata/config/GeneralInfo.java new file mode 100644 index 0000000..d1f8466 --- /dev/null +++ b/src/main/java/com/njcn/jbsyncdata/config/GeneralInfo.java @@ -0,0 +1,33 @@ +package com.njcn.jbsyncdata.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2021年08月19日 15:56 + */ +@Data +@Configuration +@Order(10) +public class GeneralInfo { + + @Value("${microservice.ename}") + private String microServiceName; + + @Value("${threadPool.corePoolSize}") + private int corePoolSize; + + @Value("${threadPool.maxPoolSize}") + private int maxPoolSize; + + @Value("${threadPool.queueCapacity}") + private int queueCapacity; + + @Value("${threadPool.keepAliveSeconds}") + private int keepAliveSeconds; + +} diff --git a/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java b/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java index 5886cc1..a778694 100644 --- a/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java +++ b/src/main/java/com/njcn/jbsyncdata/controller/DisPhotovoltaicController.java @@ -1,14 +1,10 @@ package com.njcn.jbsyncdata.controller; -import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; -import cn.hutool.core.io.file.FileReader; -import cn.hutool.core.text.StrPool; +import cn.hutool.core.date.DatePattern; import cn.hutool.core.util.StrUtil; import com.alibaba.excel.EasyExcel; -import com.alibaba.excel.support.ExcelTypeEnum; import com.njcn.jbsyncdata.pojo.DisPhotovoltaic10Excel; import com.njcn.jbsyncdata.pojo.DisPhotovoltaic380Excel; import com.njcn.jbsyncdata.service.DisPhotovoltaicService; @@ -19,13 +15,13 @@ import io.swagger.annotations.ApiOperation; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.MediaType; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartFile; import javax.servlet.http.HttpServletResponse; -import java.io.IOException; import java.util.*; import java.util.stream.Collectors; @@ -124,5 +120,15 @@ public class DisPhotovoltaicController { } return "数据导入失败"; } + @Scheduled(cron = "0 30 8 * * ?") + public void insert() { + log.error(Thread.currentThread().getName(),"1.定时器启动----!"); + DateTime dateTime = DateUtil.offsetDay(new Date(), -1); + String s=dateTime.toString(); + String ds = s.substring(0, s.indexOf(" ")); + log.error(Thread.currentThread().getName() + "2.定时器执行数据日期 "+ds+"----!"); + businessService.queryTelemetryData(ds); + log.error(Thread.currentThread().getName() + "2.定时器执行数据成功 "+ds+"----!"); + } } diff --git a/src/main/java/com/njcn/jbsyncdata/enums/MeasTypeEnum.java b/src/main/java/com/njcn/jbsyncdata/enums/MeasTypeEnum.java index e02b7c7..018f543 100644 --- a/src/main/java/com/njcn/jbsyncdata/enums/MeasTypeEnum.java +++ b/src/main/java/com/njcn/jbsyncdata/enums/MeasTypeEnum.java @@ -23,6 +23,9 @@ public enum MeasTypeEnum { TOTW("TotW", "有功", "T","data_harmpower_p","p"), TOTVAR("TotVar", "无功", "T","data_harmpower_q","q"); +// A_V3("HphV3_phsA", "A相电压3次谐波值", "A","data_v","v3"), +// B_V3("HphV3_phsB", "A相电压3次谐波值", "B","data_v","v3"), +// C_V3("Hphv3_phsC", "A相电压3次谐波值", "C","data_v","v3"); //冀北指标名称 private final String measType; diff --git a/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java b/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java index 35fd5ea..693930f 100644 --- a/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java +++ b/src/main/java/com/njcn/jbsyncdata/service/impl/BusinessServiceImpl.java @@ -1,40 +1,24 @@ package com.njcn.jbsyncdata.service.impl; -import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; -import cn.hutool.core.io.file.FileWriter; -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.text.StrPool; -import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; -import com.njcn.influx.utils.InfluxDbUtils; import com.njcn.jbsyncdata.component.TokenComponent; import com.njcn.jbsyncdata.enums.MeasTypeEnum; -import com.njcn.jbsyncdata.pojo.ExcelData; import com.njcn.jbsyncdata.pojo.result.*; import com.njcn.jbsyncdata.service.IBusinessService; import com.njcn.jbsyncdata.service.IPmsPowerGenerationUserService; +import com.njcn.jbsyncdata.util.DataProcessing; import com.njcn.jbsyncdata.util.RestTemplateUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.ListUtils; -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.collections4.SetUtils; -import org.apache.commons.lang3.StringUtils; -import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; -import org.influxdb.dto.Point; -import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import javax.annotation.Resource; -import java.io.File; import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; @Slf4j @Service @@ -45,7 +29,7 @@ public class BusinessServiceImpl implements IBusinessService { private TokenComponent tokenComponent; @Resource - private InfluxDbUtils influxDbUtils; + private DataProcessing dataProcessing; @Resource private IPmsPowerGenerationUserService pmsPowerGenerationUserService; @@ -65,6 +49,7 @@ public class BusinessServiceImpl implements IBusinessService { * 4. StatisticsData统计数据的实际数值measValue为null-----对应时间、指标的数值设置为0 */ @Override + @Async("asyncExecutor") public void queryTelemetryData(String date) { DateTime dateTemp = DateUtil.parse(date, DatePattern.NORM_DATE_FORMAT); DateTime beginOfDay = DateUtil.beginOfDay(dateTemp); @@ -97,188 +82,9 @@ public class BusinessServiceImpl implements IBusinessService { List userIds = pmsPowerGenerationUserService.queryAllUserId(); List> singleQueryDataUserId = ListUtils.partition(userIds, 20000); for (int k = 0; k < singleQueryDataUserId.size(); k++) { - //将发电用户编号按100尺寸分片 - List> partitionList = ListUtils.partition(singleQueryDataUserId.get(k), 100); - log.error("总计分了{}片", partitionList.size()); - int count = 0; - tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); - headers.put("x-token", tokenWithRestTemplate.getAccess_token()); - //先获取数据 - List> responseEntities = new ArrayList<>(2000); - int kk = k + 1; - for (List generationUserIDList : partitionList) { - count++; - log.error("查询第{}大片,{}小片数据", kk, count); - //按批次处理用户编号数据 - jsonObjectSub.set("consNos", generationUserIDList); - JSONArray jsonArray = JSONUtil.createArray(); - jsonArray.add(jsonObjectSub); - jsonObject.set("filters", jsonArray); - try { - responseEntities.add(restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, String.class)); - } catch (Exception exception) { - log.error("远程调用接口异常,异常为:" + exception); - } - } - //开始解析数据 - Set userIdConcatMeasType = new HashSet<>(); - //将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType - for (String measType : typeList) { - userIdConcatMeasType.addAll(singleQueryDataUserId.get(k).stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toSet())); - } - List influxData; - Map> typeData = new HashMap<>(); - StringBuilder tempInfluxData; - ResponseEntity response; - JSONArray statisticsDataList; - JSONObject result; - JSONObject statisticsData; - JSONObject body; - JSONArray records; - String dataIdentify; - JSONObject commonTelemetry; - MeasTypeEnum measTypeEnumByMeasType; - for (int i = 0; i < partitionList.size(); i++) { - log.error("解析第{}片数据", i); - response = responseEntities.get(i); - body = JSONUtil.parseObj(response.getBody()); - if (response.getStatusCodeValue() == 200 && body.get("status", String.class).equalsIgnoreCase("000000")) { - result = JSONUtil.parseObj(body.get("result", String.class)); - records = JSONUtil.parseArray(result.get("records", String.class)); - log.error("查询遥测数据结束,返回数据量:{}", records.size()); - if (CollectionUtil.isEmpty(records)) { - //日志输出: - log.error("查询时间:{},无遥测数据;", date); - continue; - } - //处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理 - for (Object obj : records) { // 最多循环100*16次 - commonTelemetry = JSONUtil.parseObj(obj); - dataIdentify = commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(commonTelemetry.get("measTypeCode", String.class)); - if (userIdConcatMeasType.contains(dataIdentify)) { - //首个包含该标识的数据进行处理 - measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.get("measTypeCode", String.class)); - //统计数据,经过测试,接口响应json可能不包含该属性 - statisticsDataList = commonTelemetry.get("telemetryValue", JSONArray.class); - if (CollectionUtil.isEmpty(statisticsDataList)) { - //添加进有指标但无遥测数据集合 - continue; - } - influxData = new ArrayList<>(); - for (Object subObj : statisticsDataList) { // 匹配上进入,循环96次 - statisticsData = JSONUtil.parseObj(subObj); - tempInfluxData = new StringBuilder(); - tempInfluxData.append(measTypeEnumByMeasType.getPhaseType()) - .append(StrPool.COMMA) - .append(commonTelemetry.get("consNo", String.class)) - .append(StrPool.COMMA) - .append(statisticsData.get("dataTime", String.class)) - .append(StrPool.COMMA) - .append(measTypeEnumByMeasType.getFieldName()) - .append(StrPool.COMMA) - .append(StrUtil.isBlank(statisticsData.get("measValue", String.class)) ? "0" : statisticsData.get("measValue", String.class)); - influxData.add(tempInfluxData.toString()); - } - //userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖; - typeData.put(commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData); - //处理完,删除该条记录,减少集合尺寸,提高效率 - userIdConcatMeasType.remove(dataIdentify); - } - } - //没有匹配上的就是该用户没有数据 - log.error("剩余有{}条标识", userIdConcatMeasType.size()); - } else { - log.error("查询遥测数据失败!第{}片,结果为:{}", count, response); - } - } - //最后输出没有数据的用户编号 - /** - * 输出到2个文件,lackData.txt、 excalationData.txt - * 注:用户号去除160前缀 - * 1、所有指标均没有有数据的用户编号 - * 2、部分指标没有数据的用户编号,并表明是哪些指标 - */ - if (CollectionUtil.isNotEmpty(userIdConcatMeasType)) { - Map> finalMap = userIdConcatMeasType.stream().collect(Collectors.groupingBy(str -> { - String key = str.substring(3); - key = key.substring(0, key.indexOf(StrPool.AT)); - return key; - })); - //全部缺失数据的用户 - List lackData = new ArrayList<>(); - //部分缺失的用户及指标 - List excalationData = new ArrayList<>(); - Set keyedSet = finalMap.keySet(); - for (String key : keyedSet) { - List data = finalMap.get(key); - if (data.size() == typeList.size()) { - lackData.add(key); - } else { - data = data.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)).collect(Collectors.toList()); - key = key.concat(StrPool.COMMA).concat(StringUtils.join(data, StrPool.AT)); - excalationData.add(key); - } - } - FileWriter lackDataWriter = FileWriter.create(new File("/usr/local/syncData/lackData" + date + k + ".txt")); - lackDataWriter.writeLines(lackData); - FileWriter excalationDataWriter = FileWriter.create(new File("/usr/local/syncData/excalationData" + date + k + ".txt")); - excalationDataWriter.writeLines(excalationData); - } - log.error("用户有指标没有数据的长度为:{}", userIdConcatMeasType.size()); - //最后批量入库 - batchInsertData(typeData); + dataProcessing.asyncInfluxDb(tokenComponent,date, restTemplateUtil, typeList, jsonObject, jsonObjectSub, headers, singleQueryDataUserId, k); } } - /** - * 批量入库influxDB - * - * @param typeData 远程根据用户编号获取的数据 Map>> typeData = new HashMap<>(); - */ - private void batchInsertData(Map> typeData) { - log.error("总计有{}条记录入库,以20000作为基数分片插入influxdb", typeData.size()); - List sqlList = new ArrayList<>(); - Set tableNames = typeData.keySet(); - String[] datas; - Map tags; - Map fields; - Point point; - BatchPoints batchPoints; - for (String tableName : tableNames) { - List data = typeData.get(tableName); - tableName = tableName.substring(tableName.lastIndexOf(StrPool.AT) + 1); - for (String datum : data) { - datas = datum.split(StrPool.COMMA); - //tag数据 - tags = new HashMap<>(); - tags.put("phasic_type", datas[0]); - tags.put("line_id", datas[1]); - tags.put("quality_flag", "0"); - tags.put("value_type", "AVG"); - String time = datas[2]; - //tag数据删完后,剩余均是filed数据,因filed属性名不固定,无法指定获取,直接循环 - fields = new HashMap<>(); - fields.put(datas[3], datas[4]); - point = influxDbUtils.pointBuilder(tableName, DateUtil.parse(time, DatePattern.NORM_DATETIME_FORMATTER).getTime(), TimeUnit.MILLISECONDS, tags, fields); - batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); - batchPoints.point(point); - sqlList.add(batchPoints.lineProtocol()); - } - } - List> subSqlList = ListUtils.partition(sqlList, 20000); - int count = 1; - for (List sql : subSqlList) { - try { - influxDbUtils.batchInsert(influxDbUtils.getDbName(), "autogen", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sql); - } catch (Exception exception) { - log.error("数据批量入库异常,异常为:{}",exception.toString()); - exception.printStackTrace(); - } - log.error("已经入库{}条记录!", count * 20000); - count++; - } - log.error("当前批次所有数据,{}条均已入库!", sqlList.size()); - - } } diff --git a/src/main/java/com/njcn/jbsyncdata/service/impl/DisPhotovoltaicServiceImpl.java b/src/main/java/com/njcn/jbsyncdata/service/impl/DisPhotovoltaicServiceImpl.java index 184c277..cb44d66 100644 --- a/src/main/java/com/njcn/jbsyncdata/service/impl/DisPhotovoltaicServiceImpl.java +++ b/src/main/java/com/njcn/jbsyncdata/service/impl/DisPhotovoltaicServiceImpl.java @@ -74,7 +74,7 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService { excel.getLineID() ) ) { - excel.setErrorMessage("线路/台区编号不能为空"); + excel.setErrorMessage("台区编号/PMS系统线路编号不能为空"); errorInfo.add(excel); continue; } @@ -82,7 +82,7 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService { String replace = subString(excel.getCounty()); PmsStatationStat sub = getSub(excel.getPowerSupply() + "_" + replace, oldSubMap); if (ObjectUtil.isNull(sub)) { - excel.setErrorMessage("部门信息不存在"); + excel.setErrorMessage("区县信息不存在"); errorInfo.add(excel); continue; } @@ -129,10 +129,10 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService { info.add(user); } if (CollUtil.isNotEmpty(info)) { - LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(PmsPowerGenerationUser::getInputStatus, 0); - iPmsPowerGenerationUserService.remove(lambdaQueryWrapper); - iPmsPowerGenerationUserService.saveOrUpdateBatch(info, 1000); +// LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); +// lambdaQueryWrapper.eq(PmsPowerGenerationUser::getInputStatus, 0); +// iPmsPowerGenerationUserService.remove(lambdaQueryWrapper); +// iPmsPowerGenerationUserService.saveOrUpdateBatch(info, 1000); } if (CollUtil.isNotEmpty(errorInfo)) { exportExcel(DateUtil.now() + "_10kV错误信息.xlsx", errorInfo,DisPhotovoltaic10Excel.class, response); @@ -173,7 +173,7 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService { excel.getConnectionDate() ) ) { - excel.setErrorMessage("并网时间/线路/台区编号不能为空"); + excel.setErrorMessage("并网时间/所属线路PMS编号/台区编号不能为空"); errorInfo.add(excel); continue; } @@ -182,7 +182,7 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService { PmsStatationStat sub = getSub(excel.getPowerSupply() + "_" + replace, oldSubMap); if (ObjectUtil.isNull(sub)) { - excel.setErrorMessage("部门信息不存在"); + excel.setErrorMessage("区县信息不存在"); errorInfo.add(excel); continue; } @@ -229,10 +229,10 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService { info.add(user); } if (CollUtil.isNotEmpty(info)) { - LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(PmsPowerGenerationUser::getInputStatus, 1); - iPmsPowerGenerationUserService.remove(lambdaQueryWrapper); - iPmsPowerGenerationUserService.saveBatch(info, 1000); +// LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); +// lambdaQueryWrapper.eq(PmsPowerGenerationUser::getInputStatus, 1); +// iPmsPowerGenerationUserService.remove(lambdaQueryWrapper); +// iPmsPowerGenerationUserService.saveBatch(info, 1000); } if (CollUtil.isNotEmpty(errorInfo)) { exportExcel(DateUtil.now() + "_380kV错误信息.xlsx", errorInfo,DisPhotovoltaic380Excel.class, response); diff --git a/src/main/java/com/njcn/jbsyncdata/util/DataProcessing.java b/src/main/java/com/njcn/jbsyncdata/util/DataProcessing.java new file mode 100644 index 0000000..2f03329 --- /dev/null +++ b/src/main/java/com/njcn/jbsyncdata/util/DataProcessing.java @@ -0,0 +1,239 @@ +package com.njcn.jbsyncdata.util; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.io.file.FileWriter; +import cn.hutool.core.text.StrPool; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.njcn.influx.utils.InfluxDbUtils; +import com.njcn.jbsyncdata.component.TokenComponent; +import com.njcn.jbsyncdata.enums.MeasTypeEnum; +import com.njcn.jbsyncdata.pojo.result.TokenResult; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.ListUtils; +import org.apache.commons.lang3.StringUtils; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.io.File; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * @author wr + * @description + * @date 2023/10/20 14:14 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class DataProcessing { + + private final InfluxDbUtils influxDbUtils; + + @Async("asyncExecutor") + public void asyncInfluxDb( + TokenComponent tokenComponent, + String date, + RestTemplateUtil restTemplateUtil, + List typeList, + JSONObject jsonObject, + JSONObject jsonObjectSub, + Map headers, + List> singleQueryDataUserId, int k + ) { + TokenResult tokenWithRestTemplate; + //将发电用户编号按100尺寸分片 + List> partitionList = ListUtils.partition(singleQueryDataUserId.get(k), 100); + log.error("总计分了{}片", partitionList.size()); + int count = 0; + tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate(); + headers.put("x-token", tokenWithRestTemplate.getAccess_token()); + //先获取数据 + List> responseEntities = new ArrayList<>(2000); + int kk = k + 1; + for (List generationUserIDList : partitionList) { + count++; + log.error("查询第{}大片,{}小片数据", kk, count); + //按批次处理用户编号数据 + jsonObjectSub.set("consNos", generationUserIDList); + JSONArray jsonArray = JSONUtil.createArray(); + jsonArray.add(jsonObjectSub); + jsonObject.set("filters", jsonArray); + try { + responseEntities.add(restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, String.class)); + } catch (Exception exception) { + log.error("远程调用接口异常,异常为:" + exception); + } + } + //开始解析数据 + Set userIdConcatMeasType = new HashSet<>(); + //将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType + for (String measType : typeList) { + userIdConcatMeasType.addAll(singleQueryDataUserId.get(k).stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toSet())); + } + List influxData; + Map> typeData = new HashMap<>(); + StringBuilder tempInfluxData; + ResponseEntity response; + JSONArray statisticsDataList; + JSONObject result; + JSONObject statisticsData; + JSONObject body; + JSONArray records; + String dataIdentify; + JSONObject commonTelemetry; + MeasTypeEnum measTypeEnumByMeasType; + for (int i = 0; i < partitionList.size(); i++) { + log.error("解析第{}片数据", i); + response = responseEntities.get(i); + body = JSONUtil.parseObj(response.getBody()); + if (response.getStatusCodeValue() == 200 && body.get("status", String.class).equalsIgnoreCase("000000")) { + result = JSONUtil.parseObj(body.get("result", String.class)); + records = JSONUtil.parseArray(result.get("records", String.class)); + log.error("查询遥测数据结束,返回数据量:{}", records.size()); + if (CollectionUtil.isEmpty(records)) { + //日志输出: + log.error("查询时间:{},无遥测数据;", date); + continue; + } + //处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理 + for (Object obj : records) { // 最多循环100*16次 + commonTelemetry = JSONUtil.parseObj(obj); + dataIdentify = commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(commonTelemetry.get("measTypeCode", String.class)); + if (userIdConcatMeasType.contains(dataIdentify)) { + //首个包含该标识的数据进行处理 + measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.get("measTypeCode", String.class)); + //统计数据,经过测试,接口响应json可能不包含该属性 + statisticsDataList = commonTelemetry.get("telemetryValue", JSONArray.class); + if (CollectionUtil.isEmpty(statisticsDataList)) { + //添加进有指标但无遥测数据集合 + continue; + } + influxData = new ArrayList<>(); + for (Object subObj : statisticsDataList) { // 匹配上进入,循环96次 + statisticsData = JSONUtil.parseObj(subObj); + tempInfluxData = new StringBuilder(); + tempInfluxData.append(measTypeEnumByMeasType.getPhaseType()) + .append(StrPool.COMMA) + .append(commonTelemetry.get("consNo", String.class)) + .append(StrPool.COMMA) + .append(statisticsData.get("dataTime", String.class)) + .append(StrPool.COMMA) + .append(measTypeEnumByMeasType.getFieldName()) + .append(StrPool.COMMA) + .append(StrUtil.isBlank(statisticsData.get("measValue", String.class)) ? "0" : statisticsData.get("measValue", String.class)); + influxData.add(tempInfluxData.toString()); + } + //userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖; + typeData.put(commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData); + //处理完,删除该条记录,减少集合尺寸,提高效率 + userIdConcatMeasType.remove(dataIdentify); + } + } + //没有匹配上的就是该用户没有数据 + log.error("剩余有{}条标识", userIdConcatMeasType.size()); + } else { + log.error("查询遥测数据失败!第{}片,结果为:{}", count, response); + } + } + //最后输出没有数据的用户编号 + /** + * 输出到2个文件,lackData.txt、 excalationData.txt + * 注:用户号去除160前缀 + * 1、所有指标均没有有数据的用户编号 + * 2、部分指标没有数据的用户编号,并表明是哪些指标 + */ + if (CollectionUtil.isNotEmpty(userIdConcatMeasType)) { + Map> finalMap = userIdConcatMeasType.stream().collect(Collectors.groupingBy(str -> { + String key = str.substring(3); + key = key.substring(0, key.indexOf(StrPool.AT)); + return key; + })); + //全部缺失数据的用户 + List lackData = new ArrayList<>(); + //部分缺失的用户及指标 + List excalationData = new ArrayList<>(); + Set keyedSet = finalMap.keySet(); + for (String key : keyedSet) { + List data = finalMap.get(key); + if (data.size() == typeList.size()) { + lackData.add(key); + } else { + data = data.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)).collect(Collectors.toList()); + key = key.concat(StrPool.COMMA).concat(StringUtils.join(data, StrPool.AT)); + excalationData.add(key); + } + } + FileWriter lackDataWriter = FileWriter.create(new File("/usr/local/syncData/lackData" + date + k + ".txt")); + lackDataWriter.writeLines(lackData); + FileWriter excalationDataWriter = FileWriter.create(new File("/usr/local/syncData/excalationData" + date + k + ".txt")); + excalationDataWriter.writeLines(excalationData); + } + log.error("用户有指标没有数据的长度为:{}", userIdConcatMeasType.size()); + //最后批量入库 + batchInsertData(typeData); + } + + + /** + * 批量入库influxDB + * + * @param typeData 远程根据用户编号获取的数据 Map>> typeData = new HashMap<>(); + */ + private void batchInsertData(Map> typeData) { + log.error("总计有{}条记录入库,以20000作为基数分片插入influxdb", typeData.size()); + List sqlList = new ArrayList<>(); + Set tableNames = typeData.keySet(); + String[] datas; + Map tags; + Map fields; + Point point; + BatchPoints batchPoints; + for (String tableName : tableNames) { + List data = typeData.get(tableName); + tableName = tableName.substring(tableName.lastIndexOf(StrPool.AT) + 1); + for (String datum : data) { + datas = datum.split(StrPool.COMMA); + //tag数据 + tags = new HashMap<>(); + tags.put("phasic_type", datas[0]); + tags.put("line_id", datas[1]); + tags.put("quality_flag", "0"); + tags.put("value_type", "AVG"); + String time = datas[2]; + //tag数据删完后,剩余均是filed数据,因filed属性名不固定,无法指定获取,直接循环 + fields = new HashMap<>(); + fields.put(datas[3], datas[4]); + point = influxDbUtils.pointBuilder(tableName, DateUtil.parse(time, DatePattern.NORM_DATETIME_FORMATTER).getTime(), TimeUnit.MILLISECONDS, tags, fields); + batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + batchPoints.point(point); + sqlList.add(batchPoints.lineProtocol()); + } + } + List> subSqlList = ListUtils.partition(sqlList, 20000); + int count = 1; + for (List sql : subSqlList) { + try { + influxDbUtils.batchInsert(influxDbUtils.getDbName(), "autogen", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sql); + } catch (Exception exception) { + log.error("数据批量入库异常,异常为:{}",exception.toString()); + exception.printStackTrace(); + } + log.error("已经入库{}条记录!", count * 20000); + count++; + } + log.error("当前批次所有数据,{}条均已入库!", sqlList.size()); + + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7169fa7..b8ce63f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,5 +1,5 @@ server: - port: 10288 + port: 10299 tomcat: max-swallow-size: 100MB #重要的一行,修改tomcat的吞吐量 spring: @@ -68,4 +68,14 @@ jibei: client_id: bad079495dc111ee987b0a580a080620 client_secret: OxXIgFs9HHI05L3cOg8ogYoFRFs8sKlTJhVocyOprxoWSadcX0we2wffjyTUYGsK grant_type: credentials - url: http://25.42.182.119:32001 \ No newline at end of file + url: http://25.42.182.119:32001 + +#线程池配置信息 +microservice: + ename: async + +threadPool: + corePoolSize: 12 + maxPoolSize: 24 + queueCapacity: 500 + keepAliveSeconds: 60 \ No newline at end of file