oralce同步到influxDB

This commit is contained in:
hzj
2024-01-15 20:31:14 +08:00
parent 7b31e0376c
commit ef7f63bbec
42 changed files with 8096 additions and 1 deletions

View File

@@ -0,0 +1,26 @@
package com.njcn;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.boot.autoconfigure.security.servlet.SecurityFilterAutoConfiguration;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* pqs
*
* @author cdf
* @date 2022/11/10
*/
@Slf4j
@MapperScan("com.njcn.**.mapper")
@SpringBootApplication(scanBasePackages = "com.njcn",exclude = {SecurityAutoConfiguration.class, SecurityFilterAutoConfiguration.class})
public class InfluxDataApplication {
public static void main(String[] args) {
SpringApplication.run(InfluxDataApplication.class, args);
}
}

View File

@@ -0,0 +1,116 @@
package com.njcn.influx.controller;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.njcn.influx.service.InfluxDBBaseService;
import com.njcn.oracle.bo.param.DataAsynParam;
import com.njcn.oracle.bo.param.MigrationParam;
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
import com.njcn.oracle.service.DataSyncService;
import com.njcn.oracle.util.LocalDateUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.lang.reflect.Method;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Description:
* Date: 2024/1/15 18:47【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Validated
@Slf4j
@RestController
@RequestMapping("/data")
@Api(tags = "OracleToInfluxDB数据同步")
@AllArgsConstructor
public class OracleToInfluxDBController {
private final InfluxDBBaseService influxDBBaseService;
private final DataSyncService dataSyncService;
@PostMapping("/dataSync")
@ApiOperation("数据同步")
@ApiImplicitParam(name = "dataAsynParam", value = "数据同步参数", required = true)
@SneakyThrows
public Boolean dataSync(@RequestBody DataAsynParam dataAsynParam){
dataAsynParam.getTableNames().stream().forEach(temp->{
IReplenishMybatisService executor = null;
try {
executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName("com.njcn.oracle.service.impl." + temp + "ServiceImpl"));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
IReplenishMybatisService finalExecutor = executor;
MigrationParam migration = new MigrationParam();
LocalDateTime localDateTime = dataAsynParam.getStartTime().atStartOfDay();
LocalDateTime tempStartTime = LocalDateTimeUtil.beginOfDay(localDateTime);
LocalDateTime tempEndTime = LocalDateTimeUtil.endOfDay(localDateTime);
migration.setStartTime(tempStartTime);
migration.setEndTime(tempEndTime);
migration.setStartTime(tempStartTime);
migration.setEndTime(tempEndTime);
List list = finalExecutor.queryData(migration);
if (CollectionUtil.isEmpty(list)) {
return;
}
Class<?> clazz = null;
Class<?> clazz2 = null;
try {
clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + temp);
clazz2 = Class.forName("com.njcn.oracle.bo.po." + temp);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
Method method = null;
try {
method = clazz.getDeclaredMethod("oralceToInfluxDB",clazz2);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
method.setAccessible(true);
Method finalMethod = method;
List list1 =(List) list.stream().flatMap(po -> {
try {
Object invoke = finalMethod.invoke(null,po);
Object invoke1 = invoke;
//返回可能是集合
return invoke1 instanceof List ? ((List<?>) invoke1).stream() : Stream.of(invoke1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).map(item-> (Object) item).collect(Collectors.toList());
influxDBBaseService.insertBatchBySlice(list1,5000);
});
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
}
}