添加数据同步代码

This commit is contained in:
hzj
2024-01-10 10:40:02 +08:00
parent 4702c266a4
commit ff80efa279
8 changed files with 94 additions and 51 deletions

View File

@@ -57,9 +57,12 @@ public enum ServiceTypeEnum {
* @return
*/
public static List<String> getExecutableTypes() {
return Arrays.stream(ServiceTypeEnum.values()).map(ServiceTypeEnum::name).collect(Collectors.toList());
return Arrays.stream(ServiceTypeEnum.values()).map(tmep->{
return tmep.code;
}).collect(Collectors.toList());
}
/**
* 通过code获取枚举值
* @param code

View File

@@ -9,7 +9,9 @@ import com.github.jeffreyning.mybatisplus.anno.MppMultiId;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Objects;
/**
*
@@ -34,22 +36,14 @@ public class JobDetail {
* 起始时间
*/
@JsonFormat(
pattern = "yyyy-MM-dd HH:mm:ss"
pattern = "yyyy-MM-dd"
)
@MppMultiId(value = "START_TIME")
private LocalDateTime startTime;
@MppMultiId(value = "EXCUTE_DATE")
private LocalDate excuteDate;
/**
* 截止时间
*/
@JsonFormat(
pattern = "yyyy-MM-dd HH:mm:ss"
)
@MppMultiId(value = "END_TIME")
private LocalDateTime endTime;
/**
* 状态0-执行中、1-成功、2-失败)
* 状态0-执行中、1-成功、2-失败,3未执行
*/
@TableField(value = "STATE")
private Integer state;
@@ -65,5 +59,17 @@ public class JobDetail {
@TableField(value = "UPDATE_TIME")
private LocalDateTime updateTime;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JobDetail jobDetail = (JobDetail) o;
return Objects.equals(tableName, jobDetail.tableName) && Objects.equals(excuteDate, jobDetail.excuteDate);
}
}

View File

@@ -5,8 +5,7 @@
<!--@mbg.generated-->
<!--@Table JOB_DETAIL-->
<id column="TABLE_NAME" jdbcType="VARCHAR" property="tableName" />
<id column="START_TIME" jdbcType="TIMESTAMP" property="startTime" />
<id column="END_TIME" jdbcType="TIMESTAMP" property="endTime" />
<id column="EXCUTE_DATE" jdbcType="TIMESTAMP" property="excuteDate" />
<id column="STATE" jdbcType="DECIMAL" property="state" />
<result column="ROW_COUNT" jdbcType="DECIMAL" property="rowCount" />
<result column="UPDATE_TIME" jdbcType="TIMESTAMP" property="updateTime" />
@@ -19,7 +18,7 @@
<select id="query" resultMap="BaseResultMap">
select * from ( select row_.*, rownum rownum_ from (
select * from JOB_DETAIL a where 1=1
and a.START_TIME between #{jobQueryParam.startTime,jdbcType=DATE} and #{jobQueryParam.endTime,jdbcType=DATE}
and a.EXCUTE_DATE between #{jobQueryParam.startTime,jdbcType=DATE} and #{jobQueryParam.endTime,jdbcType=DATE}
<if test="jobQueryParam.states!=null and jobQueryParam.states.size()!=0">
AND a.STATE IN
<foreach collection="jobQueryParam.states" item="item" open="(" close=")" separator=",">
@@ -32,7 +31,7 @@
#{item}
</foreach>
</if>
order by a.START_TIME desc
order by a.EXCUTE_DATE desc
) row_
where rownum &lt;= #{jobQueryParam.currentPage} * #{jobQueryParam.pageSize} ) where rownum_ &gt; (#{jobQueryParam.currentPage} - 1) * #{jobQueryParam.pageSize}
</select>

View File

@@ -1,8 +1,9 @@
package com.njcn.oracle.service;
import com.njcn.oracle.bo.param.MigrationParam;
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
import java.time.LocalDate;
/**
* Description:
* Date: 2024/1/8 16:16【需求编号】
@@ -11,5 +12,5 @@ import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
* @version V1.0.0
*/
public interface DataSyncService {
void dataSync(IReplenishMybatisService executor, String temp, MigrationParam migration);
void dataSync(IReplenishMybatisService executor, String temp, LocalDate migration);
}

View File

@@ -1,6 +1,8 @@
package com.njcn.oracle.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.njcn.oracle.bo.param.MigrationParam;
import com.njcn.oracle.bo.po.JobDetail;
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
@@ -10,8 +12,8 @@ import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Objects;
@@ -29,16 +31,21 @@ public class DataSyncServiceImpl implements DataSyncService {
@Override
@Async
public void dataSync(IReplenishMybatisService executor, String temp, MigrationParam migration) {
public void dataSync(IReplenishMybatisService executor, String temp, LocalDate date) {
LocalDateTime localDateTime = date.atStartOfDay();
LocalDateTime tempStartTime = LocalDateTimeUtil.beginOfDay(localDateTime);
LocalDateTime tempEndTime = LocalDateTimeUtil.endOfDay(localDateTime);
MigrationParam migration = new MigrationParam();
migration.setStartTime(tempStartTime);
migration.setEndTime(tempEndTime);
List list = executor.queryData(migration);
JobDetail jobDetail = new JobDetail();
jobDetail.setTableName(temp);
jobDetail.setStartTime(migration.getStartTime());
jobDetail.setEndTime(migration.getEndTime());
jobDetail.setExcuteDate(date);
JobDetail select = jobDetailService.select(jobDetail);
if (Objects.nonNull(select)){

View File

@@ -7,10 +7,16 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
import com.njcn.oracle.bo.param.JobQueryParam;
import com.njcn.oracle.bo.param.ServiceTypeEnum;
import com.njcn.oracle.util.LocalDateUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.oracle.mybatis.mapper.JobDetailMapper;
import com.njcn.oracle.bo.po.JobDetail;
@@ -33,8 +39,7 @@ public class JobDetailServiceImpl extends MppServiceImpl<JobDetailMapper, JobDet
@Override
public JobDetail select(JobDetail jobDetail) {
JobDetail jobDetail1 = this.lambdaQuery().eq(JobDetail::getTableName,jobDetail.getTableName()).
eq(JobDetail::getStartTime,jobDetail.getStartTime()).
eq(JobDetail::getEndTime,jobDetail.getEndTime()).one();
eq(JobDetail::getExcuteDate,jobDetail.getExcuteDate()).one();
return jobDetail1;
}
@@ -42,16 +47,41 @@ public class JobDetailServiceImpl extends MppServiceImpl<JobDetailMapper, JobDet
public IPage<JobDetail> selectByParam(JobQueryParam jobQueryParam) {
IPage<JobDetail> page = new Page<>(jobQueryParam.getCurrentPage(), jobQueryParam.getPageSize());
QueryWrapper<JobDetail> queryWrapper = new QueryWrapper<>();
queryWrapper.lambda().between(JobDetail::getStartTime, jobQueryParam.getStartTime(), jobQueryParam.getEndTime()).
queryWrapper.lambda().between(JobDetail::getExcuteDate, jobQueryParam.getStartTime(), jobQueryParam.getEndTime()).
in(!CollectionUtils.isEmpty(jobQueryParam.getStates()), JobDetail::getState, jobQueryParam.getStates()).
in(!CollectionUtils.isEmpty(jobQueryParam.getTableNames()), JobDetail::getTableName, jobQueryParam.getTableNames()).orderByDesc(JobDetail::getStartTime);
in(!CollectionUtils.isEmpty(jobQueryParam.getTableNames()), JobDetail::getTableName, jobQueryParam.getTableNames());
Integer integer = this.getBaseMapper().selectCount(queryWrapper);
List<JobDetail> jobDetails = this.getBaseMapper().selectList(queryWrapper);
List<String> tableNames = new ArrayList<>();
if(CollectionUtils.isEmpty(jobQueryParam.getTableNames())){
tableNames = ServiceTypeEnum.getExecutableTypes();
}else {
tableNames = jobQueryParam.getTableNames();
}
List<LocalDate> dateList = LocalDateUtil.getDateList(jobQueryParam.getStartTime(), jobQueryParam.getEndTime());
List<String> finalTableNames = tableNames;
dateList.stream().forEach(localDate -> {
finalTableNames.stream().forEach(temp->{
JobDetail jobDetail = new JobDetail();
jobDetail.setTableName(temp);
jobDetail.setExcuteDate(localDate);
if (jobDetails.contains(jobDetail)){
return;
}
jobDetail.setTableName_CN(ServiceTypeEnum.getValueByCode(temp));
jobDetail.setState(3);
jobDetails.add(jobDetail);
});
});
List<JobDetail> jobDetailIPage = this.getBaseMapper().query(jobQueryParam);
List<JobDetail> jobDetailIPage= jobDetails.stream().sorted(Comparator.comparing(JobDetail::getExcuteDate).reversed()).
skip((jobQueryParam.getCurrentPage() - 1) * jobQueryParam.getPageSize()).
limit(jobQueryParam.getPageSize()).collect(Collectors.toList());
jobDetailIPage.stream().forEach(temp->temp.setTableName_CN(ServiceTypeEnum.getValueByCode(temp.getTableName())));
page.setRecords(jobDetailIPage);
page.setTotal(Long.parseLong(integer+""));
page.setTotal(Long.parseLong(jobDetails.size()+""));
return page;
}
@@ -59,8 +89,7 @@ public class JobDetailServiceImpl extends MppServiceImpl<JobDetailMapper, JobDet
public boolean jobRemove(JobDetail jobDetail) {
QueryWrapper<JobDetail> queryWrapper = new QueryWrapper<>();
queryWrapper.lambda().eq(JobDetail::getTableName,jobDetail.getTableName()).
eq(JobDetail::getStartTime,jobDetail.getStartTime()).
eq(JobDetail::getEndTime,jobDetail.getEndTime());
eq(JobDetail::getExcuteDate,jobDetail.getExcuteDate());
boolean remove = this.remove(queryWrapper);
return remove;
}