From 47d31095b90dbdf7c7e4148e5a84cfa895800a30 Mon Sep 17 00:00:00 2001 From: hongawen <83944980@qq.com> Date: Wed, 17 Jan 2024 09:54:21 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/influx/bo/po/JobDetailInfluxDB.java | 17 +++++ .../service/JobDetailInfluxDBService.java | 5 +- .../impl/JobDetailInfluxDBServiceImpl.java | 6 +- .../impl/OracleToInfluxDBServiceImpl.java | 52 +++++++--------- .../src/main/resources/application.yml | 2 +- .../java/com/njcn/oracle/bo/po/JobDetail.java | 17 +++++ .../njcn/oracle/service/JobDetailService.java | 3 +- .../service/impl/JobDetailServiceImpl.java | 6 +- .../service/impl/OracleServiceImpl.java | 62 ++++++++----------- .../src/main/resources/application.yml | 2 +- 10 files changed, 96 insertions(+), 76 deletions(-) diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/JobDetailInfluxDB.java b/influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/JobDetailInfluxDB.java index a35c7ec..2d62900 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/JobDetailInfluxDB.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/JobDetailInfluxDB.java @@ -4,7 +4,9 @@ import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonFormat; import com.github.jeffreyning.mybatisplus.anno.MppMultiId; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import java.time.LocalDate; import java.time.LocalDateTime; @@ -20,6 +22,8 @@ import java.util.Objects; */ @TableName(value = "JOB_DETAIL_INFLUXDB") @Data +@NoArgsConstructor +@AllArgsConstructor public class JobDetailInfluxDB { /** * 指标表名 @@ -73,4 +77,17 @@ public class JobDetailInfluxDB { } + public JobDetailInfluxDB(String tableName, LocalDate excuteDate, Integer state, Integer rowCount, LocalDateTime updateTime) { + this.tableName = tableName; + this.excuteDate = excuteDate; + this.state = state; + this.rowCount = rowCount; + this.updateTime = updateTime; + } + + public JobDetailInfluxDB(String tableName, LocalDate excuteDate, Integer rowCount) { + this.tableName = tableName; + this.excuteDate = excuteDate; + this.rowCount = rowCount; + } } \ No newline at end of file diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/JobDetailInfluxDBService.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/JobDetailInfluxDBService.java index 4710c8b..0dca10d 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/JobDetailInfluxDBService.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/JobDetailInfluxDBService.java @@ -5,6 +5,8 @@ import com.github.jeffreyning.mybatisplus.service.IMppService; import com.njcn.influx.bo.po.JobDetailInfluxDB; import com.njcn.oracle.bo.param.JobQueryParam; +import java.time.LocalDate; + /** * Description: * Date: 2024/1/8 12:27【需求编号】 @@ -14,9 +16,10 @@ import com.njcn.oracle.bo.param.JobQueryParam; */ public interface JobDetailInfluxDBService extends IMppService { - JobDetailInfluxDB select(JobDetailInfluxDB jobDetail); + JobDetailInfluxDB select(String tableName, LocalDate date); IPage selectByParam(JobQueryParam jobQueryParam); boolean jobRemove(JobDetailInfluxDB jobDetail); + } diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/JobDetailInfluxDBServiceImpl.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/JobDetailInfluxDBServiceImpl.java index 3724c8a..166baa1 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/JobDetailInfluxDBServiceImpl.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/JobDetailInfluxDBServiceImpl.java @@ -30,10 +30,10 @@ public class JobDetailInfluxDBServiceImpl extends MppServiceImpl dateList = LocalDateUtil.getDateList(dataAsynParam.getStartTime(), dataAsynParam.getEndTime()); - dateList.forEach(date -> { + for (LocalDate date : dateList) { + //日志记录 + JobDetailInfluxDB jobDetailInfluxDB = jobDetailInfluxDBService.select(tableName, date); + if (Objects.nonNull(jobDetailInfluxDB) && (jobDetailInfluxDB.getState() == 1 || jobDetailInfluxDB.getState() == 0)) { + //如果该指标当前天已执行或正在执行,直接跳出循环 + break; + } //程序监听 StopWatch stopWatch = new StopWatch(); stopWatch.start(); @@ -93,11 +99,6 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); - //日志记录 - JobDetailInfluxDB jobDetailInfluxDB = new JobDetailInfluxDB(); - jobDetailInfluxDB.setTableName(tableName); - jobDetailInfluxDB.setExcuteDate(date); - jobDetailInfluxDB = jobDetailInfluxDBService.select(jobDetailInfluxDB); try { if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { //执行目标库的数据处理 @@ -110,7 +111,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { } catch (ClassNotFoundException e) { throw new RuntimeException(e); } - Method method = null; + Method method; try { method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2); } catch (NoSuchMethodException e) { @@ -133,33 +134,19 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { size = list1.size(); //最后一片时修改状态 } + //手动执行GC + System.gc(); if (Objects.nonNull(jobDetailInfluxDB)) { - if (jobDetailInfluxDB.getState() == 2 && i == 0 && size != 0) { - //第一片执行时返现是失败的,则再次执行 + if (i == 0) { jobDetailInfluxDB.setState(0); - jobDetailInfluxDB.setRowCount(size); - jobDetailInfluxDB.setUpdateTime(LocalDateTime.now()); - jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB); - } else if (jobDetailInfluxDB.getState() == 0 && i > 0 && jobDetailInfluxDB.getRowCount() + size != 0) { - // 处理中,后续时间片的处理,累加记录数 - jobDetailInfluxDB.setState(0); - jobDetailInfluxDB.setRowCount(jobDetailInfluxDB.getRowCount() + size); - jobDetailInfluxDB.setUpdateTime(LocalDateTime.now()); - jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB); - } else { - System.gc(); - break; + jobDetailInfluxDB.setRowCount(0); } + jobDetailInfluxDB.setRowCount(jobDetailInfluxDB.getRowCount() + size); + jobDetailInfluxDB.setUpdateTime(LocalDateTime.now()); + jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB); } else { - if (size > 0) { - jobDetailInfluxDB = new JobDetailInfluxDB(); - jobDetailInfluxDB.setTableName(tableName); - jobDetailInfluxDB.setExcuteDate(date); - jobDetailInfluxDB.setState(0); - jobDetailInfluxDB.setRowCount(size); - jobDetailInfluxDB.setUpdateTime(LocalDateTime.now()); - jobDetailInfluxDBService.save(jobDetailInfluxDB); - } + jobDetailInfluxDB = new JobDetailInfluxDB(tableName, date, 0, size, LocalDateTime.now()); + jobDetailInfluxDBService.save(jobDetailInfluxDB); } if (i + 1 == slice && Objects.nonNull(jobDetailInfluxDB)) { stopWatch.stop(); @@ -170,6 +157,9 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { } } catch (Exception exception) { exception.printStackTrace(); + if (Objects.isNull(jobDetailInfluxDB)) { + jobDetailInfluxDB = new JobDetailInfluxDB(tableName, date, size); + } jobDetailInfluxDB.setState(2); jobDetailInfluxDB.setUpdateTime(LocalDateTime.now()); jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB); @@ -179,7 +169,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); } - }); + } }); } } diff --git a/influx-data/influx-target/src/main/resources/application.yml b/influx-data/influx-target/src/main/resources/application.yml index c1a8127..a7e99dc 100644 --- a/influx-data/influx-target/src/main/resources/application.yml +++ b/influx-data/influx-target/src/main/resources/application.yml @@ -13,7 +13,7 @@ spring: database: pqsbase mapper-location: com.njcn.influx.imapper application: - name: oracle-data + name: oracle-influx autoconfigure: exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure datasource: diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/po/JobDetail.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/po/JobDetail.java index e947d54..e70d81c 100644 --- a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/po/JobDetail.java +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/po/JobDetail.java @@ -6,8 +6,11 @@ import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonFormat; import com.github.jeffreyning.mybatisplus.anno.MppMultiId; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; +import javax.xml.ws.soap.Addressing; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; @@ -23,6 +26,8 @@ import java.util.Objects; */ @TableName(value = "JOB_DETAIL") @Data +@NoArgsConstructor +@AllArgsConstructor public class JobDetail { /** * 指标表名 @@ -75,5 +80,17 @@ public class JobDetail { return Objects.equals(tableName, jobDetail.tableName) && Objects.equals(excuteDate, jobDetail.excuteDate); } + public JobDetail(String tableName, LocalDate excuteDate, Integer state, Integer rowCount, LocalDateTime updateTime) { + this.tableName = tableName; + this.excuteDate = excuteDate; + this.state = state; + this.rowCount = rowCount; + this.updateTime = updateTime; + } + public JobDetail(String tableName, LocalDate excuteDate, Integer rowCount) { + this.tableName = tableName; + this.excuteDate = excuteDate; + this.rowCount = rowCount; + } } \ No newline at end of file diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/JobDetailService.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/JobDetailService.java index 785122d..7e76fd0 100644 --- a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/JobDetailService.java +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/JobDetailService.java @@ -6,6 +6,7 @@ import com.njcn.oracle.bo.param.JobQueryParam; import com.njcn.oracle.bo.po.JobDetail; import com.baomidou.mybatisplus.extension.service.IService; +import java.time.LocalDate; import java.util.List; /** @@ -17,7 +18,7 @@ import java.util.List; */ public interface JobDetailService extends IMppService { - JobDetail select(JobDetail jobDetail); + JobDetail select(String tableName, LocalDate date); IPage selectByParam(JobQueryParam jobQueryParam); diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/impl/JobDetailServiceImpl.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/impl/JobDetailServiceImpl.java index 0b1e29e..a8d1805 100644 --- a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/impl/JobDetailServiceImpl.java +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/impl/JobDetailServiceImpl.java @@ -36,10 +36,10 @@ public class JobDetailServiceImpl extends MppServiceImpl dateList = LocalDateUtil.getDateList(dataAsynParam.getStartTime(), dataAsynParam.getEndTime()); - dateList.forEach(date -> { + for (LocalDate date : dateList) { + //日志记录 + JobDetail jobDetail = jobDetailService.select(tableName, date); + if (Objects.nonNull(jobDetail) && (jobDetail.getState() == 1 || jobDetail.getState() == 0)) { + //如果该指标当前天已执行或正在执行,直接跳出循环 + break; + } //程序监听 StopWatch stopWatch = new StopWatch(); stopWatch.start(); @@ -84,39 +90,6 @@ public class OracleServiceImpl implements IOracleService { System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); - //日志记录 - JobDetail jobDetail = new JobDetail(); - jobDetail.setTableName(tableName); - jobDetail.setExcuteDate(date); - jobDetail = jobDetailService.select(jobDetail); - if (Objects.nonNull(jobDetail)) { - if (jobDetail.getState() == 2 && i == 0 && size != 0) { - //第一片执行时返现是失败的,则再次执行 - jobDetail.setState(0); - jobDetail.setRowCount(size); - jobDetail.setUpdateTime(LocalDateTime.now()); - jobDetailService.updateByMultiId(jobDetail); - } else if (jobDetail.getState() == 0 && i > 0 && jobDetail.getRowCount() + size != 0) { - // 处理中,后续时间片的处理,累加记录数 - jobDetail.setState(0); - jobDetail.setRowCount(jobDetail.getRowCount() + size); - jobDetail.setUpdateTime(LocalDateTime.now()); - jobDetailService.updateByMultiId(jobDetail); - } else { - System.gc(); - break; - } - } else { - if (size > 0) { - jobDetail = new JobDetail(); - jobDetail.setTableName(tableName); - jobDetail.setExcuteDate(date); - jobDetail.setState(0); - jobDetail.setRowCount(size); - jobDetail.setUpdateTime(LocalDateTime.now()); - jobDetailService.save(jobDetail); - } - } try { if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { //执行目标库的数据处理 @@ -124,6 +97,20 @@ public class OracleServiceImpl implements IOracleService { executor.insertBatchByDB(weakReferenceData.get()); //最后一片时修改状态 } + //手动执行GC + System.gc(); + if (Objects.nonNull(jobDetail)) { + if (i == 0) { + jobDetail.setState(0); + jobDetail.setRowCount(0); + } + jobDetail.setRowCount(jobDetail.getRowCount() + size); + jobDetail.setUpdateTime(LocalDateTime.now()); + jobDetailService.updateByMultiId(jobDetail); + } else { + jobDetail = new JobDetail(tableName, date, 0, size, LocalDateTime.now()); + jobDetailService.save(jobDetail); + } if (i + 1 == slice && Objects.nonNull(jobDetail)) { stopWatch.stop(); jobDetail.setState(1); @@ -133,6 +120,9 @@ public class OracleServiceImpl implements IOracleService { } } catch (Exception exception) { exception.printStackTrace(); + if (Objects.isNull(jobDetail)) { + jobDetail = new JobDetail(tableName, date, size); + } jobDetail.setState(2); jobDetail.setUpdateTime(LocalDateTime.now()); jobDetailService.updateByMultiId(jobDetail); @@ -142,7 +132,9 @@ public class OracleServiceImpl implements IOracleService { System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); } - }); + } + + }); } } diff --git a/oracle-data/oracle-target/src/main/resources/application.yml b/oracle-data/oracle-target/src/main/resources/application.yml index 32fcc3c..2503542 100644 --- a/oracle-data/oracle-target/src/main/resources/application.yml +++ b/oracle-data/oracle-target/src/main/resources/application.yml @@ -2,7 +2,7 @@ server: port: 8091 spring: application: - name: oracle-data + name: oracle-oracle autoconfigure: exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure datasource: