diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java index e09149d..b9857ce 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java @@ -356,4 +356,119 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { this.hourseDataBacthSysc(dataAsynParam1); } } + + + + @Override + public void oneMonitorDataTransport(DataAsynParam dataAsynParam) { + Runtime runtime = Runtime.getRuntime(); + System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); + System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); + System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); + //目前且作为2片,后续将该属性提取到配置文件中 + List tableNames = dataAsynParam.getTableNames(); + //嵌套循环,先循环指标,再循环日期 + tableNames.stream().forEach(tableName -> { + IReplenishMybatisService executor; + try { + executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX)); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + List dateList = LocalDateUtil.getDateList(dataAsynParam.getStartTime(), dataAsynParam.getEndTime()); + for (LocalDate date : dateList) { + + //分片下,每段时间的小时数 + int sliceHour = 24 / slice; + for (int i = 0; i < slice; i++) { + String dateStr = LocalDateTimeUtil.format(date, DatePattern.NORM_DATE_PATTERN); + LocalDateTime startTime = LocalDateTimeUtil.parse(dateStr + StrPool.C_SPACE + (sliceHour * i < 10 ? "0" + sliceHour * i : sliceHour * i) + ":00:00", DatePattern.NORM_DATETIME_PATTERN); + LocalDateTime endTime = LocalDateTimeUtil.parse(dateStr + StrPool.C_SPACE + (sliceHour * (i + 1) - 1 < 10 ? "0" + (sliceHour * (i + 1) - 1) : sliceHour * (i + 1) - 1) + ":59:59", DatePattern.NORM_DATETIME_PATTERN); + //查询该时区的数据,并准备入库 + MigrationParam migration = new MigrationParam(); + migration.setStartTime(startTime); + migration.setEndTime(endTime); + if (Objects.nonNull(dataAsynParam.getMonitorId())) { + migration.setLineIds(Stream.of(dataAsynParam.getMonitorId()).collect(Collectors.toList())); + } + + List list = executor.queryData(migration); + //反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid + Iterator iterator = list.iterator(); + while (iterator.hasNext()) { + try { + Object obj = iterator.next(); + //获取 + Field id = obj.getClass().getDeclaredField("lineid"); + id.setAccessible(true); //暴力访问id + String id1 = id.get(obj).toString(); + if (!IdMappingCache.LineIdMapping.containsKey(id1)) { + log.info(tableName + "表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid" + id1); + iterator.remove(); + } else { + id.set(obj, IdMappingCache.LineIdMapping.get(id1)); + } + } catch (Exception e) { + e.printStackTrace(); + } + + } + + //采用弱引用接受,后续手动调用gc后,会清空该对象 + WeakReference weakReferenceData = new WeakReference<>(list); + int size = 0; + if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { + size = weakReferenceData.get().size(); + } + System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); + System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); + System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); + try { + if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { + //执行目标库的数据处理 + Class clazz; + Class clazz2; + //获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2 + try { + clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + tableName); + clazz2 = Class.forName("com.njcn.oracle.bo.po." + tableName); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + Method method; + try { + method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + method.setAccessible(true); + Method finalMethod = method; + List list1 = (List) weakReferenceData.get().stream().flatMap(po -> { + try { + Object invoke = finalMethod.invoke(null, po); + Object invoke1 = invoke; + //返回oracle转influx,flicker等表是1-1,还有1-4,这是判断返回是否是集合如何是集合继续扁平化 + return invoke1 instanceof List ? ((List) invoke1).stream() : Stream.of(invoke1); + } catch (Exception e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + //插入influxdb + influxDBBaseService.insertBatchBySlice(tableName, list1, 10000); +// size = list1.size(); + //最后一片时修改状态 + } + //手动执行GC + System.gc(); + + } catch (Exception exception) { + exception.printStackTrace(); + } + System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); + System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); + System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); + } + } + }); + } }