代码调整

This commit is contained in:
2024-01-17 09:54:21 +08:00
parent 661f8d2e06
commit 47d31095b9
10 changed files with 96 additions and 76 deletions

View File

@@ -4,7 +4,9 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
import com.github.jeffreyning.mybatisplus.anno.MppMultiId; import com.github.jeffreyning.mybatisplus.anno.MppMultiId;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@@ -20,6 +22,8 @@ import java.util.Objects;
*/ */
@TableName(value = "JOB_DETAIL_INFLUXDB") @TableName(value = "JOB_DETAIL_INFLUXDB")
@Data @Data
@NoArgsConstructor
@AllArgsConstructor
public class JobDetailInfluxDB { public class JobDetailInfluxDB {
/** /**
* 指标表名 * 指标表名
@@ -73,4 +77,17 @@ public class JobDetailInfluxDB {
} }
public JobDetailInfluxDB(String tableName, LocalDate excuteDate, Integer state, Integer rowCount, LocalDateTime updateTime) {
this.tableName = tableName;
this.excuteDate = excuteDate;
this.state = state;
this.rowCount = rowCount;
this.updateTime = updateTime;
}
public JobDetailInfluxDB(String tableName, LocalDate excuteDate, Integer rowCount) {
this.tableName = tableName;
this.excuteDate = excuteDate;
this.rowCount = rowCount;
}
} }

View File

@@ -5,6 +5,8 @@ import com.github.jeffreyning.mybatisplus.service.IMppService;
import com.njcn.influx.bo.po.JobDetailInfluxDB; import com.njcn.influx.bo.po.JobDetailInfluxDB;
import com.njcn.oracle.bo.param.JobQueryParam; import com.njcn.oracle.bo.param.JobQueryParam;
import java.time.LocalDate;
/** /**
* Description: * Description:
* Date: 2024/1/8 12:27【需求编号】 * Date: 2024/1/8 12:27【需求编号】
@@ -14,9 +16,10 @@ import com.njcn.oracle.bo.param.JobQueryParam;
*/ */
public interface JobDetailInfluxDBService extends IMppService<JobDetailInfluxDB> { public interface JobDetailInfluxDBService extends IMppService<JobDetailInfluxDB> {
JobDetailInfluxDB select(JobDetailInfluxDB jobDetail); JobDetailInfluxDB select(String tableName, LocalDate date);
IPage<JobDetailInfluxDB> selectByParam(JobQueryParam jobQueryParam); IPage<JobDetailInfluxDB> selectByParam(JobQueryParam jobQueryParam);
boolean jobRemove(JobDetailInfluxDB jobDetail); boolean jobRemove(JobDetailInfluxDB jobDetail);
} }

View File

@@ -30,10 +30,10 @@ public class JobDetailInfluxDBServiceImpl extends MppServiceImpl<JobDetailInflux
@Override @Override
public JobDetailInfluxDB select(JobDetailInfluxDB jobDetail) { public JobDetailInfluxDB select(String tableName, LocalDate date) {
return this.lambdaQuery() return this.lambdaQuery()
.eq(JobDetailInfluxDB::getTableName, jobDetail.getTableName()) .eq(JobDetailInfluxDB::getTableName, tableName)
.eq(JobDetailInfluxDB::getExcuteDate, jobDetail.getExcuteDate()) .eq(JobDetailInfluxDB::getExcuteDate, date)
.one(); .one();
} }

View File

@@ -70,7 +70,13 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
List<LocalDate> dateList = LocalDateUtil.getDateList(dataAsynParam.getStartTime(), dataAsynParam.getEndTime()); List<LocalDate> dateList = LocalDateUtil.getDateList(dataAsynParam.getStartTime(), dataAsynParam.getEndTime());
dateList.forEach(date -> { for (LocalDate date : dateList) {
//日志记录
JobDetailInfluxDB jobDetailInfluxDB = jobDetailInfluxDBService.select(tableName, date);
if (Objects.nonNull(jobDetailInfluxDB) && (jobDetailInfluxDB.getState() == 1 || jobDetailInfluxDB.getState() == 0)) {
//如果该指标当前天已执行或正在执行,直接跳出循环
break;
}
//程序监听 //程序监听
StopWatch stopWatch = new StopWatch(); StopWatch stopWatch = new StopWatch();
stopWatch.start(); stopWatch.start();
@@ -93,11 +99,6 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
//日志记录
JobDetailInfluxDB jobDetailInfluxDB = new JobDetailInfluxDB();
jobDetailInfluxDB.setTableName(tableName);
jobDetailInfluxDB.setExcuteDate(date);
jobDetailInfluxDB = jobDetailInfluxDBService.select(jobDetailInfluxDB);
try { try {
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
//执行目标库的数据处理 //执行目标库的数据处理
@@ -110,7 +111,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
Method method = null; Method method;
try { try {
method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2); method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2);
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
@@ -133,33 +134,19 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
size = list1.size(); size = list1.size();
//最后一片时修改状态 //最后一片时修改状态
} }
//手动执行GC
System.gc();
if (Objects.nonNull(jobDetailInfluxDB)) { if (Objects.nonNull(jobDetailInfluxDB)) {
if (jobDetailInfluxDB.getState() == 2 && i == 0 && size != 0) { if (i == 0) {
//第一片执行时返现是失败的,则再次执行
jobDetailInfluxDB.setState(0); jobDetailInfluxDB.setState(0);
jobDetailInfluxDB.setRowCount(size); jobDetailInfluxDB.setRowCount(0);
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB);
} else if (jobDetailInfluxDB.getState() == 0 && i > 0 && jobDetailInfluxDB.getRowCount() + size != 0) {
// 处理中,后续时间片的处理,累加记录数
jobDetailInfluxDB.setState(0);
jobDetailInfluxDB.setRowCount(jobDetailInfluxDB.getRowCount() + size);
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB);
} else {
System.gc();
break;
} }
jobDetailInfluxDB.setRowCount(jobDetailInfluxDB.getRowCount() + size);
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB);
} else { } else {
if (size > 0) { jobDetailInfluxDB = new JobDetailInfluxDB(tableName, date, 0, size, LocalDateTime.now());
jobDetailInfluxDB = new JobDetailInfluxDB(); jobDetailInfluxDBService.save(jobDetailInfluxDB);
jobDetailInfluxDB.setTableName(tableName);
jobDetailInfluxDB.setExcuteDate(date);
jobDetailInfluxDB.setState(0);
jobDetailInfluxDB.setRowCount(size);
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
jobDetailInfluxDBService.save(jobDetailInfluxDB);
}
} }
if (i + 1 == slice && Objects.nonNull(jobDetailInfluxDB)) { if (i + 1 == slice && Objects.nonNull(jobDetailInfluxDB)) {
stopWatch.stop(); stopWatch.stop();
@@ -170,6 +157,9 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
} }
} catch (Exception exception) { } catch (Exception exception) {
exception.printStackTrace(); exception.printStackTrace();
if (Objects.isNull(jobDetailInfluxDB)) {
jobDetailInfluxDB = new JobDetailInfluxDB(tableName, date, size);
}
jobDetailInfluxDB.setState(2); jobDetailInfluxDB.setState(2);
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now()); jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB); jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB);
@@ -179,7 +169,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
} }
}); }
}); });
} }
} }

View File

@@ -13,7 +13,7 @@ spring:
database: pqsbase database: pqsbase
mapper-location: com.njcn.influx.imapper mapper-location: com.njcn.influx.imapper
application: application:
name: oracle-data name: oracle-influx
autoconfigure: autoconfigure:
exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
datasource: datasource:

View File

@@ -6,8 +6,11 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
import com.github.jeffreyning.mybatisplus.anno.MppMultiId; import com.github.jeffreyning.mybatisplus.anno.MppMultiId;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import javax.xml.ws.soap.Addressing;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@@ -23,6 +26,8 @@ import java.util.Objects;
*/ */
@TableName(value = "JOB_DETAIL") @TableName(value = "JOB_DETAIL")
@Data @Data
@NoArgsConstructor
@AllArgsConstructor
public class JobDetail { public class JobDetail {
/** /**
* 指标表名 * 指标表名
@@ -75,5 +80,17 @@ public class JobDetail {
return Objects.equals(tableName, jobDetail.tableName) && Objects.equals(excuteDate, jobDetail.excuteDate); return Objects.equals(tableName, jobDetail.tableName) && Objects.equals(excuteDate, jobDetail.excuteDate);
} }
public JobDetail(String tableName, LocalDate excuteDate, Integer state, Integer rowCount, LocalDateTime updateTime) {
this.tableName = tableName;
this.excuteDate = excuteDate;
this.state = state;
this.rowCount = rowCount;
this.updateTime = updateTime;
}
public JobDetail(String tableName, LocalDate excuteDate, Integer rowCount) {
this.tableName = tableName;
this.excuteDate = excuteDate;
this.rowCount = rowCount;
}
} }

View File

@@ -6,6 +6,7 @@ import com.njcn.oracle.bo.param.JobQueryParam;
import com.njcn.oracle.bo.po.JobDetail; import com.njcn.oracle.bo.po.JobDetail;
import com.baomidou.mybatisplus.extension.service.IService; import com.baomidou.mybatisplus.extension.service.IService;
import java.time.LocalDate;
import java.util.List; import java.util.List;
/** /**
@@ -17,7 +18,7 @@ import java.util.List;
*/ */
public interface JobDetailService extends IMppService<JobDetail> { public interface JobDetailService extends IMppService<JobDetail> {
JobDetail select(JobDetail jobDetail); JobDetail select(String tableName, LocalDate date);
IPage<JobDetail> selectByParam(JobQueryParam jobQueryParam); IPage<JobDetail> selectByParam(JobQueryParam jobQueryParam);

View File

@@ -36,10 +36,10 @@ public class JobDetailServiceImpl extends MppServiceImpl<JobDetailMapper, JobDet
@Override @Override
public JobDetail select(JobDetail jobDetail) { public JobDetail select(String tableName, LocalDate date) {
return this.lambdaQuery() return this.lambdaQuery()
.eq(JobDetail::getTableName, jobDetail.getTableName()) .eq(JobDetail::getTableName, tableName)
.eq(JobDetail::getExcuteDate, jobDetail.getExcuteDate()) .eq(JobDetail::getExcuteDate, date)
.one(); .one();
} }

View File

@@ -61,7 +61,13 @@ public class OracleServiceImpl implements IOracleService {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
List<LocalDate> dateList = LocalDateUtil.getDateList(dataAsynParam.getStartTime(), dataAsynParam.getEndTime()); List<LocalDate> dateList = LocalDateUtil.getDateList(dataAsynParam.getStartTime(), dataAsynParam.getEndTime());
dateList.forEach(date -> { for (LocalDate date : dateList) {
//日志记录
JobDetail jobDetail = jobDetailService.select(tableName, date);
if (Objects.nonNull(jobDetail) && (jobDetail.getState() == 1 || jobDetail.getState() == 0)) {
//如果该指标当前天已执行或正在执行,直接跳出循环
break;
}
//程序监听 //程序监听
StopWatch stopWatch = new StopWatch(); StopWatch stopWatch = new StopWatch();
stopWatch.start(); stopWatch.start();
@@ -84,39 +90,6 @@ public class OracleServiceImpl implements IOracleService {
System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
//日志记录
JobDetail jobDetail = new JobDetail();
jobDetail.setTableName(tableName);
jobDetail.setExcuteDate(date);
jobDetail = jobDetailService.select(jobDetail);
if (Objects.nonNull(jobDetail)) {
if (jobDetail.getState() == 2 && i == 0 && size != 0) {
//第一片执行时返现是失败的,则再次执行
jobDetail.setState(0);
jobDetail.setRowCount(size);
jobDetail.setUpdateTime(LocalDateTime.now());
jobDetailService.updateByMultiId(jobDetail);
} else if (jobDetail.getState() == 0 && i > 0 && jobDetail.getRowCount() + size != 0) {
// 处理中,后续时间片的处理,累加记录数
jobDetail.setState(0);
jobDetail.setRowCount(jobDetail.getRowCount() + size);
jobDetail.setUpdateTime(LocalDateTime.now());
jobDetailService.updateByMultiId(jobDetail);
} else {
System.gc();
break;
}
} else {
if (size > 0) {
jobDetail = new JobDetail();
jobDetail.setTableName(tableName);
jobDetail.setExcuteDate(date);
jobDetail.setState(0);
jobDetail.setRowCount(size);
jobDetail.setUpdateTime(LocalDateTime.now());
jobDetailService.save(jobDetail);
}
}
try { try {
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
//执行目标库的数据处理 //执行目标库的数据处理
@@ -124,6 +97,20 @@ public class OracleServiceImpl implements IOracleService {
executor.insertBatchByDB(weakReferenceData.get()); executor.insertBatchByDB(weakReferenceData.get());
//最后一片时修改状态 //最后一片时修改状态
} }
//手动执行GC
System.gc();
if (Objects.nonNull(jobDetail)) {
if (i == 0) {
jobDetail.setState(0);
jobDetail.setRowCount(0);
}
jobDetail.setRowCount(jobDetail.getRowCount() + size);
jobDetail.setUpdateTime(LocalDateTime.now());
jobDetailService.updateByMultiId(jobDetail);
} else {
jobDetail = new JobDetail(tableName, date, 0, size, LocalDateTime.now());
jobDetailService.save(jobDetail);
}
if (i + 1 == slice && Objects.nonNull(jobDetail)) { if (i + 1 == slice && Objects.nonNull(jobDetail)) {
stopWatch.stop(); stopWatch.stop();
jobDetail.setState(1); jobDetail.setState(1);
@@ -133,6 +120,9 @@ public class OracleServiceImpl implements IOracleService {
} }
} catch (Exception exception) { } catch (Exception exception) {
exception.printStackTrace(); exception.printStackTrace();
if (Objects.isNull(jobDetail)) {
jobDetail = new JobDetail(tableName, date, size);
}
jobDetail.setState(2); jobDetail.setState(2);
jobDetail.setUpdateTime(LocalDateTime.now()); jobDetail.setUpdateTime(LocalDateTime.now());
jobDetailService.updateByMultiId(jobDetail); jobDetailService.updateByMultiId(jobDetail);
@@ -142,7 +132,9 @@ public class OracleServiceImpl implements IOracleService {
System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
} }
}); }
}); });
} }
} }

View File

@@ -2,7 +2,7 @@ server:
port: 8091 port: 8091
spring: spring:
application: application:
name: oracle-data name: oracle-oracle
autoconfigure: autoconfigure:
exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
datasource: datasource: