修改oralcetooralcejob数据同步由现在往以前同步

This commit is contained in:
hzj
2024-01-23 15:50:47 +08:00
parent b66b63d28c
commit 8880c8d4b4
10 changed files with 332 additions and 76 deletions

View File

@@ -44,5 +44,15 @@ public class DataSyncController {
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
}
@PostMapping("/dataSyncHours")
@ApiOperation("数据同步")
@ApiImplicitParam(name = "dataAsynParam", value = "数据同步参数", required = true)
public Boolean dataSyncHours(@RequestBody DataAsynParam dataAsynParam) {
oracleService.dataSyncHours(dataAsynParam);
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
}
}

View File

@@ -1,76 +1,76 @@
package com.njcn.oracle.job;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.njcn.oracle.bo.param.DataAsynParam;
import com.njcn.oracle.bo.param.ServiceTypeEnum;
import com.njcn.oracle.bo.po.JobHistoryLog;
import com.njcn.oracle.service.IOracleService;
import com.njcn.oracle.service.JobHistoryLogService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* Description:
* Date: 2024/1/18 10:08【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
@EnableScheduling
@RequiredArgsConstructor
@Slf4j
public class OracleToOralceJob {
private final IOracleService oracleService;
private final JobHistoryLogService jobHistoryLogService;
@Value("${job.slice:3}")
private int slice;
@Value("${job.startime}")
private String startime;
@Value("${job.endtime}")
private String endtime;
@Scheduled(cron="0 5 0 * * ?")
public void execute() {
QueryWrapper<JobHistoryLog> queryWrapper = new QueryWrapper<>();
queryWrapper.select("min(LAST_DATE) as LAST_DATE");
LocalDate startDate ;
LocalDate endDate ;
JobHistoryLog one = jobHistoryLogService.getBaseMapper().selectOne(queryWrapper);
//获取当前任务的结束时间
if (Objects.isNull(one)){
startDate =LocalDate.parse(startime);
}else {
startDate = one.getLastDate();
startDate = startDate.plusDays(-1);
}
//获取配置的endtime和执行到时时间减去执行天数当startday最大值当当前任务的开始时间
List<LocalDate> dates = Arrays.asList(LocalDate.parse(endtime), startDate.plusDays(-(slice-1)));
endDate = dates.stream().max(LocalDate::compareTo).get();
DataAsynParam dataAsynParam = new DataAsynParam();
dataAsynParam.setStartTime(endDate);
dataAsynParam.setEndTime(startDate);
dataAsynParam.setTableNames(ServiceTypeEnum.getExecutableTypes());
dataAsynParam.setExcuteType(2);
oracleService.dataBacthSysc(dataAsynParam);
}
}
//package com.njcn.oracle.job;
//
//import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
//import com.njcn.oracle.bo.param.DataAsynParam;
//import com.njcn.oracle.bo.param.ServiceTypeEnum;
//import com.njcn.oracle.bo.po.JobHistoryLog;
//import com.njcn.oracle.service.IOracleService;
//import com.njcn.oracle.service.JobHistoryLogService;
//import lombok.RequiredArgsConstructor;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.scheduling.annotation.EnableScheduling;
//import org.springframework.scheduling.annotation.Scheduled;
//import org.springframework.stereotype.Component;
//
//import java.time.LocalDate;
//import java.time.LocalDateTime;
//import java.util.Arrays;
//import java.util.List;
//import java.util.Objects;
//import java.util.Optional;
//
///**
// * Description:
// * Date: 2024/1/18 10:08【需求编号】
// *
// * @author clam
// * @version V1.0.0
// */
//
//@Component
//@EnableScheduling
//@RequiredArgsConstructor
//@Slf4j
//public class OracleToOralceJob {
// private final IOracleService oracleService;
//
// private final JobHistoryLogService jobHistoryLogService;
//
// @Value("${job.slice:3}")
// private int slice;
// @Value("${job.startime}")
// private String startime;
// @Value("${job.endtime}")
// private String endtime;
//
// @Scheduled(cron="0 5 0 * * ?")
// public void execute() {
// QueryWrapper<JobHistoryLog> queryWrapper = new QueryWrapper<>();
// queryWrapper.select("min(LAST_DATE) as LAST_DATE");
// LocalDate startDate ;
// LocalDate endDate ;
// JobHistoryLog one = jobHistoryLogService.getBaseMapper().selectOne(queryWrapper);
// //获取当前任务的结束时间
// if (Objects.isNull(one)){
// startDate =LocalDate.parse(startime);
//
// }else {
// startDate = one.getLastDate();
// startDate = startDate.plusDays(-1);
// }
// //获取配置的endtime和执行到时时间减去执行天数当startday最大值当当前任务的开始时间
// List<LocalDate> dates = Arrays.asList(LocalDate.parse(endtime), startDate.plusDays(-(slice-1)));
//
// endDate = dates.stream().max(LocalDate::compareTo).get();
//
//
// DataAsynParam dataAsynParam = new DataAsynParam();
// dataAsynParam.setStartTime(endDate);
// dataAsynParam.setEndTime(startDate);
// dataAsynParam.setTableNames(ServiceTypeEnum.getExecutableTypes());
// dataAsynParam.setExcuteType(2);
// oracleService.dataBacthSysc(dataAsynParam);
// }
//
//}

View File

@@ -4,4 +4,6 @@ import com.njcn.oracle.bo.param.DataAsynParam;
public interface IOracleService {
void dataBacthSysc(DataAsynParam dataAsynParam);
void dataSyncHours(DataAsynParam dataAsynParam);
}

View File

@@ -9,14 +9,18 @@ import com.njcn.oracle.bo.param.DataAsynParam;
import com.njcn.oracle.bo.param.MigrationParam;
import com.njcn.oracle.bo.param.ServiceTypeEnum;
import com.njcn.oracle.bo.po.JobDetail;
import com.njcn.oracle.bo.po.JobDetailHours;
import com.njcn.oracle.bo.po.JobHistoryLog;
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
import com.njcn.oracle.service.IOracleService;
import com.njcn.oracle.service.JobDetailHoursService;
import com.njcn.oracle.service.JobDetailService;
import com.njcn.oracle.service.JobHistoryLogService;
import com.njcn.oracle.util.LocalDateUtil;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@@ -31,6 +35,7 @@ import java.util.Objects;
@Data
@Service
@RequiredArgsConstructor
@Slf4j
public class OracleServiceImpl implements IOracleService {
private final static String PACKAGE_PREFIX = "com.njcn.oracle.service.impl.";
@@ -39,6 +44,10 @@ public class OracleServiceImpl implements IOracleService {
private final JobDetailService jobDetailService;
private final JobHistoryLogService jobHistoryLogService;
private final List<IReplenishMybatisService> services;
private final JobDetailHoursService jobDetailHoursService;
@Value("${business.slice:2}")
private int slice;
@@ -145,4 +154,84 @@ public class OracleServiceImpl implements IOracleService {
});
}
@Override
@Async
public void dataSyncHours(DataAsynParam dataAsynParam) {
Runtime runtime = Runtime.getRuntime();
String tablename;
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
LocalDateTime startDateTime = dataAsynParam.getStartDateTime();
LocalDateTime endDateTime = dataAsynParam.getEndDateTime();
while (endDateTime.isBefore(startDateTime)) {
startDateTime = startDateTime.minusHours(1);
log.info("执行"+startDateTime+"时刻数据");
for (IReplenishMybatisService service : services) {
tablename = service.getClass().getName().split(PACKAGE_PREFIX+"|"+PACKAGE_SUFFIX)[1];
//日志记录
JobDetailHours jobDetailHours = jobDetailHoursService.lambdaQuery().eq(JobDetailHours::getTableName, tablename)
.eq(JobDetailHours::getExcuteDate, startDateTime)
.one();
if (Objects.nonNull(jobDetailHours) && (jobDetailHours.getState() == 1 || jobDetailHours.getState() == 0|| jobDetailHours.getState() == 2)) {
//如果该指标当前时间段已执行或正在执行,直接跳出循环
break;
}
if (Objects.isNull(jobDetailHours)) {
jobDetailHours = new JobDetailHours(tablename, startDateTime, 0, 0, LocalDateTime.now());
jobDetailHoursService.save(jobDetailHours);
}
//程序监听
StopWatch stopWatch = new StopWatch();
stopWatch.start();
MigrationParam migration = new MigrationParam();
migration.setStartTime(startDateTime);
//between是闭区间所以这里加一小时减1秒正好执行一个小时数据
migration.setEndTime(startDateTime.minusHours(-1).minusSeconds(1));
//采用弱引用接受后续手动调用gc后会清空该对象
WeakReference<List> weakReferenceData = new WeakReference<>(service.queryData(migration));
int size = 0;
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
size = weakReferenceData.get().size();
}
System.out.println(""+ "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println("" + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
System.out.println("" + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
try {
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
//执行目标库的数据处理
service.clearTargetData(migration);
service.insertBatchByDB(weakReferenceData.get());
//最后一片时修改状态
}
//手动执行GC
System.gc();
stopWatch.stop();
jobDetailHours.setRowCount(size);
jobDetailHours.setState(1);
jobDetailHours.setDuration(stopWatch.getTotalTimeSeconds());
jobDetailHoursService.updateByMultiId(jobDetailHours);
} catch (Exception exception) {
exception.printStackTrace();
jobDetailHours.setState(2);
jobDetailHours.setUpdateTime(LocalDateTime.now());
jobDetailHoursService.updateByMultiId(jobDetailHours);
}
System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
}
}
}
}