河北转换提交
This commit is contained in:
@@ -356,4 +356,119 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
this.hourseDataBacthSysc(dataAsynParam1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void oneMonitorDataTransport(DataAsynParam dataAsynParam) {
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
|
||||
//目前且作为2片,后续将该属性提取到配置文件中
|
||||
List<String> tableNames = dataAsynParam.getTableNames();
|
||||
//嵌套循环,先循环指标,再循环日期
|
||||
tableNames.stream().forEach(tableName -> {
|
||||
IReplenishMybatisService executor;
|
||||
try {
|
||||
executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX));
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
List<LocalDate> dateList = LocalDateUtil.getDateList(dataAsynParam.getStartTime(), dataAsynParam.getEndTime());
|
||||
for (LocalDate date : dateList) {
|
||||
|
||||
//分片下,每段时间的小时数
|
||||
int sliceHour = 24 / slice;
|
||||
for (int i = 0; i < slice; i++) {
|
||||
String dateStr = LocalDateTimeUtil.format(date, DatePattern.NORM_DATE_PATTERN);
|
||||
LocalDateTime startTime = LocalDateTimeUtil.parse(dateStr + StrPool.C_SPACE + (sliceHour * i < 10 ? "0" + sliceHour * i : sliceHour * i) + ":00:00", DatePattern.NORM_DATETIME_PATTERN);
|
||||
LocalDateTime endTime = LocalDateTimeUtil.parse(dateStr + StrPool.C_SPACE + (sliceHour * (i + 1) - 1 < 10 ? "0" + (sliceHour * (i + 1) - 1) : sliceHour * (i + 1) - 1) + ":59:59", DatePattern.NORM_DATETIME_PATTERN);
|
||||
//查询该时区的数据,并准备入库
|
||||
MigrationParam migration = new MigrationParam();
|
||||
migration.setStartTime(startTime);
|
||||
migration.setEndTime(endTime);
|
||||
if (Objects.nonNull(dataAsynParam.getMonitorId())) {
|
||||
migration.setLineIds(Stream.of(dataAsynParam.getMonitorId()).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
List list = executor.queryData(migration);
|
||||
//反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid
|
||||
Iterator iterator = list.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
try {
|
||||
Object obj = iterator.next();
|
||||
//获取
|
||||
Field id = obj.getClass().getDeclaredField("lineid");
|
||||
id.setAccessible(true); //暴力访问id
|
||||
String id1 = id.get(obj).toString();
|
||||
if (!IdMappingCache.LineIdMapping.containsKey(id1)) {
|
||||
log.info(tableName + "表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid" + id1);
|
||||
iterator.remove();
|
||||
} else {
|
||||
id.set(obj, IdMappingCache.LineIdMapping.get(id1));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//采用弱引用接受,后续手动调用gc后,会清空该对象
|
||||
WeakReference<List> weakReferenceData = new WeakReference<>(list);
|
||||
int size = 0;
|
||||
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
|
||||
size = weakReferenceData.get().size();
|
||||
}
|
||||
System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
|
||||
try {
|
||||
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
|
||||
//执行目标库的数据处理
|
||||
Class<?> clazz;
|
||||
Class<?> clazz2;
|
||||
//获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2
|
||||
try {
|
||||
clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + tableName);
|
||||
clazz2 = Class.forName("com.njcn.oracle.bo.po." + tableName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
Method method;
|
||||
try {
|
||||
method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
method.setAccessible(true);
|
||||
Method finalMethod = method;
|
||||
List list1 = (List) weakReferenceData.get().stream().flatMap(po -> {
|
||||
try {
|
||||
Object invoke = finalMethod.invoke(null, po);
|
||||
Object invoke1 = invoke;
|
||||
//返回oracle转influx,flicker等表是1-1,还有1-4,这是判断返回是否是集合如何是集合继续扁平化
|
||||
return invoke1 instanceof List ? ((List<?>) invoke1).stream() : Stream.of(invoke1);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
//插入influxdb
|
||||
influxDBBaseService.insertBatchBySlice(tableName, list1, 10000);
|
||||
// size = list1.size();
|
||||
//最后一片时修改状态
|
||||
}
|
||||
//手动执行GC
|
||||
System.gc();
|
||||
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
}
|
||||
System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user