diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java index ca4d682..e09149d 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java @@ -211,140 +211,6 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { } }); } - //按小时来同步数据 -// @Override -// @Async -// public void hourseDataBacthSysc(DataAsynParam dataAsynParam) { -// Runtime runtime = Runtime.getRuntime(); -// System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); -// System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); -// System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); -// //目前且作为2片,后续将该属性提取到配置文件中 -// List tableNames = dataAsynParam.getTableNames(); -// //嵌套循环,先循环指标,再循环日期 -// tableNames.stream().forEach(tableName -> { -// IReplenishMybatisService executor; -// try { -// executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX)); -// } catch (ClassNotFoundException e) { -// throw new RuntimeException(e); -// } -// -// -// //日志记录 -// JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService.lambdaQuery().eq(JobDetailHoursInfluxDB::getTableName, tableName) -// .eq(JobDetailHoursInfluxDB::getExcuteDate, dataAsynParam.getStartDateTime()) -// .one(); -// if (Objects.nonNull(jobDetailInfluxDBHours) && (jobDetailInfluxDBHours.getState() == 1 || jobDetailInfluxDBHours.getState() == 0|| jobDetailInfluxDBHours.getState() == 2)) { -// //如果该指标当前时间段已执行或正在执行,直接跳出循环 -// return; -// } -// if (Objects.isNull(jobDetailInfluxDBHours)) { -// jobDetailInfluxDBHours = new JobDetailHoursInfluxDB(tableName, dataAsynParam.getStartDateTime(), 0, 0, LocalDateTime.now()); -// jobDetailHoursInfluxDBService.save(jobDetailInfluxDBHours); -// } -// //程序监听 -// StopWatch stopWatch = new StopWatch(); -// stopWatch.start(); -// LocalDateTime startTime = dataAsynParam.getStartDateTime(); -// LocalDateTime endTime = dataAsynParam.getEndDateTime(); -// //查询该时区的数据,并准备入库 -// MigrationParam migration = new MigrationParam(); -// migration.setStartTime(startTime); -// migration.setEndTime(endTime); -// System.out.println("执行扫描起始时间------------------------------------"+startTime.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); -// System.out.println("执行扫描结束时间------------------------------------"+endTime.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); -// -// -// List list = executor.queryData(migration); -// System.out.println("查询到的数据++++++++++++++"+list.size()); -// //反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid -// Iterator iterator = list.iterator(); -// while (iterator.hasNext()) { -// try{ -// Object obj = iterator.next(); -// //获取 -// Field id = obj.getClass().getDeclaredField("lineid"); -// id.setAccessible(true); //暴力访问id -// String id1 = id.get(obj).toString(); -// if (!IdMappingCache.LineIdMapping.containsKey(id1)){ -// log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1); -// iterator.remove(); -// }else { -// id.set(obj, IdMappingCache.LineIdMapping.get(id1)); -// } -// }catch (Exception e){ -// e.printStackTrace(); -// } -// -// -// } -// -// //采用弱引用接受,后续手动调用gc后,会清空该对象 -// WeakReference weakReferenceData = new WeakReference<>(list); -// int size = 0; -// if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { -// size = weakReferenceData.get().size(); -// } -// System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); -// System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); -// System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); -// try { -// if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { -// //执行目标库的数据处理 -// Class clazz; -// Class clazz2; -// //获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2 -// try { -// clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + tableName); -// clazz2 = Class.forName("com.njcn.oracle.bo.po." + tableName); -// } catch (ClassNotFoundException e) { -// throw new RuntimeException(e); -// } -// Method method; -// try { -// method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2); -// } catch (NoSuchMethodException e) { -// throw new RuntimeException(e); -// } -// method.setAccessible(true); -// Method finalMethod = method; -// List list1 = (List) weakReferenceData.get().stream().flatMap(po -> { -// try { -// Object invoke = finalMethod.invoke(null, po); -// Object invoke1 = invoke; -// //返回oracle转influx,flicker等表是1-1,还有1-4,这是判断返回是否是集合如何是集合继续扁平化 -// return invoke1 instanceof List ? ((List) invoke1).stream() : Stream.of(invoke1); -// } catch (Exception e) { -// throw new RuntimeException(e); -// } -// }).collect(Collectors.toList()); -// //插入influxdb -// influxDBBaseService.insertBatchBySlice(tableName, list1, 10000); -//// size = list1.size(); -// //最后一片时修改状态 -// } -// //手动执行GC -// System.gc(); -// stopWatch.stop(); -// jobDetailInfluxDBHours.setRowCount(size); -// jobDetailInfluxDBHours.setState(1); -// jobDetailInfluxDBHours.setDuration(stopWatch.getTotalTimeSeconds()); -// jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours); -// } catch (Exception exception) { -// exception.printStackTrace(); -// jobDetailInfluxDBHours.setState(2); -// jobDetailInfluxDBHours.setUpdateTime(LocalDateTime.now()); -// jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours); -// } -// System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); -// System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); -// System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); -// -// -// }); -// } - //冀北现场存在数据更新时间不在1小时内,会丢失数据的情况,这边根据装置最新时间往前推1个小时查询数据 @Override @@ -384,10 +250,10 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { lineTimeList.forEach(item->{ MigrationParam migration = new MigrationParam(); migration.setLineIds(Collections.singletonList(item.getLineIndex())); - migration.setStartTime(item.getUpdateTime().minusHours(1)); + migration.setStartTime(item.getUpdateTime().minusHours(2)); migration.setEndTime(item.getUpdateTime()); System.out.println("当前监测点为------------------------------------"+item.getLineIndex()); - System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(1).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); list.addAll(executor.queryData(migration)); }); diff --git a/mysql-data/mysql-target/src/main/java/com/njcn/mysql/job/OracleToMysqlDBJob.java b/mysql-data/mysql-target/src/main/java/com/njcn/mysql/job/OracleToMysqlDBJob.java index b546e07..f8968f4 100644 --- a/mysql-data/mysql-target/src/main/java/com/njcn/mysql/job/OracleToMysqlDBJob.java +++ b/mysql-data/mysql-target/src/main/java/com/njcn/mysql/job/OracleToMysqlDBJob.java @@ -30,7 +30,7 @@ public class OracleToMysqlDBJob { private final OracleToMysqlService oracleToMysqlService; - @Scheduled(cron="0 0 7 * * ?") + @Scheduled(cron="0 0 8 * * ?") public void executeEvent() { // 获取当前时间 String date = DateUtil.format(LocalDateTime.now().minusDays(1), DatePattern.NORM_DATE_PATTERN);