Code adjustments

This commit is contained in:
xy
2024-06-12 18:02:06 +08:00
parent 4b9a02c0bc
commit af6f35dfad
2 changed files with 3 additions and 137 deletions


@@ -211,140 +211,6 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
}
});
}
// Sync data hour by hour
// @Override
// @Async
// public void hourseDataBacthSysc(DataAsynParam dataAsynParam) {
// Runtime runtime = Runtime.getRuntime();
// System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
// System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
// System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
// //For now this is split into 2 slices; this property should later be extracted into the configuration file
// List<String> tableNames = dataAsynParam.getTableNames();
// //Nested loops: iterate over the metrics first, then over the dates
// tableNames.stream().forEach(tableName -> {
// IReplenishMybatisService executor;
// try {
// executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX));
// } catch (ClassNotFoundException e) {
// throw new RuntimeException(e);
// }
//
//
// //Job execution logging
// JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService.lambdaQuery().eq(JobDetailHoursInfluxDB::getTableName, tableName)
// .eq(JobDetailHoursInfluxDB::getExcuteDate, dataAsynParam.getStartDateTime())
// .one();
// if (Objects.nonNull(jobDetailInfluxDBHours) && (jobDetailInfluxDBHours.getState() == 1 || jobDetailInfluxDBHours.getState() == 0|| jobDetailInfluxDBHours.getState() == 2)) {
// //If this metric has already run or is still running for the current time slot, skip it
// return;
// }
// if (Objects.isNull(jobDetailInfluxDBHours)) {
// jobDetailInfluxDBHours = new JobDetailHoursInfluxDB(tableName, dataAsynParam.getStartDateTime(), 0, 0, LocalDateTime.now());
// jobDetailHoursInfluxDBService.save(jobDetailInfluxDBHours);
// }
// //Start a stopwatch to time the run
// StopWatch stopWatch = new StopWatch();
// stopWatch.start();
// LocalDateTime startTime = dataAsynParam.getStartDateTime();
// LocalDateTime endTime = dataAsynParam.getEndDateTime();
// //Query the data for this time window and prepare it for loading
// MigrationParam migration = new MigrationParam();
// migration.setStartTime(startTime);
// migration.setEndTime(endTime);
// System.out.println("执行扫描起始时间------------------------------------"+startTime.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
// System.out.println("执行扫描结束时间------------------------------------"+endTime.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
//
//
// List list = executor.queryData(migration);
// System.out.println("查询到的数据++++++++++++++"+list.size());
// //Use reflection to read the lineid value, replace it with the matching MySQL lineid, and record any unmatched lineid
// Iterator iterator = list.iterator();
// while (iterator.hasNext()) {
// try{
// Object obj = iterator.next();
// //Get the lineid field
// Field id = obj.getClass().getDeclaredField("lineid");
// id.setAccessible(true); //force access to the private field
// String id1 = id.get(obj).toString();
// if (!IdMappingCache.LineIdMapping.containsKey(id1)){
// log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
// iterator.remove();
// }else {
// id.set(obj, IdMappingCache.LineIdMapping.get(id1));
// }
// }catch (Exception e){
// e.printStackTrace();
// }
//
//
// }
//
// //Hold the result in a weak reference so the object can be cleared after the later manual GC call
// WeakReference<List> weakReferenceData = new WeakReference<>(list);
// int size = 0;
// if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
// size = weakReferenceData.get().size();
// }
// System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
// System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
// System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
// try {
// if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
// //Process the data for the target database
// Class<?> clazz;
// Class<?> clazz2;
// //Look up the InfluxDB entity class for this table so its oralceToInfluxDB method can be called, and the Oracle entity class clazz2 used as that method's parameter
// try {
// clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + tableName);
// clazz2 = Class.forName("com.njcn.oracle.bo.po." + tableName);
// } catch (ClassNotFoundException e) {
// throw new RuntimeException(e);
// }
// Method method;
// try {
// method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2);
// } catch (NoSuchMethodException e) {
// throw new RuntimeException(e);
// }
// method.setAccessible(true);
// Method finalMethod = method;
// List list1 = (List) weakReferenceData.get().stream().flatMap(po -> {
// try {
// Object invoke = finalMethod.invoke(null, po);
// Object invoke1 = invoke;
// //The Oracle-to-InfluxDB conversion is 1-to-1 for most tables but 1-to-4 for tables such as flicker, so check whether the returned value is a collection and keep flattening it if so
// return invoke1 instanceof List ? ((List<?>) invoke1).stream() : Stream.of(invoke1);
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }).collect(Collectors.toList());
// //Insert into InfluxDB
// influxDBBaseService.insertBatchBySlice(tableName, list1, 10000);
//// size = list1.size();
// //Update the state when the last slice has been processed
// }
// //Trigger GC manually
// System.gc();
// stopWatch.stop();
// jobDetailInfluxDBHours.setRowCount(size);
// jobDetailInfluxDBHours.setState(1);
// jobDetailInfluxDBHours.setDuration(stopWatch.getTotalTimeSeconds());
// jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
// } catch (Exception exception) {
// exception.printStackTrace();
// jobDetailInfluxDBHours.setState(2);
// jobDetailInfluxDBHours.setUpdateTime(LocalDateTime.now());
// jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
// }
// System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
// System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
// System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
//
//
// });
// }
//At the Jibei site there are cases where data whose update time is not within the last hour gets lost, so query the data going back one hour from the device's latest time
@Override
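
The lineid remapping removed in the block above (reflectively reading a field named "lineid" on each row and swapping the Oracle value for its MySQL counterpart from IdMappingCache.LineIdMapping) can be sketched in isolation. This is only an illustrative sketch, not code from the repository: LineIdRemapper and the mapping parameter are hypothetical stand-ins, and it assumes every row declares a String field literally named "lineid".

import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch only: LineIdRemapper is a hypothetical helper, not a class from this repository.
public final class LineIdRemapper {
    // Rewrites each row's "lineid" from its Oracle value to the mapped MySQL value;
    // rows with no mapping are removed, mirroring the iterator.remove() in the deleted code.
    public static void remap(List<?> rows, Map<String, String> mapping) {
        Iterator<?> it = rows.iterator();
        while (it.hasNext()) {
            Object row = it.next();
            try {
                Field lineid = row.getClass().getDeclaredField("lineid");
                lineid.setAccessible(true);
                String oracleId = String.valueOf(lineid.get(row));
                String mysqlId = mapping.get(oracleId);
                if (mysqlId == null) {
                    it.remove();              // no MySQL counterpart: drop the row
                } else {
                    lineid.set(row, mysqlId); // assumes the field is declared as String
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot access lineid on " + row.getClass(), e);
            }
        }
    }
}
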
@@ -384,10 +250,10 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
lineTimeList.forEach(item->{
MigrationParam migration = new MigrationParam();
migration.setLineIds(Collections.singletonList(item.getLineIndex()));
- migration.setStartTime(item.getUpdateTime().minusHours(1));
+ migration.setStartTime(item.getUpdateTime().minusHours(2));
migration.setEndTime(item.getUpdateTime());
System.out.println("当前监测点为------------------------------------"+item.getLineIndex());
- System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(1).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
+ System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
list.addAll(executor.queryData(migration));
});
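
A minimal sketch of the per-monitoring-point scan window used in the hunk above, which this commit widens from one to two hours. LOOKBACK_HOURS and DeviceLatest are hypothetical stand-ins (the committed code hard-codes minusHours(2) and reads item.getUpdateTime() from lineTimeList); the real code places the window into a MigrationParam and passes it to executor.queryData. Assumes a recent JDK (records, Java 16+).

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Sketch only: LOOKBACK_HOURS and DeviceLatest are illustrative names, not repository classes.
public class LookbackWindowSketch {
    private static final int LOOKBACK_HOURS = 2; // this commit widens the window from 1 to 2 hours
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    record DeviceLatest(String lineIndex, LocalDateTime updateTime) {}

    public static void main(String[] args) {
        DeviceLatest item = new DeviceLatest("line-001", LocalDateTime.now());
        LocalDateTime start = item.updateTime().minusHours(LOOKBACK_HOURS); // scan start
        LocalDateTime end = item.updateTime();                              // scan end = device's latest time
        System.out.println("line " + item.lineIndex()
                + " scan window: " + start.format(FMT) + " .. " + end.format(FMT));
    }
}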


@@ -30,7 +30,7 @@ public class OracleToMysqlDBJob {
private final OracleToMysqlService oracleToMysqlService;
- @Scheduled(cron="0 0 7 * * ?")
+ @Scheduled(cron="0 0 8 * * ?")
public void executeEvent() {
// Derive yesterday's date from the current time
String date = DateUtil.format(LocalDateTime.now().minusDays(1), DatePattern.NORM_DATE_PATTERN);
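
For completeness, a small sketch of how the updated trigger can be checked. It assumes Spring 5.3+ is on the classpath, which provides org.springframework.scheduling.support.CronExpression; with the six-field expression "0 0 8 * * ?" (second, minute, hour, day of month, month, day of week) the job now fires once a day at 08:00 local time instead of 07:00.

import java.time.LocalDateTime;
import org.springframework.scheduling.support.CronExpression;

// Sketch only: prints the next firing time of the new schedule.
public class CronCheck {
    public static void main(String[] args) {
        CronExpression cron = CronExpression.parse("0 0 8 * * ?");
        LocalDateTime next = cron.next(LocalDateTime.now());
        System.out.println("next run: " + next); // expected to be the next 08:00
    }
}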