oralce同步到influxDB

This commit is contained in:
hzj
2024-01-16 15:51:00 +08:00
parent a2afabe80c
commit aa23ed6199
22 changed files with 75111 additions and 339 deletions

View File

@@ -15,6 +15,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
* @date 2022/11/10
*/
@Slf4j
@EnableAsync
@MapperScan("com.njcn.**.mapper")
@SpringBootApplication(scanBasePackages = "com.njcn",exclude = {SecurityAutoConfiguration.class, SecurityFilterAutoConfiguration.class})
public class InfluxDataApplication {

View File

@@ -0,0 +1,51 @@
package com.njcn.influx.controller;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.njcn.influx.bo.po.JobDetailInfluxDB;
import com.njcn.influx.service.JobDetailInfluxDBService;
import com.njcn.oracle.bo.param.JobQueryParam;
import com.njcn.oracle.bo.po.JobDetail;
import com.njcn.oracle.service.JobDetailService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Description:
* Date: 2024/1/9 9:13【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Validated
@Slf4j
@RestController
@RequestMapping("/jobDetail")
@Api(tags = "同步任务")
@AllArgsConstructor
public class JobDetailInfluxDBController {
private final JobDetailInfluxDBService jobDetailInfluxDBService;
@PostMapping("/jobQuery")
@ApiOperation("任务查询")
@ApiImplicitParam(name = "jobQueryParam", value = "任务查询参数", required = true)
public IPage<JobDetailInfluxDB> jobQuery(@RequestBody JobQueryParam jobQueryParam){
IPage<JobDetailInfluxDB> jobDetails = jobDetailInfluxDBService.selectByParam(jobQueryParam);
return jobDetails;//HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, jobDetails, "任务查询");
}
@PostMapping("/jobRemove")
@ApiOperation("任务移除")
@ApiImplicitParam(name = "jobQueryParam", value = "任务移除参数", required = true)
public boolean jobRemove(@RequestBody JobDetailInfluxDB jobDetail){
boolean b = jobDetailInfluxDBService.jobRemove(jobDetail);
return b;
}
}

View File

@@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.njcn.influx.service.InfluxDBBaseService;
import com.njcn.influx.service.OracleToInfluxDBService;
import com.njcn.oracle.bo.param.DataAsynParam;
import com.njcn.oracle.bo.param.MigrationParam;
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
@@ -42,69 +43,14 @@ import java.util.stream.Stream;
@AllArgsConstructor
public class OracleToInfluxDBController {
private final InfluxDBBaseService influxDBBaseService;
private final OracleToInfluxDBService oracleToInfluxDBService;
@PostMapping("/dataSync")
@ApiOperation("数据同步")
@ApiImplicitParam(name = "dataAsynParam", value = "数据同步参数", required = true)
@SneakyThrows
public Boolean dataSync(@RequestBody DataAsynParam dataAsynParam){
dataAsynParam.getTableNames().stream().forEach(temp->{
IReplenishMybatisService executor = null;
try {
executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName("com.njcn.oracle.service.impl." + temp + "ServiceImpl"));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
IReplenishMybatisService finalExecutor = executor;
MigrationParam migration = new MigrationParam();
LocalDateTime localDateTime = dataAsynParam.getStartTime().atStartOfDay();
LocalDateTime tempStartTime = LocalDateTimeUtil.beginOfDay(localDateTime);
LocalDateTime tempEndTime = LocalDateTimeUtil.endOfDay(localDateTime);
migration.setStartTime(tempStartTime);
migration.setEndTime(tempEndTime);
migration.setStartTime(tempStartTime);
migration.setEndTime(tempEndTime);
List list = finalExecutor.queryData(migration);
if (CollectionUtil.isEmpty(list)) {
return;
}
Class<?> clazz = null;
Class<?> clazz2 = null;
try {
clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + temp);
clazz2 = Class.forName("com.njcn.oracle.bo.po." + temp);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
Method method = null;
try {
method = clazz.getDeclaredMethod("oralceToInfluxDB",clazz2);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
method.setAccessible(true);
Method finalMethod = method;
List list1 =(List) list.stream().flatMap(po -> {
try {
Object invoke = finalMethod.invoke(null,po);
Object invoke1 = invoke;
//返回可能是集合
return invoke1 instanceof List ? ((List<?>) invoke1).stream() : Stream.of(invoke1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).map(item-> (Object) item).collect(Collectors.toList());
influxDBBaseService.insertBatchBySlice(list1,5000);
});
oracleToInfluxDBService.dataBacthSysc(dataAsynParam);
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
}