diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/param/DataAsynParam.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/param/DataAsynParam.java index 659fac7..7ccba47 100644 --- a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/param/DataAsynParam.java +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/param/DataAsynParam.java @@ -10,6 +10,13 @@ import java.util.List; @Data public class DataAsynParam { + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern="yyyy-MM-dd HH:mm:ss") + private LocalDateTime startDateTime; + + //截止时间 + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern="yyyy-MM-dd HH:mm:ss") + private LocalDateTime endDateTime; + //起始时间 @JsonFormat(shape = JsonFormat.Shape.STRING, pattern="yyyy-MM-dd") private LocalDate startTime; diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/po/JobDetailHours.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/po/JobDetailHours.java new file mode 100644 index 0000000..43af368 --- /dev/null +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/po/JobDetailHours.java @@ -0,0 +1,75 @@ +package com.njcn.oracle.bo.po; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; + +import com.github.jeffreyning.mybatisplus.anno.MppMultiId; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * + * Description: + * Date: 2024/1/23 14:14【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +@TableName(value = "JOB_DETAIL_HOURS") +@NoArgsConstructor +@AllArgsConstructor +public class JobDetailHours { + /** + * 指标表名 + */ + @MppMultiId(value = "TABLE_NAME") + private String tableName; + + /** + * 执行日期 + */ + @MppMultiId(value = "EXCUTE_DATE") + private LocalDateTime excuteDate; + + /** + * 记录数 + */ + @TableField(value = "\"ROW_COUNT\"") + private Integer rowCount; + + /** + * 状态(0-执行中、1-成功、2-失败) + */ + @TableField(value = "\"STATE\"") + private Integer state; + + @TableField(value = "UPDATE_TIME") + private LocalDateTime updateTime; + + /** + * 消耗时长 + */ + @TableField(value = "DURATION") + private Double duration; + + public JobDetailHours(String tableName, LocalDateTime excuteDate, Integer state, Integer rowCount, LocalDateTime updateTime) { + this.tableName = tableName; + this.excuteDate = excuteDate; + this.state = state; + this.rowCount = rowCount; + this.updateTime = updateTime; + } + + public JobDetailHours(String tableName, LocalDateTime excuteDate, Integer rowCount) { + this.tableName = tableName; + this.excuteDate = excuteDate; + this.rowCount = rowCount; + } +} \ No newline at end of file diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mapper/mapping/JobDetailHoursMapper.xml b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mapper/mapping/JobDetailHoursMapper.xml new file mode 100644 index 0000000..be5c7d5 --- /dev/null +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mapper/mapping/JobDetailHoursMapper.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + "TABLE_NAME", EXCUTE_DATE, "ROW_COUNT", "STATE", UPDATE_TIME, DURATION + + \ No newline at end of file diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mybatis/mapper/JobDetailHoursMapper.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mybatis/mapper/JobDetailHoursMapper.java new file mode 100644 index 0000000..acd6e63 --- /dev/null +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mybatis/mapper/JobDetailHoursMapper.java @@ -0,0 +1,16 @@ +package com.njcn.oracle.mybatis.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.github.jeffreyning.mybatisplus.base.MppBaseMapper; +import com.njcn.oracle.bo.po.JobDetailHours; + +/** + * + * Description: + * Date: 2024/1/23 14:14【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface JobDetailHoursMapper extends MppBaseMapper { +} \ No newline at end of file diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/JobDetailHoursService.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/JobDetailHoursService.java new file mode 100644 index 0000000..5e5b51e --- /dev/null +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/JobDetailHoursService.java @@ -0,0 +1,17 @@ +package com.njcn.oracle.service; + +import com.github.jeffreyning.mybatisplus.service.IMppService; +import com.njcn.oracle.bo.po.JobDetailHours; +import com.baomidou.mybatisplus.extension.service.IService; + /** + * + * Description: + * Date: 2024/1/23 14:14【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface JobDetailHoursService extends IMppService { + + +} diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/impl/JobDetailHoursServiceImpl.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/impl/JobDetailHoursServiceImpl.java new file mode 100644 index 0000000..71d7a2a --- /dev/null +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/impl/JobDetailHoursServiceImpl.java @@ -0,0 +1,22 @@ +package com.njcn.oracle.service.impl; + +import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; +import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import java.util.List; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.oracle.mybatis.mapper.JobDetailHoursMapper; +import com.njcn.oracle.bo.po.JobDetailHours; +import com.njcn.oracle.service.JobDetailHoursService; +/** + * + * Description: + * Date: 2024/1/23 14:14【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Service +public class JobDetailHoursServiceImpl extends MppServiceImpl implements JobDetailHoursService{ + +} diff --git a/oracle-data/oracle-target/src/main/java/com/njcn/oracle/controller/DataSyncController.java b/oracle-data/oracle-target/src/main/java/com/njcn/oracle/controller/DataSyncController.java index 4f89ea0..7297b02 100644 --- a/oracle-data/oracle-target/src/main/java/com/njcn/oracle/controller/DataSyncController.java +++ b/oracle-data/oracle-target/src/main/java/com/njcn/oracle/controller/DataSyncController.java @@ -44,5 +44,15 @@ public class DataSyncController { return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步"); } + @PostMapping("/dataSyncHours") + @ApiOperation("数据同步") + @ApiImplicitParam(name = "dataAsynParam", value = "数据同步参数", required = true) + public Boolean dataSyncHours(@RequestBody DataAsynParam dataAsynParam) { + + oracleService.dataSyncHours(dataAsynParam); + + return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步"); + } + } diff --git a/oracle-data/oracle-target/src/main/java/com/njcn/oracle/job/OracleToOralceJob.java b/oracle-data/oracle-target/src/main/java/com/njcn/oracle/job/OracleToOralceJob.java index 8d7cead..369061c 100644 --- a/oracle-data/oracle-target/src/main/java/com/njcn/oracle/job/OracleToOralceJob.java +++ b/oracle-data/oracle-target/src/main/java/com/njcn/oracle/job/OracleToOralceJob.java @@ -1,76 +1,76 @@ -package com.njcn.oracle.job; - -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.njcn.oracle.bo.param.DataAsynParam; -import com.njcn.oracle.bo.param.ServiceTypeEnum; -import com.njcn.oracle.bo.po.JobHistoryLog; -import com.njcn.oracle.service.IOracleService; -import com.njcn.oracle.service.JobHistoryLogService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -/** - * Description: - * Date: 2024/1/18 10:08【需求编号】 - * - * @author clam - * @version V1.0.0 - */ - -@Component -@EnableScheduling -@RequiredArgsConstructor -@Slf4j -public class OracleToOralceJob { - private final IOracleService oracleService; - - private final JobHistoryLogService jobHistoryLogService; - - @Value("${job.slice:3}") - private int slice; - @Value("${job.startime}") - private String startime; - @Value("${job.endtime}") - private String endtime; - - @Scheduled(cron="0 5 0 * * ?") - public void execute() { - QueryWrapper queryWrapper = new QueryWrapper<>(); - queryWrapper.select("min(LAST_DATE) as LAST_DATE"); - LocalDate startDate ; - LocalDate endDate ; - JobHistoryLog one = jobHistoryLogService.getBaseMapper().selectOne(queryWrapper); - //获取当前任务的结束时间 - if (Objects.isNull(one)){ - startDate =LocalDate.parse(startime); - - }else { - startDate = one.getLastDate(); - startDate = startDate.plusDays(-1); - } - //获取配置的endtime和执行到时时间减去执行天数当startday最大值当当前任务的开始时间 - List dates = Arrays.asList(LocalDate.parse(endtime), startDate.plusDays(-(slice-1))); - - endDate = dates.stream().max(LocalDate::compareTo).get(); - - - DataAsynParam dataAsynParam = new DataAsynParam(); - dataAsynParam.setStartTime(endDate); - dataAsynParam.setEndTime(startDate); - dataAsynParam.setTableNames(ServiceTypeEnum.getExecutableTypes()); - dataAsynParam.setExcuteType(2); - oracleService.dataBacthSysc(dataAsynParam); - } - -} +//package com.njcn.oracle.job; +// +//import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +//import com.njcn.oracle.bo.param.DataAsynParam; +//import com.njcn.oracle.bo.param.ServiceTypeEnum; +//import com.njcn.oracle.bo.po.JobHistoryLog; +//import com.njcn.oracle.service.IOracleService; +//import com.njcn.oracle.service.JobHistoryLogService; +//import lombok.RequiredArgsConstructor; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.scheduling.annotation.EnableScheduling; +//import org.springframework.scheduling.annotation.Scheduled; +//import org.springframework.stereotype.Component; +// +//import java.time.LocalDate; +//import java.time.LocalDateTime; +//import java.util.Arrays; +//import java.util.List; +//import java.util.Objects; +//import java.util.Optional; +// +///** +// * Description: +// * Date: 2024/1/18 10:08【需求编号】 +// * +// * @author clam +// * @version V1.0.0 +// */ +// +//@Component +//@EnableScheduling +//@RequiredArgsConstructor +//@Slf4j +//public class OracleToOralceJob { +// private final IOracleService oracleService; +// +// private final JobHistoryLogService jobHistoryLogService; +// +// @Value("${job.slice:3}") +// private int slice; +// @Value("${job.startime}") +// private String startime; +// @Value("${job.endtime}") +// private String endtime; +// +// @Scheduled(cron="0 5 0 * * ?") +// public void execute() { +// QueryWrapper queryWrapper = new QueryWrapper<>(); +// queryWrapper.select("min(LAST_DATE) as LAST_DATE"); +// LocalDate startDate ; +// LocalDate endDate ; +// JobHistoryLog one = jobHistoryLogService.getBaseMapper().selectOne(queryWrapper); +// //获取当前任务的结束时间 +// if (Objects.isNull(one)){ +// startDate =LocalDate.parse(startime); +// +// }else { +// startDate = one.getLastDate(); +// startDate = startDate.plusDays(-1); +// } +// //获取配置的endtime和执行到时时间减去执行天数当startday最大值当当前任务的开始时间 +// List dates = Arrays.asList(LocalDate.parse(endtime), startDate.plusDays(-(slice-1))); +// +// endDate = dates.stream().max(LocalDate::compareTo).get(); +// +// +// DataAsynParam dataAsynParam = new DataAsynParam(); +// dataAsynParam.setStartTime(endDate); +// dataAsynParam.setEndTime(startDate); +// dataAsynParam.setTableNames(ServiceTypeEnum.getExecutableTypes()); +// dataAsynParam.setExcuteType(2); +// oracleService.dataBacthSysc(dataAsynParam); +// } +// +//} diff --git a/oracle-data/oracle-target/src/main/java/com/njcn/oracle/service/IOracleService.java b/oracle-data/oracle-target/src/main/java/com/njcn/oracle/service/IOracleService.java index f2ff150..1cc6fb2 100644 --- a/oracle-data/oracle-target/src/main/java/com/njcn/oracle/service/IOracleService.java +++ b/oracle-data/oracle-target/src/main/java/com/njcn/oracle/service/IOracleService.java @@ -4,4 +4,6 @@ import com.njcn.oracle.bo.param.DataAsynParam; public interface IOracleService { void dataBacthSysc(DataAsynParam dataAsynParam); + + void dataSyncHours(DataAsynParam dataAsynParam); } diff --git a/oracle-data/oracle-target/src/main/java/com/njcn/oracle/service/impl/OracleServiceImpl.java b/oracle-data/oracle-target/src/main/java/com/njcn/oracle/service/impl/OracleServiceImpl.java index 0f25dd1..cb1424a 100644 --- a/oracle-data/oracle-target/src/main/java/com/njcn/oracle/service/impl/OracleServiceImpl.java +++ b/oracle-data/oracle-target/src/main/java/com/njcn/oracle/service/impl/OracleServiceImpl.java @@ -9,14 +9,18 @@ import com.njcn.oracle.bo.param.DataAsynParam; import com.njcn.oracle.bo.param.MigrationParam; import com.njcn.oracle.bo.param.ServiceTypeEnum; import com.njcn.oracle.bo.po.JobDetail; +import com.njcn.oracle.bo.po.JobDetailHours; import com.njcn.oracle.bo.po.JobHistoryLog; import com.njcn.oracle.mybatis.service.IReplenishMybatisService; import com.njcn.oracle.service.IOracleService; +import com.njcn.oracle.service.JobDetailHoursService; import com.njcn.oracle.service.JobDetailService; import com.njcn.oracle.service.JobHistoryLogService; import com.njcn.oracle.util.LocalDateUtil; import lombok.Data; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -31,6 +35,7 @@ import java.util.Objects; @Data @Service @RequiredArgsConstructor +@Slf4j public class OracleServiceImpl implements IOracleService { private final static String PACKAGE_PREFIX = "com.njcn.oracle.service.impl."; @@ -39,6 +44,10 @@ public class OracleServiceImpl implements IOracleService { private final JobDetailService jobDetailService; private final JobHistoryLogService jobHistoryLogService; + private final List services; + + private final JobDetailHoursService jobDetailHoursService; + @Value("${business.slice:2}") private int slice; @@ -145,4 +154,84 @@ public class OracleServiceImpl implements IOracleService { }); } + + @Override + @Async + public void dataSyncHours(DataAsynParam dataAsynParam) { + Runtime runtime = Runtime.getRuntime(); + String tablename; + System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); + System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); + System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); + + LocalDateTime startDateTime = dataAsynParam.getStartDateTime(); + LocalDateTime endDateTime = dataAsynParam.getEndDateTime(); + while (endDateTime.isBefore(startDateTime)) { + startDateTime = startDateTime.minusHours(1); + log.info("执行"+startDateTime+"时刻数据"); + for (IReplenishMybatisService service : services) { + tablename = service.getClass().getName().split(PACKAGE_PREFIX+"|"+PACKAGE_SUFFIX)[1]; + + //日志记录 + JobDetailHours jobDetailHours = jobDetailHoursService.lambdaQuery().eq(JobDetailHours::getTableName, tablename) + .eq(JobDetailHours::getExcuteDate, startDateTime) + .one(); + if (Objects.nonNull(jobDetailHours) && (jobDetailHours.getState() == 1 || jobDetailHours.getState() == 0|| jobDetailHours.getState() == 2)) { + //如果该指标当前时间段已执行或正在执行,直接跳出循环 + break; + } + if (Objects.isNull(jobDetailHours)) { + jobDetailHours = new JobDetailHours(tablename, startDateTime, 0, 0, LocalDateTime.now()); + jobDetailHoursService.save(jobDetailHours); + } + //程序监听 + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + + MigrationParam migration = new MigrationParam(); + migration.setStartTime(startDateTime); + //between是闭区间,所以这里加一小时减1秒,正好执行一个小时数据 + migration.setEndTime(startDateTime.minusHours(-1).minusSeconds(1)); + //采用弱引用接受,后续手动调用gc后,会清空该对象 + WeakReference weakReferenceData = new WeakReference<>(service.queryData(migration)); + + int size = 0; + if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { + size = weakReferenceData.get().size(); + } + System.out.println(""+ "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); + System.out.println("" + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); + System.out.println("" + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); + try { + if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { + //执行目标库的数据处理 + service.clearTargetData(migration); + service.insertBatchByDB(weakReferenceData.get()); + //最后一片时修改状态 + } + //手动执行GC + System.gc(); + stopWatch.stop(); + jobDetailHours.setRowCount(size); + jobDetailHours.setState(1); + jobDetailHours.setDuration(stopWatch.getTotalTimeSeconds()); + jobDetailHoursService.updateByMultiId(jobDetailHours); + + } catch (Exception exception) { + exception.printStackTrace(); + jobDetailHours.setState(2); + jobDetailHours.setUpdateTime(LocalDateTime.now()); + jobDetailHoursService.updateByMultiId(jobDetailHours); + } + + + System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); + System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); + System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); + } + + + + } + } }