diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/bo/param/TableEnum.java b/influx-data/influx-source/src/main/java/com/njcn/influx/bo/param/TableEnum.java
index 95570e7..c618c33 100644
--- a/influx-data/influx-source/src/main/java/com/njcn/influx/bo/param/TableEnum.java
+++ b/influx-data/influx-source/src/main/java/com/njcn/influx/bo/param/TableEnum.java
@@ -16,20 +16,20 @@ import java.util.stream.Collectors;
 @Getter
 public enum TableEnum {
     DATAFLICKER("DataFlicker","电压闪变数据表", 1),
-//    DATAFLUC("DataFluc","电压波动数据表", 1),
-//    DATAHARMPHASICI("DataHarmphasicI","谐波电流角度数据表", 4),
-//    DATAHARMPHASICV("DataHarmphasicV","谐波电压角度数据表", 4),
-//    DATAHARMPOWERP("DataHarmpowerP","有功功率数据表", 4),
-//    DATAHARMPOWERQ("DataHarmpowerQ","无功功率数据表", 4),
-//    DATAHARMPOWERS("DataHarmpowerS","视在功率数据表", 4),
-//    DATAHARMRATEI("DataHarmrateI","谐波电流含有率数据表", 4),
+    DATAFLUC("DataFluc","电压波动数据表", 1),
+    DATAHARMPHASICI("DataHarmphasicI","谐波电流角度数据表", 4),
+    DATAHARMPHASICV("DataHarmphasicV","谐波电压角度数据表", 4),
+    DATAHARMPOWERP("DataHarmpowerP","有功功率数据表", 4),
+    DATAHARMPOWERQ("DataHarmpowerQ","无功功率数据表", 4),
+    DATAHARMPOWERS("DataHarmpowerS","视在功率数据表", 4),
+    DATAHARMRATEI("DataHarmrateI","谐波电流含有率数据表", 4),
     DATAHARMRATEV("DataHarmrateV","谐波电压含有率数据表", 4),
-//    DATAINHARMI("DataInharmI","电流简谐波幅值数据表", 4),
+    DATAINHARMI("DataInharmI","电流简谐波幅值数据表", 4),
     DATAINHARMV("DataInharmV","电压间谐波幅值数据表", 4),
     DATAI("DataI","谐波电流幅值数据表", 4),
     DATAPLT("DataPlt","长时闪变数据表", 1),
     DATAV("DataV","谐波电压幅值数据表", 4),
-    COMINFORMATION("ComInfoRmation","监测点状态监测数据", 4),
+//    COMINFORMATION("ComInfoRmation","监测点状态监测数据", 4),
     ;
diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java
index c2adb48..ef1a3c9 100644
--- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java
+++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java
@@ -95,7 +95,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
             //添加记录批处理日志,执行为批处理且最后一张表执行完记录
-            if(dataAsynParam.getExcuteType()==2&&Objects.equals(tableName, TableEnum.DATAV.getCode())){
+            if(dataAsynParam.getExcuteType()==2&&Objects.equals(tableName, TableEnum.DATAHARMPOWERS.getCode())){
                 JobHistoryLogInfluxdb jobHistoryLogInfluxdbLog = new JobHistoryLogInfluxdb(date,LocalDateTime.now());
                 jobHistoryLogInfluxdbService.save(jobHistoryLogInfluxdbLog);
             }
@@ -220,6 +220,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
     @Override
     public void hourseDataBacthSysc(DataAsynParam dataAsynParam) {
         Map LineIdMapping = (Map) redisUtil.getObjectByKey("LineIdMapping");
+        List oracleLineIdList = LineIdMapping.keySet().stream().map(Integer::parseInt).collect(Collectors.toList());
         Map DevIdMapping = (Map) redisUtil.getObjectByKey("DevIdMapping");
         Runtime runtime = Runtime.getRuntime();
@@ -237,17 +238,17 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
                 throw new RuntimeException(e);
             }
             //日志记录
-            JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService.lambdaQuery().eq(JobDetailHoursInfluxDB::getTableName, tableName)
-                    .eq(JobDetailHoursInfluxDB::getExcuteDate, dataAsynParam.getStartDateTime())
-                    .one();
-            if (Objects.nonNull(jobDetailInfluxDBHours) && (jobDetailInfluxDBHours.getState() == 1 || jobDetailInfluxDBHours.getState() == 0|| jobDetailInfluxDBHours.getState() == 2)) {
-                //如果该指标当前时间段已执行或正在执行,直接跳出循环
-                return;
-            }
-            if (Objects.isNull(jobDetailInfluxDBHours)) {
-                jobDetailInfluxDBHours = new JobDetailHoursInfluxDB(tableName, dataAsynParam.getStartDateTime(), 0, 0, LocalDateTime.now());
-                jobDetailHoursInfluxDBService.save(jobDetailInfluxDBHours);
-            }
+//            JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService.lambdaQuery().eq(JobDetailHoursInfluxDB::getTableName, tableName)
+//                    .eq(JobDetailHoursInfluxDB::getExcuteDate, dataAsynParam.getStartDateTime())
+//                    .one();
+//            if (Objects.nonNull(jobDetailInfluxDBHours) && (jobDetailInfluxDBHours.getState() == 1 || jobDetailInfluxDBHours.getState() == 0|| jobDetailInfluxDBHours.getState() == 2)) {
+//                //如果该指标当前时间段已执行或正在执行,直接跳出循环
+//                return;
+//            }
+//            if (Objects.isNull(jobDetailInfluxDBHours)) {
+//                jobDetailInfluxDBHours = new JobDetailHoursInfluxDB(tableName, dataAsynParam.getStartDateTime(), 0, 0, LocalDateTime.now());
+//                jobDetailHoursInfluxDBService.save(jobDetailInfluxDBHours);
+//            }
             //程序监听
             StopWatch stopWatch = new StopWatch();
             stopWatch.start();
@@ -256,6 +257,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
                 MigrationParam migration = new MigrationParam();
                 migration.setStartTime(dataAsynParam.getStartDateTime());
                 migration.setEndTime(dataAsynParam.getEndDateTime());
+                migration.setLineIds(oracleLineIdList);
                 System.out.println("执行扫描起始时间------------------------------------"+dataAsynParam.getStartDateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
                 System.out.println("执行扫描结束时间------------------------------------"+dataAsynParam.getEndDateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
                 list.addAll(executor.queryData(migration));
@@ -283,18 +285,25 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
                    }
                }
            }else{
+                MigrationParam migration = new MigrationParam();
+                migration.setStartTime(dataAsynParam.getStartDateTime());
+                migration.setEndTime(dataAsynParam.getEndDateTime());
+                migration.setLineIds(oracleLineIdList);
+                System.out.println("执行扫描起始时间------------------------------------"+dataAsynParam.getStartDateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
+                System.out.println("执行扫描结束时间------------------------------------"+dataAsynParam.getEndDateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
+                list.addAll(executor.queryData(migration));
                 //获取监测点最新的数据时间,单监测点查询数据
-                List lineTimeList = lineTimeMapper.getLineTime();
-                lineTimeList.forEach(item->{
-                    MigrationParam migration = new MigrationParam();
-                    migration.setLineIds(Collections.singletonList(item.getLineIndex()));
-                    migration.setStartTime(item.getUpdateTime().minusHours(2));
-                    migration.setEndTime(item.getUpdateTime());
-                    System.out.println("当前监测点为------------------------------------"+item.getLineIndex());
-                    System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
-                    System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
-                    list.addAll(executor.queryData(migration));
-                });
+//                List lineTimeList = lineTimeMapper.getLineTime();
+//                lineTimeList.forEach(item->{
+//                    MigrationParam migration = new MigrationParam();
+//                    migration.setLineIds(Collections.singletonList(item.getLineIndex()));
+//                    migration.setStartTime(item.getUpdateTime().minusHours(2));
+//                    migration.setEndTime(item.getUpdateTime());
+//                    System.out.println("当前监测点为------------------------------------"+item.getLineIndex());
+//                    System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
+//                    System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
+//                    list.addAll(executor.queryData(migration));
+//                });
                 System.out.println("查询到的数据++++++++++++++"+list.size());
                 //反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid
                 Iterator iterator = list.iterator();
@@ -362,15 +371,15 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
                 //手动执行GC
                 System.gc();
                 stopWatch.stop();
-                jobDetailInfluxDBHours.setRowCount(size);
-                jobDetailInfluxDBHours.setState(1);
-                jobDetailInfluxDBHours.setDuration(stopWatch.getTotalTimeSeconds());
-                jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
+//                jobDetailInfluxDBHours.setRowCount(size);
+//                jobDetailInfluxDBHours.setState(1);
+//                jobDetailInfluxDBHours.setDuration(stopWatch.getTotalTimeSeconds());
+//                jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
             } catch (Exception exception) {
                 exception.printStackTrace();
-                jobDetailInfluxDBHours.setState(2);
-                jobDetailInfluxDBHours.setUpdateTime(LocalDateTime.now());
-                jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
+//                jobDetailInfluxDBHours.setState(2);
+//                jobDetailInfluxDBHours.setUpdateTime(LocalDateTime.now());
+//                jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
             }
             System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
             System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
@@ -405,6 +414,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
     @Override
     public void oneMonitorDataTransport(DataAsynParam dataAsynParam) {
         Map LineIdMapping = (Map) redisUtil.getObjectByKey("LineIdMapping");
+        List oracleLineIdList = LineIdMapping.keySet().stream().map(Integer::parseInt).collect(Collectors.toList());
         Runtime runtime = Runtime.getRuntime();
         System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
@@ -433,6 +443,8 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
             MigrationParam migration = new MigrationParam();
             migration.setStartTime(startTime);
             migration.setEndTime(endTime);
+            migration.setLineIds(oracleLineIdList);
+
             if (Objects.nonNull(dataAsynParam.getMonitorId())) {
                 migration.setLineIds(Stream.of(dataAsynParam.getMonitorId()).collect(Collectors.toList()));
             }