oracle同步到influxdb添加每小时执行批处理
This commit is contained in:
@@ -0,0 +1,71 @@
|
|||||||
|
package com.njcn.influx.bo.po;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableField;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
|
import com.github.jeffreyning.mybatisplus.anno.MppMultiId;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/1/23 14:14【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@TableName(value = "JOB_DETAIL_HOURS_INFLUXDB")
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class JobDetailHoursInfluxDB {
|
||||||
|
/**
|
||||||
|
* 指标表名
|
||||||
|
*/
|
||||||
|
@MppMultiId(value = "TABLE_NAME")
|
||||||
|
private String tableName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行日期
|
||||||
|
*/
|
||||||
|
@MppMultiId(value = "EXCUTE_DATE")
|
||||||
|
private LocalDateTime excuteDate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 记录数
|
||||||
|
*/
|
||||||
|
@TableField(value = "\"ROW_COUNT\"")
|
||||||
|
private Integer rowCount;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 状态(0-执行中、1-成功、2-失败)
|
||||||
|
*/
|
||||||
|
@TableField(value = "\"STATE\"")
|
||||||
|
private Integer state;
|
||||||
|
|
||||||
|
@TableField(value = "UPDATE_TIME")
|
||||||
|
private LocalDateTime updateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消耗时长
|
||||||
|
*/
|
||||||
|
@TableField(value = "DURATION")
|
||||||
|
private Double duration;
|
||||||
|
|
||||||
|
public JobDetailHoursInfluxDB(String tableName, LocalDateTime excuteDate, Integer state, Integer rowCount, LocalDateTime updateTime) {
|
||||||
|
this.tableName = tableName;
|
||||||
|
this.excuteDate = excuteDate;
|
||||||
|
this.state = state;
|
||||||
|
this.rowCount = rowCount;
|
||||||
|
this.updateTime = updateTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JobDetailHoursInfluxDB(String tableName, LocalDateTime excuteDate, Integer rowCount) {
|
||||||
|
this.tableName = tableName;
|
||||||
|
this.excuteDate = excuteDate;
|
||||||
|
this.rowCount = rowCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package com.njcn.influx.mapper;
|
||||||
|
|
||||||
|
import com.github.jeffreyning.mybatisplus.base.MppBaseMapper;
|
||||||
|
import com.njcn.influx.bo.po.JobDetailHoursInfluxDB;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/1/23 14:14【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
public interface JobDetailHoursInfluxDBMapper extends MppBaseMapper<JobDetailHoursInfluxDB> {
|
||||||
|
}
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
package com.njcn.influx.service;
|
||||||
|
|
||||||
|
import com.github.jeffreyning.mybatisplus.service.IMppService;
|
||||||
|
import com.njcn.influx.bo.po.JobDetailHoursInfluxDB;
|
||||||
|
import com.njcn.oracle.bo.po.JobDetailHours;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/1/23 14:14【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
public interface JobDetailHoursInfluxDBService extends IMppService<JobDetailHoursInfluxDB> {
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -4,4 +4,6 @@ import com.njcn.oracle.bo.param.DataAsynParam;
|
|||||||
|
|
||||||
public interface OracleToInfluxDBService {
|
public interface OracleToInfluxDBService {
|
||||||
void dataBacthSysc(DataAsynParam dataAsynParam);
|
void dataBacthSysc(DataAsynParam dataAsynParam);
|
||||||
|
|
||||||
|
void hourseDataBacthSysc(DataAsynParam dataAsynParam);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,20 @@
|
|||||||
|
package com.njcn.influx.service.impl;
|
||||||
|
|
||||||
|
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
|
||||||
|
import com.njcn.influx.bo.po.JobDetailHoursInfluxDB;
|
||||||
|
import com.njcn.influx.mapper.JobDetailHoursInfluxDBMapper;
|
||||||
|
import com.njcn.influx.service.JobDetailHoursInfluxDBService;
|
||||||
|
import com.njcn.oracle.bo.po.JobDetailHours;
|
||||||
|
import com.njcn.oracle.mybatis.mapper.JobDetailHoursMapper;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/2/19 15:22【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class JobDetailHoursInfluxDBServiceImpl extends MppServiceImpl<JobDetailHoursInfluxDBMapper, JobDetailHoursInfluxDB> implements JobDetailHoursInfluxDBService {
|
||||||
|
}
|
||||||
@@ -6,17 +6,21 @@ import cn.hutool.core.date.LocalDateTimeUtil;
|
|||||||
import cn.hutool.core.text.StrPool;
|
import cn.hutool.core.text.StrPool;
|
||||||
import cn.hutool.extra.spring.SpringUtil;
|
import cn.hutool.extra.spring.SpringUtil;
|
||||||
import com.njcn.influx.bo.param.TableEnum;
|
import com.njcn.influx.bo.param.TableEnum;
|
||||||
|
import com.njcn.influx.bo.po.JobDetailHoursInfluxDB;
|
||||||
import com.njcn.influx.bo.po.JobDetailInfluxDB;
|
import com.njcn.influx.bo.po.JobDetailInfluxDB;
|
||||||
import com.njcn.influx.bo.po.JobHistoryLogInfluxdb;
|
import com.njcn.influx.bo.po.JobHistoryLogInfluxdb;
|
||||||
import com.njcn.influx.config.IdMappingCache;
|
import com.njcn.influx.config.IdMappingCache;
|
||||||
|
import com.njcn.influx.service.JobDetailHoursInfluxDBService;
|
||||||
import com.njcn.influx.service.JobDetailInfluxDBService;
|
import com.njcn.influx.service.JobDetailInfluxDBService;
|
||||||
import com.njcn.influx.service.JobHistoryLogInfluxdbService;
|
import com.njcn.influx.service.JobHistoryLogInfluxdbService;
|
||||||
import com.njcn.influx.service.OracleToInfluxDBService;
|
import com.njcn.influx.service.OracleToInfluxDBService;
|
||||||
import com.njcn.oracle.bo.param.DataAsynParam;
|
import com.njcn.oracle.bo.param.DataAsynParam;
|
||||||
import com.njcn.oracle.bo.param.MigrationParam;
|
import com.njcn.oracle.bo.param.MigrationParam;
|
||||||
import com.njcn.oracle.bo.param.ServiceTypeEnum;
|
import com.njcn.oracle.bo.param.ServiceTypeEnum;
|
||||||
|
import com.njcn.oracle.bo.po.JobDetailHours;
|
||||||
import com.njcn.oracle.bo.po.JobHistoryLog;
|
import com.njcn.oracle.bo.po.JobHistoryLog;
|
||||||
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
|
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
|
||||||
|
import com.njcn.oracle.service.JobDetailHoursService;
|
||||||
import com.njcn.oracle.util.LocalDateUtil;
|
import com.njcn.oracle.util.LocalDateUtil;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -62,6 +66,8 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
|||||||
|
|
||||||
private final JobHistoryLogInfluxdbService jobHistoryLogInfluxdbService;
|
private final JobHistoryLogInfluxdbService jobHistoryLogInfluxdbService;
|
||||||
|
|
||||||
|
private final JobDetailHoursInfluxDBService jobDetailHoursInfluxDBService;
|
||||||
|
|
||||||
@Value("${business.slice:2}")
|
@Value("${business.slice:2}")
|
||||||
private int slice;
|
private int slice;
|
||||||
|
|
||||||
@@ -207,4 +213,131 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
//按小时来同步数据
|
||||||
|
@Override
|
||||||
|
public void hourseDataBacthSysc(DataAsynParam dataAsynParam) {
|
||||||
|
Runtime runtime = Runtime.getRuntime();
|
||||||
|
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||||
|
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||||
|
System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
|
||||||
|
//目前且作为2片,后续将该属性提取到配置文件中
|
||||||
|
List<String> tableNames = dataAsynParam.getTableNames();
|
||||||
|
//嵌套循环,先循环指标,再循环日期
|
||||||
|
tableNames.stream().forEach(tableName -> {
|
||||||
|
IReplenishMybatisService executor;
|
||||||
|
try {
|
||||||
|
executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX));
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//日志记录
|
||||||
|
JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService.lambdaQuery().eq(JobDetailHoursInfluxDB::getTableName, tableName)
|
||||||
|
.eq(JobDetailHoursInfluxDB::getExcuteDate, dataAsynParam.getStartDateTime())
|
||||||
|
.one();
|
||||||
|
if (Objects.nonNull(jobDetailInfluxDBHours) && (jobDetailInfluxDBHours.getState() == 1 || jobDetailInfluxDBHours.getState() == 0|| jobDetailInfluxDBHours.getState() == 2)) {
|
||||||
|
//如果该指标当前时间段已执行或正在执行,直接跳出循环
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (Objects.isNull(jobDetailInfluxDBHours)) {
|
||||||
|
jobDetailInfluxDBHours = new JobDetailHoursInfluxDB(tableName, dataAsynParam.getStartDateTime(), 0, 0, LocalDateTime.now());
|
||||||
|
jobDetailHoursInfluxDBService.save(jobDetailInfluxDBHours);
|
||||||
|
}
|
||||||
|
//程序监听
|
||||||
|
StopWatch stopWatch = new StopWatch();
|
||||||
|
stopWatch.start();
|
||||||
|
LocalDateTime startTime = dataAsynParam.getStartDateTime();
|
||||||
|
LocalDateTime endTime = dataAsynParam.getStartDateTime();
|
||||||
|
//查询该时区的数据,并准备入库
|
||||||
|
MigrationParam migration = new MigrationParam();
|
||||||
|
migration.setStartTime(startTime);
|
||||||
|
migration.setEndTime(endTime);
|
||||||
|
List list = executor.queryData(migration);
|
||||||
|
//反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid
|
||||||
|
Iterator iterator = list.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
try{
|
||||||
|
Object obj = iterator.next();
|
||||||
|
//获取
|
||||||
|
Field id = obj.getClass().getDeclaredField("lineid");
|
||||||
|
id.setAccessible(true); //暴力访问id
|
||||||
|
String id1 = id.get(obj).toString();
|
||||||
|
if (!IdMappingCache.IdMapping.containsKey(id1)){
|
||||||
|
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
|
||||||
|
iterator.remove();
|
||||||
|
}else {
|
||||||
|
id.set(obj, IdMappingCache.IdMapping.get(id1));
|
||||||
|
}
|
||||||
|
}catch (Exception e){
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//采用弱引用接受,后续手动调用gc后,会清空该对象
|
||||||
|
WeakReference<List> weakReferenceData = new WeakReference<>(list);
|
||||||
|
int size = 0;
|
||||||
|
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
|
||||||
|
size = weakReferenceData.get().size();
|
||||||
|
}
|
||||||
|
System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||||
|
System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||||
|
System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
|
||||||
|
try {
|
||||||
|
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
|
||||||
|
//执行目标库的数据处理
|
||||||
|
Class<?> clazz;
|
||||||
|
Class<?> clazz2;
|
||||||
|
//获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2
|
||||||
|
try {
|
||||||
|
clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + tableName);
|
||||||
|
clazz2 = Class.forName("com.njcn.oracle.bo.po." + tableName);
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
Method method;
|
||||||
|
try {
|
||||||
|
method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2);
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
method.setAccessible(true);
|
||||||
|
Method finalMethod = method;
|
||||||
|
List list1 = (List) weakReferenceData.get().stream().flatMap(po -> {
|
||||||
|
try {
|
||||||
|
Object invoke = finalMethod.invoke(null, po);
|
||||||
|
Object invoke1 = invoke;
|
||||||
|
//返回oracle转influx,flicker等表是1-1,还有1-4,这是判断返回是否是集合如何是集合继续扁平化
|
||||||
|
return invoke1 instanceof List ? ((List<?>) invoke1).stream() : Stream.of(invoke1);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
//插入influxdb
|
||||||
|
influxDBBaseService.insertBatchBySlice(tableName, list1, 10000);
|
||||||
|
// size = list1.size();
|
||||||
|
//最后一片时修改状态
|
||||||
|
}
|
||||||
|
//手动执行GC
|
||||||
|
System.gc();
|
||||||
|
stopWatch.stop();
|
||||||
|
jobDetailInfluxDBHours.setRowCount(size);
|
||||||
|
jobDetailInfluxDBHours.setState(1);
|
||||||
|
jobDetailInfluxDBHours.setDuration(stopWatch.getTotalTimeSeconds());
|
||||||
|
jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
|
||||||
|
} catch (Exception exception) {
|
||||||
|
exception.printStackTrace();
|
||||||
|
jobDetailInfluxDBHours.setState(2);
|
||||||
|
jobDetailInfluxDBHours.setUpdateTime(LocalDateTime.now());
|
||||||
|
jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
|
||||||
|
}
|
||||||
|
System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||||
|
System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||||
|
System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
|
||||||
|
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ import org.springframework.scheduling.annotation.Scheduled;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
import java.time.LocalDate;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Description:
|
* Description:
|
||||||
@@ -36,4 +38,25 @@ public class OracleToInfluxDBJob {
|
|||||||
dataAsynParam.setExcuteType(2);
|
dataAsynParam.setExcuteType(2);
|
||||||
oracleToInfluxDBService.dataBacthSysc(dataAsynParam);
|
oracleToInfluxDBService.dataBacthSysc(dataAsynParam);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//每小时03分钟时执行上一个小时的数据同步
|
||||||
|
@Scheduled(cron="0 3 * * * ?")
|
||||||
|
public void executeHours() {
|
||||||
|
DataAsynParam dataAsynParam = new DataAsynParam();
|
||||||
|
// 获取当前时间
|
||||||
|
LocalDateTime now = LocalDateTime.now();
|
||||||
|
|
||||||
|
// 减去一个小时
|
||||||
|
LocalDateTime oneHourAgo = now.minusHours(1);
|
||||||
|
|
||||||
|
// 将分钟和秒设置为0
|
||||||
|
LocalDateTime result = oneHourAgo.truncatedTo(ChronoUnit.HOURS);
|
||||||
|
// 加上59分钟59秒
|
||||||
|
LocalDateTime modifiedResult = result.plusMinutes(59).plusSeconds(59);
|
||||||
|
dataAsynParam.setStartDateTime(result);
|
||||||
|
dataAsynParam.setEndDateTime(modifiedResult);
|
||||||
|
dataAsynParam.setTableNames(TableEnum.getExecutableTypes());
|
||||||
|
dataAsynParam.setExcuteType(2);
|
||||||
|
oracleToInfluxDBService.hourseDataBacthSysc(dataAsynParam);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,14 +7,10 @@ server:
|
|||||||
spring:
|
spring:
|
||||||
#influxDB内容配置
|
#influxDB内容配置
|
||||||
influx:
|
influx:
|
||||||
url: http://25.36.232.36:8086
|
url: http://192.168.1.102:8086
|
||||||
user: admin
|
user: admin
|
||||||
password: admin
|
password: 123456
|
||||||
database: pqsbase_hbcs
|
database: pqsbase_sjzx
|
||||||
# url: http://192.168.1.81:18086
|
|
||||||
# user: admin
|
|
||||||
# password: 123456
|
|
||||||
# database: pqsbase
|
|
||||||
mapper-location: com.njcn.influx.imapper
|
mapper-location: com.njcn.influx.imapper
|
||||||
application:
|
application:
|
||||||
name: oracle-influx
|
name: oracle-influx
|
||||||
@@ -71,20 +67,14 @@ spring:
|
|||||||
strict: false
|
strict: false
|
||||||
datasource:
|
datasource:
|
||||||
master:
|
master:
|
||||||
url: jdbc:oracle:thin:@10.122.32.73:11521/dwxb
|
url: jdbc:oracle:thin:@192.168.1.101:1521:pqsbase
|
||||||
username: pqsadmin
|
username: pqsadmin
|
||||||
password: pqsadmin_123
|
password: Pqsadmin123
|
||||||
# url: jdbc:oracle:thin:@192.168.1.51:1521:pqsbase
|
|
||||||
# username: pqsadmin_hn
|
|
||||||
# password: pqsadmin
|
|
||||||
driver-class-name: oracle.jdbc.driver.OracleDriver
|
driver-class-name: oracle.jdbc.driver.OracleDriver
|
||||||
target:
|
target:
|
||||||
url: jdbc:mysql://25.36.232.37:13306/pmsinfo?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT
|
url: jdbc:mysql://192.168.1.102:13306/pqsinfo?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT
|
||||||
username: root
|
username: root
|
||||||
password: Huawei12#
|
password: njcnpqs
|
||||||
# url: jdbc:mysql://192.168.1.24:13306/pqsinfo_pq?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT
|
|
||||||
# username: root
|
|
||||||
# password: njcnpqs
|
|
||||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||||
#mybatis配置信息
|
#mybatis配置信息
|
||||||
mybatis-plus:
|
mybatis-plus:
|
||||||
|
|||||||
Reference in New Issue
Block a user