1.数据插入改为多线程
2.添加定时任务
This commit is contained in:
@@ -3,9 +3,13 @@ package com.njcn.jbsyncdata;
|
|||||||
import org.mybatis.spring.annotation.MapperScan;
|
import org.mybatis.spring.annotation.MapperScan;
|
||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
|
||||||
@MapperScan("com.njcn.**.mapper")
|
@MapperScan("com.njcn.**.mapper")
|
||||||
@SpringBootApplication(scanBasePackages = "com.njcn")
|
@SpringBootApplication(scanBasePackages = "com.njcn")
|
||||||
|
@EnableScheduling
|
||||||
|
@EnableAsync
|
||||||
public class JbSyncdataApplication {
|
public class JbSyncdataApplication {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
|
|||||||
@@ -0,0 +1,47 @@
|
|||||||
|
package com.njcn.jbsyncdata.config;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.core.annotation.Order;
|
||||||
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author hongawen
|
||||||
|
* @version 1.0.0
|
||||||
|
* @date 2022年03月11日 09:32
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Order(100)
|
||||||
|
@Configuration
|
||||||
|
@EnableAsync
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class AsyncConfiguration {
|
||||||
|
|
||||||
|
private final GeneralInfo generalInfo;
|
||||||
|
|
||||||
|
@Bean("asyncExecutor")
|
||||||
|
public Executor asyncExecutor() {
|
||||||
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
|
// 核心线程数:线程池创建时候初始化的线程数
|
||||||
|
executor.setCorePoolSize(generalInfo.getCorePoolSize());
|
||||||
|
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
|
||||||
|
executor.setMaxPoolSize(generalInfo.getMaxPoolSize());
|
||||||
|
// 缓冲队列:用来缓冲执行任务的队列
|
||||||
|
executor.setQueueCapacity(generalInfo.getQueueCapacity());
|
||||||
|
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
|
||||||
|
executor.setKeepAliveSeconds(generalInfo.getKeepAliveSeconds());
|
||||||
|
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
|
||||||
|
executor.setThreadNamePrefix(generalInfo.getMicroServiceName());
|
||||||
|
// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
|
||||||
|
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
executor.initialize();
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
33
src/main/java/com/njcn/jbsyncdata/config/GeneralInfo.java
Normal file
33
src/main/java/com/njcn/jbsyncdata/config/GeneralInfo.java
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package com.njcn.jbsyncdata.config;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.core.annotation.Order;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author hongawen
|
||||||
|
* @version 1.0.0
|
||||||
|
* @date 2021年08月19日 15:56
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Configuration
|
||||||
|
@Order(10)
|
||||||
|
public class GeneralInfo {
|
||||||
|
|
||||||
|
@Value("${microservice.ename}")
|
||||||
|
private String microServiceName;
|
||||||
|
|
||||||
|
@Value("${threadPool.corePoolSize}")
|
||||||
|
private int corePoolSize;
|
||||||
|
|
||||||
|
@Value("${threadPool.maxPoolSize}")
|
||||||
|
private int maxPoolSize;
|
||||||
|
|
||||||
|
@Value("${threadPool.queueCapacity}")
|
||||||
|
private int queueCapacity;
|
||||||
|
|
||||||
|
@Value("${threadPool.keepAliveSeconds}")
|
||||||
|
private int keepAliveSeconds;
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,14 +1,10 @@
|
|||||||
package com.njcn.jbsyncdata.controller;
|
package com.njcn.jbsyncdata.controller;
|
||||||
|
|
||||||
import cn.hutool.core.collection.CollectionUtil;
|
|
||||||
import cn.hutool.core.date.DatePattern;
|
|
||||||
import cn.hutool.core.date.DateTime;
|
import cn.hutool.core.date.DateTime;
|
||||||
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.DateUtil;
|
||||||
import cn.hutool.core.io.file.FileReader;
|
import cn.hutool.core.date.DatePattern;
|
||||||
import cn.hutool.core.text.StrPool;
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.alibaba.excel.EasyExcel;
|
import com.alibaba.excel.EasyExcel;
|
||||||
import com.alibaba.excel.support.ExcelTypeEnum;
|
|
||||||
import com.njcn.jbsyncdata.pojo.DisPhotovoltaic10Excel;
|
import com.njcn.jbsyncdata.pojo.DisPhotovoltaic10Excel;
|
||||||
import com.njcn.jbsyncdata.pojo.DisPhotovoltaic380Excel;
|
import com.njcn.jbsyncdata.pojo.DisPhotovoltaic380Excel;
|
||||||
import com.njcn.jbsyncdata.service.DisPhotovoltaicService;
|
import com.njcn.jbsyncdata.service.DisPhotovoltaicService;
|
||||||
@@ -19,13 +15,13 @@ import io.swagger.annotations.ApiOperation;
|
|||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.web.bind.annotation.PostMapping;
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
import org.springframework.web.multipart.MultipartFile;
|
import org.springframework.web.multipart.MultipartFile;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@@ -124,5 +120,15 @@ public class DisPhotovoltaicController {
|
|||||||
}
|
}
|
||||||
return "数据导入失败";
|
return "数据导入失败";
|
||||||
}
|
}
|
||||||
|
@Scheduled(cron = "0 30 8 * * ?")
|
||||||
|
public void insert() {
|
||||||
|
log.error(Thread.currentThread().getName(),"1.定时器启动----!");
|
||||||
|
DateTime dateTime = DateUtil.offsetDay(new Date(), -1);
|
||||||
|
String s=dateTime.toString();
|
||||||
|
String ds = s.substring(0, s.indexOf(" "));
|
||||||
|
log.error(Thread.currentThread().getName() + "2.定时器执行数据日期 "+ds+"----!");
|
||||||
|
businessService.queryTelemetryData(ds);
|
||||||
|
log.error(Thread.currentThread().getName() + "2.定时器执行数据成功 "+ds+"----!");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,6 +23,9 @@ public enum MeasTypeEnum {
|
|||||||
TOTW("TotW", "有功", "T","data_harmpower_p","p"),
|
TOTW("TotW", "有功", "T","data_harmpower_p","p"),
|
||||||
|
|
||||||
TOTVAR("TotVar", "无功", "T","data_harmpower_q","q");
|
TOTVAR("TotVar", "无功", "T","data_harmpower_q","q");
|
||||||
|
// A_V3("HphV3_phsA", "A相电压3次谐波值", "A","data_v","v3"),
|
||||||
|
// B_V3("HphV3_phsB", "A相电压3次谐波值", "B","data_v","v3"),
|
||||||
|
// C_V3("Hphv3_phsC", "A相电压3次谐波值", "C","data_v","v3");
|
||||||
|
|
||||||
//冀北指标名称
|
//冀北指标名称
|
||||||
private final String measType;
|
private final String measType;
|
||||||
|
|||||||
@@ -1,40 +1,24 @@
|
|||||||
package com.njcn.jbsyncdata.service.impl;
|
package com.njcn.jbsyncdata.service.impl;
|
||||||
|
|
||||||
import cn.hutool.core.collection.CollectionUtil;
|
|
||||||
import cn.hutool.core.date.DatePattern;
|
import cn.hutool.core.date.DatePattern;
|
||||||
import cn.hutool.core.date.DateTime;
|
import cn.hutool.core.date.DateTime;
|
||||||
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.DateUtil;
|
||||||
import cn.hutool.core.io.file.FileWriter;
|
|
||||||
import cn.hutool.core.map.MapUtil;
|
|
||||||
import cn.hutool.core.text.StrPool;
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import cn.hutool.json.JSONArray;
|
|
||||||
import cn.hutool.json.JSONObject;
|
import cn.hutool.json.JSONObject;
|
||||||
import cn.hutool.json.JSONUtil;
|
import cn.hutool.json.JSONUtil;
|
||||||
import com.njcn.influx.utils.InfluxDbUtils;
|
|
||||||
import com.njcn.jbsyncdata.component.TokenComponent;
|
import com.njcn.jbsyncdata.component.TokenComponent;
|
||||||
import com.njcn.jbsyncdata.enums.MeasTypeEnum;
|
import com.njcn.jbsyncdata.enums.MeasTypeEnum;
|
||||||
import com.njcn.jbsyncdata.pojo.ExcelData;
|
|
||||||
import com.njcn.jbsyncdata.pojo.result.*;
|
import com.njcn.jbsyncdata.pojo.result.*;
|
||||||
import com.njcn.jbsyncdata.service.IBusinessService;
|
import com.njcn.jbsyncdata.service.IBusinessService;
|
||||||
import com.njcn.jbsyncdata.service.IPmsPowerGenerationUserService;
|
import com.njcn.jbsyncdata.service.IPmsPowerGenerationUserService;
|
||||||
|
import com.njcn.jbsyncdata.util.DataProcessing;
|
||||||
import com.njcn.jbsyncdata.util.RestTemplateUtil;
|
import com.njcn.jbsyncdata.util.RestTemplateUtil;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.collections4.ListUtils;
|
import org.apache.commons.collections4.ListUtils;
|
||||||
import org.apache.commons.collections4.MapUtils;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.apache.commons.collections4.SetUtils;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.influxdb.InfluxDB;
|
|
||||||
import org.influxdb.dto.BatchPoints;
|
|
||||||
import org.influxdb.dto.Point;
|
|
||||||
import org.springframework.http.ResponseEntity;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.io.File;
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@@ -45,7 +29,7 @@ public class BusinessServiceImpl implements IBusinessService {
|
|||||||
private TokenComponent tokenComponent;
|
private TokenComponent tokenComponent;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private InfluxDbUtils influxDbUtils;
|
private DataProcessing dataProcessing;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private IPmsPowerGenerationUserService pmsPowerGenerationUserService;
|
private IPmsPowerGenerationUserService pmsPowerGenerationUserService;
|
||||||
@@ -65,6 +49,7 @@ public class BusinessServiceImpl implements IBusinessService {
|
|||||||
* 4. StatisticsData统计数据的实际数值measValue为null-----对应时间、指标的数值设置为0
|
* 4. StatisticsData统计数据的实际数值measValue为null-----对应时间、指标的数值设置为0
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@Async("asyncExecutor")
|
||||||
public void queryTelemetryData(String date) {
|
public void queryTelemetryData(String date) {
|
||||||
DateTime dateTemp = DateUtil.parse(date, DatePattern.NORM_DATE_FORMAT);
|
DateTime dateTemp = DateUtil.parse(date, DatePattern.NORM_DATE_FORMAT);
|
||||||
DateTime beginOfDay = DateUtil.beginOfDay(dateTemp);
|
DateTime beginOfDay = DateUtil.beginOfDay(dateTemp);
|
||||||
@@ -97,188 +82,9 @@ public class BusinessServiceImpl implements IBusinessService {
|
|||||||
List<String> userIds = pmsPowerGenerationUserService.queryAllUserId();
|
List<String> userIds = pmsPowerGenerationUserService.queryAllUserId();
|
||||||
List<List<String>> singleQueryDataUserId = ListUtils.partition(userIds, 20000);
|
List<List<String>> singleQueryDataUserId = ListUtils.partition(userIds, 20000);
|
||||||
for (int k = 0; k < singleQueryDataUserId.size(); k++) {
|
for (int k = 0; k < singleQueryDataUserId.size(); k++) {
|
||||||
//将发电用户编号按100尺寸分片
|
dataProcessing.asyncInfluxDb(tokenComponent,date, restTemplateUtil, typeList, jsonObject, jsonObjectSub, headers, singleQueryDataUserId, k);
|
||||||
List<List<String>> partitionList = ListUtils.partition(singleQueryDataUserId.get(k), 100);
|
|
||||||
log.error("总计分了{}片", partitionList.size());
|
|
||||||
int count = 0;
|
|
||||||
tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate();
|
|
||||||
headers.put("x-token", tokenWithRestTemplate.getAccess_token());
|
|
||||||
//先获取数据
|
|
||||||
List<ResponseEntity<String>> responseEntities = new ArrayList<>(2000);
|
|
||||||
int kk = k + 1;
|
|
||||||
for (List<String> generationUserIDList : partitionList) {
|
|
||||||
count++;
|
|
||||||
log.error("查询第{}大片,{}小片数据", kk, count);
|
|
||||||
//按批次处理用户编号数据
|
|
||||||
jsonObjectSub.set("consNos", generationUserIDList);
|
|
||||||
JSONArray jsonArray = JSONUtil.createArray();
|
|
||||||
jsonArray.add(jsonObjectSub);
|
|
||||||
jsonObject.set("filters", jsonArray);
|
|
||||||
try {
|
|
||||||
responseEntities.add(restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, String.class));
|
|
||||||
} catch (Exception exception) {
|
|
||||||
log.error("远程调用接口异常,异常为:" + exception);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//开始解析数据
|
|
||||||
Set<String> userIdConcatMeasType = new HashSet<>();
|
|
||||||
//将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType
|
|
||||||
for (String measType : typeList) {
|
|
||||||
userIdConcatMeasType.addAll(singleQueryDataUserId.get(k).stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toSet()));
|
|
||||||
}
|
|
||||||
List</*各值以逗号分隔*/String> influxData;
|
|
||||||
Map</*表名*/String, List</*各值以逗号分隔*/String>> typeData = new HashMap<>();
|
|
||||||
StringBuilder tempInfluxData;
|
|
||||||
ResponseEntity<String> response;
|
|
||||||
JSONArray statisticsDataList;
|
|
||||||
JSONObject result;
|
|
||||||
JSONObject statisticsData;
|
|
||||||
JSONObject body;
|
|
||||||
JSONArray records;
|
|
||||||
String dataIdentify;
|
|
||||||
JSONObject commonTelemetry;
|
|
||||||
MeasTypeEnum measTypeEnumByMeasType;
|
|
||||||
for (int i = 0; i < partitionList.size(); i++) {
|
|
||||||
log.error("解析第{}片数据", i);
|
|
||||||
response = responseEntities.get(i);
|
|
||||||
body = JSONUtil.parseObj(response.getBody());
|
|
||||||
if (response.getStatusCodeValue() == 200 && body.get("status", String.class).equalsIgnoreCase("000000")) {
|
|
||||||
result = JSONUtil.parseObj(body.get("result", String.class));
|
|
||||||
records = JSONUtil.parseArray(result.get("records", String.class));
|
|
||||||
log.error("查询遥测数据结束,返回数据量:{}", records.size());
|
|
||||||
if (CollectionUtil.isEmpty(records)) {
|
|
||||||
//日志输出:
|
|
||||||
log.error("查询时间:{},无遥测数据;", date);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
//处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理
|
|
||||||
for (Object obj : records) { // 最多循环100*16次
|
|
||||||
commonTelemetry = JSONUtil.parseObj(obj);
|
|
||||||
dataIdentify = commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(commonTelemetry.get("measTypeCode", String.class));
|
|
||||||
if (userIdConcatMeasType.contains(dataIdentify)) {
|
|
||||||
//首个包含该标识的数据进行处理
|
|
||||||
measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.get("measTypeCode", String.class));
|
|
||||||
//统计数据,经过测试,接口响应json可能不包含该属性
|
|
||||||
statisticsDataList = commonTelemetry.get("telemetryValue", JSONArray.class);
|
|
||||||
if (CollectionUtil.isEmpty(statisticsDataList)) {
|
|
||||||
//添加进有指标但无遥测数据集合
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
influxData = new ArrayList<>();
|
|
||||||
for (Object subObj : statisticsDataList) { // 匹配上进入,循环96次
|
|
||||||
statisticsData = JSONUtil.parseObj(subObj);
|
|
||||||
tempInfluxData = new StringBuilder();
|
|
||||||
tempInfluxData.append(measTypeEnumByMeasType.getPhaseType())
|
|
||||||
.append(StrPool.COMMA)
|
|
||||||
.append(commonTelemetry.get("consNo", String.class))
|
|
||||||
.append(StrPool.COMMA)
|
|
||||||
.append(statisticsData.get("dataTime", String.class))
|
|
||||||
.append(StrPool.COMMA)
|
|
||||||
.append(measTypeEnumByMeasType.getFieldName())
|
|
||||||
.append(StrPool.COMMA)
|
|
||||||
.append(StrUtil.isBlank(statisticsData.get("measValue", String.class)) ? "0" : statisticsData.get("measValue", String.class));
|
|
||||||
influxData.add(tempInfluxData.toString());
|
|
||||||
}
|
|
||||||
//userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
|
|
||||||
typeData.put(commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData);
|
|
||||||
//处理完,删除该条记录,减少集合尺寸,提高效率
|
|
||||||
userIdConcatMeasType.remove(dataIdentify);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//没有匹配上的就是该用户没有数据
|
|
||||||
log.error("剩余有{}条标识", userIdConcatMeasType.size());
|
|
||||||
} else {
|
|
||||||
log.error("查询遥测数据失败!第{}片,结果为:{}", count, response);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//最后输出没有数据的用户编号
|
|
||||||
/**
|
|
||||||
* 输出到2个文件,lackData.txt、 excalationData.txt
|
|
||||||
* 注:用户号去除160前缀
|
|
||||||
* 1、所有指标均没有有数据的用户编号
|
|
||||||
* 2、部分指标没有数据的用户编号,并表明是哪些指标
|
|
||||||
*/
|
|
||||||
if (CollectionUtil.isNotEmpty(userIdConcatMeasType)) {
|
|
||||||
Map<String, List<String>> finalMap = userIdConcatMeasType.stream().collect(Collectors.groupingBy(str -> {
|
|
||||||
String key = str.substring(3);
|
|
||||||
key = key.substring(0, key.indexOf(StrPool.AT));
|
|
||||||
return key;
|
|
||||||
}));
|
|
||||||
//全部缺失数据的用户
|
|
||||||
List<String> lackData = new ArrayList<>();
|
|
||||||
//部分缺失的用户及指标
|
|
||||||
List<String> excalationData = new ArrayList<>();
|
|
||||||
Set<String> keyedSet = finalMap.keySet();
|
|
||||||
for (String key : keyedSet) {
|
|
||||||
List<String> data = finalMap.get(key);
|
|
||||||
if (data.size() == typeList.size()) {
|
|
||||||
lackData.add(key);
|
|
||||||
} else {
|
|
||||||
data = data.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)).collect(Collectors.toList());
|
|
||||||
key = key.concat(StrPool.COMMA).concat(StringUtils.join(data, StrPool.AT));
|
|
||||||
excalationData.add(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
FileWriter lackDataWriter = FileWriter.create(new File("/usr/local/syncData/lackData" + date + k + ".txt"));
|
|
||||||
lackDataWriter.writeLines(lackData);
|
|
||||||
FileWriter excalationDataWriter = FileWriter.create(new File("/usr/local/syncData/excalationData" + date + k + ".txt"));
|
|
||||||
excalationDataWriter.writeLines(excalationData);
|
|
||||||
}
|
|
||||||
log.error("用户有指标没有数据的长度为:{}", userIdConcatMeasType.size());
|
|
||||||
//最后批量入库
|
|
||||||
batchInsertData(typeData);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 批量入库influxDB
|
|
||||||
*
|
|
||||||
* @param typeData 远程根据用户编号获取的数据 Map</表名/String, List<Map</属性名/String,/数值/String>>> typeData = new HashMap<>();
|
|
||||||
*/
|
|
||||||
private void batchInsertData(Map<String, List<String>> typeData) {
|
|
||||||
log.error("总计有{}条记录入库,以20000作为基数分片插入influxdb", typeData.size());
|
|
||||||
List<String> sqlList = new ArrayList<>();
|
|
||||||
Set<String> tableNames = typeData.keySet();
|
|
||||||
String[] datas;
|
|
||||||
Map<String, String> tags;
|
|
||||||
Map<String, Object> fields;
|
|
||||||
Point point;
|
|
||||||
BatchPoints batchPoints;
|
|
||||||
for (String tableName : tableNames) {
|
|
||||||
List<String> data = typeData.get(tableName);
|
|
||||||
tableName = tableName.substring(tableName.lastIndexOf(StrPool.AT) + 1);
|
|
||||||
for (String datum : data) {
|
|
||||||
datas = datum.split(StrPool.COMMA);
|
|
||||||
//tag数据
|
|
||||||
tags = new HashMap<>();
|
|
||||||
tags.put("phasic_type", datas[0]);
|
|
||||||
tags.put("line_id", datas[1]);
|
|
||||||
tags.put("quality_flag", "0");
|
|
||||||
tags.put("value_type", "AVG");
|
|
||||||
String time = datas[2];
|
|
||||||
//tag数据删完后,剩余均是filed数据,因filed属性名不固定,无法指定获取,直接循环
|
|
||||||
fields = new HashMap<>();
|
|
||||||
fields.put(datas[3], datas[4]);
|
|
||||||
point = influxDbUtils.pointBuilder(tableName, DateUtil.parse(time, DatePattern.NORM_DATETIME_FORMATTER).getTime(), TimeUnit.MILLISECONDS, tags, fields);
|
|
||||||
batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
|
|
||||||
batchPoints.point(point);
|
|
||||||
sqlList.add(batchPoints.lineProtocol());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
List<List<String>> subSqlList = ListUtils.partition(sqlList, 20000);
|
|
||||||
int count = 1;
|
|
||||||
for (List<String> sql : subSqlList) {
|
|
||||||
try {
|
|
||||||
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "autogen", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sql);
|
|
||||||
} catch (Exception exception) {
|
|
||||||
log.error("数据批量入库异常,异常为:{}",exception.toString());
|
|
||||||
exception.printStackTrace();
|
|
||||||
}
|
|
||||||
log.error("已经入库{}条记录!", count * 20000);
|
|
||||||
count++;
|
|
||||||
}
|
|
||||||
log.error("当前批次所有数据,{}条均已入库!", sqlList.size());
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService {
|
|||||||
excel.getLineID()
|
excel.getLineID()
|
||||||
)
|
)
|
||||||
) {
|
) {
|
||||||
excel.setErrorMessage("线路/台区编号不能为空");
|
excel.setErrorMessage("台区编号/PMS系统线路编号不能为空");
|
||||||
errorInfo.add(excel);
|
errorInfo.add(excel);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -82,7 +82,7 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService {
|
|||||||
String replace = subString(excel.getCounty());
|
String replace = subString(excel.getCounty());
|
||||||
PmsStatationStat sub = getSub(excel.getPowerSupply() + "_" + replace, oldSubMap);
|
PmsStatationStat sub = getSub(excel.getPowerSupply() + "_" + replace, oldSubMap);
|
||||||
if (ObjectUtil.isNull(sub)) {
|
if (ObjectUtil.isNull(sub)) {
|
||||||
excel.setErrorMessage("部门信息不存在");
|
excel.setErrorMessage("区县信息不存在");
|
||||||
errorInfo.add(excel);
|
errorInfo.add(excel);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -129,10 +129,10 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService {
|
|||||||
info.add(user);
|
info.add(user);
|
||||||
}
|
}
|
||||||
if (CollUtil.isNotEmpty(info)) {
|
if (CollUtil.isNotEmpty(info)) {
|
||||||
LambdaQueryWrapper<PmsPowerGenerationUser> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
// LambdaQueryWrapper<PmsPowerGenerationUser> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||||
lambdaQueryWrapper.eq(PmsPowerGenerationUser::getInputStatus, 0);
|
// lambdaQueryWrapper.eq(PmsPowerGenerationUser::getInputStatus, 0);
|
||||||
iPmsPowerGenerationUserService.remove(lambdaQueryWrapper);
|
// iPmsPowerGenerationUserService.remove(lambdaQueryWrapper);
|
||||||
iPmsPowerGenerationUserService.saveOrUpdateBatch(info, 1000);
|
// iPmsPowerGenerationUserService.saveOrUpdateBatch(info, 1000);
|
||||||
}
|
}
|
||||||
if (CollUtil.isNotEmpty(errorInfo)) {
|
if (CollUtil.isNotEmpty(errorInfo)) {
|
||||||
exportExcel(DateUtil.now() + "_10kV错误信息.xlsx", errorInfo,DisPhotovoltaic10Excel.class, response);
|
exportExcel(DateUtil.now() + "_10kV错误信息.xlsx", errorInfo,DisPhotovoltaic10Excel.class, response);
|
||||||
@@ -173,7 +173,7 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService {
|
|||||||
excel.getConnectionDate()
|
excel.getConnectionDate()
|
||||||
)
|
)
|
||||||
) {
|
) {
|
||||||
excel.setErrorMessage("并网时间/线路/台区编号不能为空");
|
excel.setErrorMessage("并网时间/所属线路PMS编号/台区编号不能为空");
|
||||||
errorInfo.add(excel);
|
errorInfo.add(excel);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -182,7 +182,7 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService {
|
|||||||
|
|
||||||
PmsStatationStat sub = getSub(excel.getPowerSupply() + "_" + replace, oldSubMap);
|
PmsStatationStat sub = getSub(excel.getPowerSupply() + "_" + replace, oldSubMap);
|
||||||
if (ObjectUtil.isNull(sub)) {
|
if (ObjectUtil.isNull(sub)) {
|
||||||
excel.setErrorMessage("部门信息不存在");
|
excel.setErrorMessage("区县信息不存在");
|
||||||
errorInfo.add(excel);
|
errorInfo.add(excel);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -229,10 +229,10 @@ public class DisPhotovoltaicServiceImpl implements DisPhotovoltaicService {
|
|||||||
info.add(user);
|
info.add(user);
|
||||||
}
|
}
|
||||||
if (CollUtil.isNotEmpty(info)) {
|
if (CollUtil.isNotEmpty(info)) {
|
||||||
LambdaQueryWrapper<PmsPowerGenerationUser> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
// LambdaQueryWrapper<PmsPowerGenerationUser> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||||
lambdaQueryWrapper.eq(PmsPowerGenerationUser::getInputStatus, 1);
|
// lambdaQueryWrapper.eq(PmsPowerGenerationUser::getInputStatus, 1);
|
||||||
iPmsPowerGenerationUserService.remove(lambdaQueryWrapper);
|
// iPmsPowerGenerationUserService.remove(lambdaQueryWrapper);
|
||||||
iPmsPowerGenerationUserService.saveBatch(info, 1000);
|
// iPmsPowerGenerationUserService.saveBatch(info, 1000);
|
||||||
}
|
}
|
||||||
if (CollUtil.isNotEmpty(errorInfo)) {
|
if (CollUtil.isNotEmpty(errorInfo)) {
|
||||||
exportExcel(DateUtil.now() + "_380kV错误信息.xlsx", errorInfo,DisPhotovoltaic380Excel.class, response);
|
exportExcel(DateUtil.now() + "_380kV错误信息.xlsx", errorInfo,DisPhotovoltaic380Excel.class, response);
|
||||||
|
|||||||
239
src/main/java/com/njcn/jbsyncdata/util/DataProcessing.java
Normal file
239
src/main/java/com/njcn/jbsyncdata/util/DataProcessing.java
Normal file
@@ -0,0 +1,239 @@
|
|||||||
|
package com.njcn.jbsyncdata.util;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
|
import cn.hutool.core.date.DatePattern;
|
||||||
|
import cn.hutool.core.date.DateUtil;
|
||||||
|
import cn.hutool.core.io.file.FileWriter;
|
||||||
|
import cn.hutool.core.text.StrPool;
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import cn.hutool.json.JSONArray;
|
||||||
|
import cn.hutool.json.JSONObject;
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import com.njcn.influx.utils.InfluxDbUtils;
|
||||||
|
import com.njcn.jbsyncdata.component.TokenComponent;
|
||||||
|
import com.njcn.jbsyncdata.enums.MeasTypeEnum;
|
||||||
|
import com.njcn.jbsyncdata.pojo.result.TokenResult;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.collections4.ListUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.influxdb.InfluxDB;
|
||||||
|
import org.influxdb.dto.BatchPoints;
|
||||||
|
import org.influxdb.dto.Point;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author wr
|
||||||
|
* @description
|
||||||
|
* @date 2023/10/20 14:14
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class DataProcessing {
|
||||||
|
|
||||||
|
private final InfluxDbUtils influxDbUtils;
|
||||||
|
|
||||||
|
@Async("asyncExecutor")
|
||||||
|
public void asyncInfluxDb(
|
||||||
|
TokenComponent tokenComponent,
|
||||||
|
String date,
|
||||||
|
RestTemplateUtil restTemplateUtil,
|
||||||
|
List<String> typeList,
|
||||||
|
JSONObject jsonObject,
|
||||||
|
JSONObject jsonObjectSub,
|
||||||
|
Map<String, String> headers,
|
||||||
|
List<List<String>> singleQueryDataUserId, int k
|
||||||
|
) {
|
||||||
|
TokenResult tokenWithRestTemplate;
|
||||||
|
//将发电用户编号按100尺寸分片
|
||||||
|
List<List<String>> partitionList = ListUtils.partition(singleQueryDataUserId.get(k), 100);
|
||||||
|
log.error("总计分了{}片", partitionList.size());
|
||||||
|
int count = 0;
|
||||||
|
tokenWithRestTemplate = tokenComponent.getTokenWithRestTemplate();
|
||||||
|
headers.put("x-token", tokenWithRestTemplate.getAccess_token());
|
||||||
|
//先获取数据
|
||||||
|
List<ResponseEntity<String>> responseEntities = new ArrayList<>(2000);
|
||||||
|
int kk = k + 1;
|
||||||
|
for (List<String> generationUserIDList : partitionList) {
|
||||||
|
count++;
|
||||||
|
log.error("查询第{}大片,{}小片数据", kk, count);
|
||||||
|
//按批次处理用户编号数据
|
||||||
|
jsonObjectSub.set("consNos", generationUserIDList);
|
||||||
|
JSONArray jsonArray = JSONUtil.createArray();
|
||||||
|
jsonArray.add(jsonObjectSub);
|
||||||
|
jsonObject.set("filters", jsonArray);
|
||||||
|
try {
|
||||||
|
responseEntities.add(restTemplateUtil.post(tokenComponent.getUrl().concat("/realMeasCenter/telemetry/commonQuery"), headers, jsonObject, String.class));
|
||||||
|
} catch (Exception exception) {
|
||||||
|
log.error("远程调用接口异常,异常为:" + exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//开始解析数据
|
||||||
|
Set<String> userIdConcatMeasType = new HashSet<>();
|
||||||
|
//将指标+客户编号组合起来匹配返回数据的第一条记录:userId@measType
|
||||||
|
for (String measType : typeList) {
|
||||||
|
userIdConcatMeasType.addAll(singleQueryDataUserId.get(k).stream().map(t -> t.concat(StrPool.AT).concat(measType)).collect(Collectors.toSet()));
|
||||||
|
}
|
||||||
|
List</*各值以逗号分隔*/String> influxData;
|
||||||
|
Map</*表名*/String, List</*各值以逗号分隔*/String>> typeData = new HashMap<>();
|
||||||
|
StringBuilder tempInfluxData;
|
||||||
|
ResponseEntity<String> response;
|
||||||
|
JSONArray statisticsDataList;
|
||||||
|
JSONObject result;
|
||||||
|
JSONObject statisticsData;
|
||||||
|
JSONObject body;
|
||||||
|
JSONArray records;
|
||||||
|
String dataIdentify;
|
||||||
|
JSONObject commonTelemetry;
|
||||||
|
MeasTypeEnum measTypeEnumByMeasType;
|
||||||
|
for (int i = 0; i < partitionList.size(); i++) {
|
||||||
|
log.error("解析第{}片数据", i);
|
||||||
|
response = responseEntities.get(i);
|
||||||
|
body = JSONUtil.parseObj(response.getBody());
|
||||||
|
if (response.getStatusCodeValue() == 200 && body.get("status", String.class).equalsIgnoreCase("000000")) {
|
||||||
|
result = JSONUtil.parseObj(body.get("result", String.class));
|
||||||
|
records = JSONUtil.parseArray(result.get("records", String.class));
|
||||||
|
log.error("查询遥测数据结束,返回数据量:{}", records.size());
|
||||||
|
if (CollectionUtil.isEmpty(records)) {
|
||||||
|
//日志输出:
|
||||||
|
log.error("查询时间:{},无遥测数据;", date);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
//处理各个record的数据,因用户下可能有多个测量点,按指标循环,默认采用第一个匹配上的做数据处理
|
||||||
|
for (Object obj : records) { // 最多循环100*16次
|
||||||
|
commonTelemetry = JSONUtil.parseObj(obj);
|
||||||
|
dataIdentify = commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(commonTelemetry.get("measTypeCode", String.class));
|
||||||
|
if (userIdConcatMeasType.contains(dataIdentify)) {
|
||||||
|
//首个包含该标识的数据进行处理
|
||||||
|
measTypeEnumByMeasType = MeasTypeEnum.getMeasTypeEnumByMeasType(commonTelemetry.get("measTypeCode", String.class));
|
||||||
|
//统计数据,经过测试,接口响应json可能不包含该属性
|
||||||
|
statisticsDataList = commonTelemetry.get("telemetryValue", JSONArray.class);
|
||||||
|
if (CollectionUtil.isEmpty(statisticsDataList)) {
|
||||||
|
//添加进有指标但无遥测数据集合
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
influxData = new ArrayList<>();
|
||||||
|
for (Object subObj : statisticsDataList) { // 匹配上进入,循环96次
|
||||||
|
statisticsData = JSONUtil.parseObj(subObj);
|
||||||
|
tempInfluxData = new StringBuilder();
|
||||||
|
tempInfluxData.append(measTypeEnumByMeasType.getPhaseType())
|
||||||
|
.append(StrPool.COMMA)
|
||||||
|
.append(commonTelemetry.get("consNo", String.class))
|
||||||
|
.append(StrPool.COMMA)
|
||||||
|
.append(statisticsData.get("dataTime", String.class))
|
||||||
|
.append(StrPool.COMMA)
|
||||||
|
.append(measTypeEnumByMeasType.getFieldName())
|
||||||
|
.append(StrPool.COMMA)
|
||||||
|
.append(StrUtil.isBlank(statisticsData.get("measValue", String.class)) ? "0" : statisticsData.get("measValue", String.class));
|
||||||
|
influxData.add(tempInfluxData.toString());
|
||||||
|
}
|
||||||
|
//userId@measType@tableName:存在多个指标存储表名一致,避免数据覆盖;
|
||||||
|
typeData.put(commonTelemetry.get("consNo", String.class).concat(StrPool.AT).concat(measTypeEnumByMeasType.getMeasType()).concat(StrPool.AT).concat(measTypeEnumByMeasType.getTableName()), influxData);
|
||||||
|
//处理完,删除该条记录,减少集合尺寸,提高效率
|
||||||
|
userIdConcatMeasType.remove(dataIdentify);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//没有匹配上的就是该用户没有数据
|
||||||
|
log.error("剩余有{}条标识", userIdConcatMeasType.size());
|
||||||
|
} else {
|
||||||
|
log.error("查询遥测数据失败!第{}片,结果为:{}", count, response);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//最后输出没有数据的用户编号
|
||||||
|
/**
|
||||||
|
* 输出到2个文件,lackData.txt、 excalationData.txt
|
||||||
|
* 注:用户号去除160前缀
|
||||||
|
* 1、所有指标均没有有数据的用户编号
|
||||||
|
* 2、部分指标没有数据的用户编号,并表明是哪些指标
|
||||||
|
*/
|
||||||
|
if (CollectionUtil.isNotEmpty(userIdConcatMeasType)) {
|
||||||
|
Map<String, List<String>> finalMap = userIdConcatMeasType.stream().collect(Collectors.groupingBy(str -> {
|
||||||
|
String key = str.substring(3);
|
||||||
|
key = key.substring(0, key.indexOf(StrPool.AT));
|
||||||
|
return key;
|
||||||
|
}));
|
||||||
|
//全部缺失数据的用户
|
||||||
|
List<String> lackData = new ArrayList<>();
|
||||||
|
//部分缺失的用户及指标
|
||||||
|
List<String> excalationData = new ArrayList<>();
|
||||||
|
Set<String> keyedSet = finalMap.keySet();
|
||||||
|
for (String key : keyedSet) {
|
||||||
|
List<String> data = finalMap.get(key);
|
||||||
|
if (data.size() == typeList.size()) {
|
||||||
|
lackData.add(key);
|
||||||
|
} else {
|
||||||
|
data = data.stream().map(t -> t.substring(t.indexOf(StrPool.AT) + 1)).collect(Collectors.toList());
|
||||||
|
key = key.concat(StrPool.COMMA).concat(StringUtils.join(data, StrPool.AT));
|
||||||
|
excalationData.add(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
FileWriter lackDataWriter = FileWriter.create(new File("/usr/local/syncData/lackData" + date + k + ".txt"));
|
||||||
|
lackDataWriter.writeLines(lackData);
|
||||||
|
FileWriter excalationDataWriter = FileWriter.create(new File("/usr/local/syncData/excalationData" + date + k + ".txt"));
|
||||||
|
excalationDataWriter.writeLines(excalationData);
|
||||||
|
}
|
||||||
|
log.error("用户有指标没有数据的长度为:{}", userIdConcatMeasType.size());
|
||||||
|
//最后批量入库
|
||||||
|
batchInsertData(typeData);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 批量入库influxDB
|
||||||
|
*
|
||||||
|
* @param typeData 远程根据用户编号获取的数据 Map</表名/String, List<Map</属性名/String,/数值/String>>> typeData = new HashMap<>();
|
||||||
|
*/
|
||||||
|
private void batchInsertData(Map<String, List<String>> typeData) {
|
||||||
|
log.error("总计有{}条记录入库,以20000作为基数分片插入influxdb", typeData.size());
|
||||||
|
List<String> sqlList = new ArrayList<>();
|
||||||
|
Set<String> tableNames = typeData.keySet();
|
||||||
|
String[] datas;
|
||||||
|
Map<String, String> tags;
|
||||||
|
Map<String, Object> fields;
|
||||||
|
Point point;
|
||||||
|
BatchPoints batchPoints;
|
||||||
|
for (String tableName : tableNames) {
|
||||||
|
List<String> data = typeData.get(tableName);
|
||||||
|
tableName = tableName.substring(tableName.lastIndexOf(StrPool.AT) + 1);
|
||||||
|
for (String datum : data) {
|
||||||
|
datas = datum.split(StrPool.COMMA);
|
||||||
|
//tag数据
|
||||||
|
tags = new HashMap<>();
|
||||||
|
tags.put("phasic_type", datas[0]);
|
||||||
|
tags.put("line_id", datas[1]);
|
||||||
|
tags.put("quality_flag", "0");
|
||||||
|
tags.put("value_type", "AVG");
|
||||||
|
String time = datas[2];
|
||||||
|
//tag数据删完后,剩余均是filed数据,因filed属性名不固定,无法指定获取,直接循环
|
||||||
|
fields = new HashMap<>();
|
||||||
|
fields.put(datas[3], datas[4]);
|
||||||
|
point = influxDbUtils.pointBuilder(tableName, DateUtil.parse(time, DatePattern.NORM_DATETIME_FORMATTER).getTime(), TimeUnit.MILLISECONDS, tags, fields);
|
||||||
|
batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
|
||||||
|
batchPoints.point(point);
|
||||||
|
sqlList.add(batchPoints.lineProtocol());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
List<List<String>> subSqlList = ListUtils.partition(sqlList, 20000);
|
||||||
|
int count = 1;
|
||||||
|
for (List<String> sql : subSqlList) {
|
||||||
|
try {
|
||||||
|
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "autogen", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, sql);
|
||||||
|
} catch (Exception exception) {
|
||||||
|
log.error("数据批量入库异常,异常为:{}",exception.toString());
|
||||||
|
exception.printStackTrace();
|
||||||
|
}
|
||||||
|
log.error("已经入库{}条记录!", count * 20000);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
log.error("当前批次所有数据,{}条均已入库!", sqlList.size());
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
server:
|
server:
|
||||||
port: 10288
|
port: 10299
|
||||||
tomcat:
|
tomcat:
|
||||||
max-swallow-size: 100MB #重要的一行,修改tomcat的吞吐量
|
max-swallow-size: 100MB #重要的一行,修改tomcat的吞吐量
|
||||||
spring:
|
spring:
|
||||||
@@ -68,4 +68,14 @@ jibei:
|
|||||||
client_id: bad079495dc111ee987b0a580a080620
|
client_id: bad079495dc111ee987b0a580a080620
|
||||||
client_secret: OxXIgFs9HHI05L3cOg8ogYoFRFs8sKlTJhVocyOprxoWSadcX0we2wffjyTUYGsK
|
client_secret: OxXIgFs9HHI05L3cOg8ogYoFRFs8sKlTJhVocyOprxoWSadcX0we2wffjyTUYGsK
|
||||||
grant_type: credentials
|
grant_type: credentials
|
||||||
url: http://25.42.182.119:32001
|
url: http://25.42.182.119:32001
|
||||||
|
|
||||||
|
#线程池配置信息
|
||||||
|
microservice:
|
||||||
|
ename: async
|
||||||
|
|
||||||
|
threadPool:
|
||||||
|
corePoolSize: 12
|
||||||
|
maxPoolSize: 24
|
||||||
|
queueCapacity: 500
|
||||||
|
keepAliveSeconds: 60
|
||||||
Reference in New Issue
Block a user