修改bug

This commit is contained in:
hzj
2025-10-17 14:46:21 +08:00
parent 026d71a060
commit 8900f41486
2 changed files with 51 additions and 39 deletions

View File

@@ -16,20 +16,20 @@ import java.util.stream.Collectors;
@Getter
public enum TableEnum {
DATAFLICKER("DataFlicker","电压闪变数据表", 1),
// DATAFLUC("DataFluc","电压波动数据表", 1),
// DATAHARMPHASICI("DataHarmphasicI","谐波电流角度数据表", 4),
// DATAHARMPHASICV("DataHarmphasicV","谐波电压角度数据表", 4),
// DATAHARMPOWERP("DataHarmpowerP","有功功率数据表", 4),
// DATAHARMPOWERQ("DataHarmpowerQ","无功功率数据表", 4),
// DATAHARMPOWERS("DataHarmpowerS","视在功率数据表", 4),
// DATAHARMRATEI("DataHarmrateI","谐波电流含有率数据表", 4),
DATAFLUC("DataFluc","电压波动数据表", 1),
DATAHARMPHASICI("DataHarmphasicI","谐波电流角度数据表", 4),
DATAHARMPHASICV("DataHarmphasicV","谐波电压角度数据表", 4),
DATAHARMPOWERP("DataHarmpowerP","有功功率数据表", 4),
DATAHARMPOWERQ("DataHarmpowerQ","无功功率数据表", 4),
DATAHARMPOWERS("DataHarmpowerS","视在功率数据表", 4),
DATAHARMRATEI("DataHarmrateI","谐波电流含有率数据表", 4),
DATAHARMRATEV("DataHarmrateV","谐波电压含有率数据表", 4),
// DATAINHARMI("DataInharmI","电流简谐波幅值数据表", 4),
DATAINHARMI("DataInharmI","电流简谐波幅值数据表", 4),
DATAINHARMV("DataInharmV","电压间谐波幅值数据表", 4),
DATAI("DataI","谐波电流幅值数据表", 4),
DATAPLT("DataPlt","长时闪变数据表", 1),
DATAV("DataV","谐波电压幅值数据表", 4),
COMINFORMATION("ComInfoRmation","监测点状态监测数据", 4),
// COMINFORMATION("ComInfoRmation","监测点状态监测数据", 4),
;

View File

@@ -95,7 +95,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
//添加记录批处理日志,执行为批处理且最后一张表执行完记录
if(dataAsynParam.getExcuteType()==2&&Objects.equals(tableName, TableEnum.DATAV.getCode())){
if(dataAsynParam.getExcuteType()==2&&Objects.equals(tableName, TableEnum.DATAHARMPOWERS.getCode())){
JobHistoryLogInfluxdb jobHistoryLogInfluxdbLog = new JobHistoryLogInfluxdb(date,LocalDateTime.now());
jobHistoryLogInfluxdbService.save(jobHistoryLogInfluxdbLog);
}
@@ -220,6 +220,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
@Override
public void hourseDataBacthSysc(DataAsynParam dataAsynParam) {
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
List<Integer> oracleLineIdList = LineIdMapping.keySet().stream().map(Integer::parseInt).collect(Collectors.toList());
Map<String, String> DevIdMapping = (Map<String, String>) redisUtil.getObjectByKey("DevIdMapping");
Runtime runtime = Runtime.getRuntime();
@@ -237,17 +238,17 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
throw new RuntimeException(e);
}
//日志记录
JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService.lambdaQuery().eq(JobDetailHoursInfluxDB::getTableName, tableName)
.eq(JobDetailHoursInfluxDB::getExcuteDate, dataAsynParam.getStartDateTime())
.one();
if (Objects.nonNull(jobDetailInfluxDBHours) && (jobDetailInfluxDBHours.getState() == 1 || jobDetailInfluxDBHours.getState() == 0|| jobDetailInfluxDBHours.getState() == 2)) {
//如果该指标当前时间段已执行或正在执行,直接跳出循环
return;
}
if (Objects.isNull(jobDetailInfluxDBHours)) {
jobDetailInfluxDBHours = new JobDetailHoursInfluxDB(tableName, dataAsynParam.getStartDateTime(), 0, 0, LocalDateTime.now());
jobDetailHoursInfluxDBService.save(jobDetailInfluxDBHours);
}
// JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService.lambdaQuery().eq(JobDetailHoursInfluxDB::getTableName, tableName)
// .eq(JobDetailHoursInfluxDB::getExcuteDate, dataAsynParam.getStartDateTime())
// .one();
// if (Objects.nonNull(jobDetailInfluxDBHours) && (jobDetailInfluxDBHours.getState() == 1 || jobDetailInfluxDBHours.getState() == 0|| jobDetailInfluxDBHours.getState() == 2)) {
// //如果该指标当前时间段已执行或正在执行,直接跳出循环
// return;
// }
// if (Objects.isNull(jobDetailInfluxDBHours)) {
// jobDetailInfluxDBHours = new JobDetailHoursInfluxDB(tableName, dataAsynParam.getStartDateTime(), 0, 0, LocalDateTime.now());
// jobDetailHoursInfluxDBService.save(jobDetailInfluxDBHours);
// }
//程序监听
StopWatch stopWatch = new StopWatch();
stopWatch.start();
@@ -256,6 +257,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
MigrationParam migration = new MigrationParam();
migration.setStartTime(dataAsynParam.getStartDateTime());
migration.setEndTime(dataAsynParam.getEndDateTime());
migration.setLineIds(oracleLineIdList);
System.out.println("执行扫描起始时间------------------------------------"+dataAsynParam.getStartDateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
System.out.println("执行扫描结束时间------------------------------------"+dataAsynParam.getEndDateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
list.addAll(executor.queryData(migration));
@@ -283,18 +285,25 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
}
}
}else{
//获取监测点最新的数据时间,单监测点查询数据
List<lineTimeDto> lineTimeList = lineTimeMapper.getLineTime();
lineTimeList.forEach(item->{
MigrationParam migration = new MigrationParam();
migration.setLineIds(Collections.singletonList(item.getLineIndex()));
migration.setStartTime(item.getUpdateTime().minusHours(2));
migration.setEndTime(item.getUpdateTime());
System.out.println("当前监测点为------------------------------------"+item.getLineIndex());
System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
migration.setStartTime(dataAsynParam.getStartDateTime());
migration.setEndTime(dataAsynParam.getEndDateTime());
migration.setLineIds(oracleLineIdList);
System.out.println("执行扫描起始时间------------------------------------"+dataAsynParam.getStartDateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
System.out.println("执行扫描结束时间------------------------------------"+dataAsynParam.getEndDateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
list.addAll(executor.queryData(migration));
});
//获取监测点最新的数据时间,单监测点查询数据
// List<lineTimeDto> lineTimeList = lineTimeMapper.getLineTime();
// lineTimeList.forEach(item->{
// MigrationParam migration = new MigrationParam();
// migration.setLineIds(Collections.singletonList(item.getLineIndex()));
// migration.setStartTime(item.getUpdateTime().minusHours(2));
// migration.setEndTime(item.getUpdateTime());
// System.out.println("当前监测点为------------------------------------"+item.getLineIndex());
// System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
// System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
// list.addAll(executor.queryData(migration));
// });
System.out.println("查询到的数据++++++++++++++"+list.size());
//反射獲取linid的值并把linid的值替换成mysql对应的linid并记录未匹配的lineid
Iterator iterator = list.iterator();
@@ -362,15 +371,15 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
//手动执行GC
System.gc();
stopWatch.stop();
jobDetailInfluxDBHours.setRowCount(size);
jobDetailInfluxDBHours.setState(1);
jobDetailInfluxDBHours.setDuration(stopWatch.getTotalTimeSeconds());
jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
// jobDetailInfluxDBHours.setRowCount(size);
// jobDetailInfluxDBHours.setState(1);
// jobDetailInfluxDBHours.setDuration(stopWatch.getTotalTimeSeconds());
// jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
} catch (Exception exception) {
exception.printStackTrace();
jobDetailInfluxDBHours.setState(2);
jobDetailInfluxDBHours.setUpdateTime(LocalDateTime.now());
jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
// jobDetailInfluxDBHours.setState(2);
// jobDetailInfluxDBHours.setUpdateTime(LocalDateTime.now());
// jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
}
System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
@@ -405,6 +414,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
@Override
public void oneMonitorDataTransport(DataAsynParam dataAsynParam) {
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
List<Integer> oracleLineIdList = LineIdMapping.keySet().stream().map(Integer::parseInt).collect(Collectors.toList());
Runtime runtime = Runtime.getRuntime();
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
@@ -433,6 +443,8 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
MigrationParam migration = new MigrationParam();
migration.setStartTime(startTime);
migration.setEndTime(endTime);
migration.setLineIds(oracleLineIdList);
if (Objects.nonNull(dataAsynParam.getMonitorId())) {
migration.setLineIds(Stream.of(dataAsynParam.getMonitorId()).collect(Collectors.toList()));
}