@@ -14,8 +14,10 @@ import com.njcn.influx.service.JobDetailHoursInfluxDBService;
import com.njcn.influx.service.JobDetailInfluxDBService ;
import com.njcn.influx.service.JobHistoryLogInfluxdbService ;
import com.njcn.influx.service.OracleToInfluxDBService ;
import com.njcn.oracle.bo.lineTimeDto ;
import com.njcn.oracle.bo.param.DataAsynParam ;
import com.njcn.oracle.bo.param.MigrationParam ;
import com.njcn.oracle.mapper.LineTimeMapper ;
import com.njcn.oracle.mybatis.service.IReplenishMybatisService ;
import com.njcn.oracle.util.LocalDateUtil ;
import lombok.RequiredArgsConstructor ;
@@ -31,10 +33,7 @@ import java.lang.reflect.Method;
import java.time.LocalDate ;
import java.time.LocalDateTime ;
import java.time.format.DateTimeFormatter ;
import java.util.Iterator ;
import java.util.List ;
import java.util.Objects ;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.* ;
import java.util.stream.Collectors ;
import java.util.stream.Stream ;
@@ -65,6 +64,8 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
private final JobDetailHoursInfluxDBService jobDetailHoursInfluxDBService ;
private final LineTimeMapper lineTimeMapper ;
@Value ( " ${business.slice:2} " )
private int slice ;
@@ -211,8 +212,142 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
} ) ;
}
//按小时来同步数据
@Override
// @Override
// @Async
// public void hourseDataBacthSysc(DataAsynParam dataAsynParam) {
// Runtime runtime = Runtime.getRuntime();
// System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
// System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
// System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
// //目前且作为2片, 后续将该属性提取到配置文件中
// List<String> tableNames = dataAsynParam.getTableNames();
// //嵌套循环,先循环指标,再循环日期
// tableNames.stream().forEach(tableName -> {
// IReplenishMybatisService executor;
// try {
// executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX));
// } catch (ClassNotFoundException e) {
// throw new RuntimeException(e);
// }
//
//
// //日志记录
// JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService.lambdaQuery().eq(JobDetailHoursInfluxDB::getTableName, tableName)
// .eq(JobDetailHoursInfluxDB::getExcuteDate, dataAsynParam.getStartDateTime())
// .one();
// if (Objects.nonNull(jobDetailInfluxDBHours) && (jobDetailInfluxDBHours.getState() == 1 || jobDetailInfluxDBHours.getState() == 0|| jobDetailInfluxDBHours.getState() == 2)) {
// //如果该指标当前时间段已执行或正在执行,直接跳出循环
// return;
// }
// if (Objects.isNull(jobDetailInfluxDBHours)) {
// jobDetailInfluxDBHours = new JobDetailHoursInfluxDB(tableName, dataAsynParam.getStartDateTime(), 0, 0, LocalDateTime.now());
// jobDetailHoursInfluxDBService.save(jobDetailInfluxDBHours);
// }
// //程序监听
// StopWatch stopWatch = new StopWatch();
// stopWatch.start();
// LocalDateTime startTime = dataAsynParam.getStartDateTime();
// LocalDateTime endTime = dataAsynParam.getEndDateTime();
// //查询该时区的数据,并准备入库
// MigrationParam migration = new MigrationParam();
// migration.setStartTime(startTime);
// migration.setEndTime(endTime);
// System.out.println("执行扫描起始时间------------------------------------"+startTime.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
// System.out.println("执行扫描结束时间------------------------------------"+endTime.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
//
//
// List list = executor.queryData(migration);
// System.out.println("查询到的数据++++++++++++++"+list.size());
// //反射獲取linid的值并把linid的值替换成mysql对应的linid, 并记录未匹配的lineid
// Iterator iterator = list.iterator();
// while (iterator.hasNext()) {
// try{
// Object obj = iterator.next();
// //获取
// Field id = obj.getClass().getDeclaredField("lineid");
// id.setAccessible(true); //暴力访问id
// String id1 = id.get(obj).toString();
// if (!IdMappingCache.LineIdMapping.containsKey(id1)){
// log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
// iterator.remove();
// }else {
// id.set(obj, IdMappingCache.LineIdMapping.get(id1));
// }
// }catch (Exception e){
// e.printStackTrace();
// }
//
//
// }
//
// //采用弱引用接受, 后续手动调用gc后, 会清空该对象
// WeakReference<List> weakReferenceData = new WeakReference<>(list);
// int size = 0;
// if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
// size = weakReferenceData.get().size();
// }
// System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
// System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
// System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
// try {
// if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
// //执行目标库的数据处理
// Class<?> clazz;
// Class<?> clazz2;
// //获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2
// try {
// clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + tableName);
// clazz2 = Class.forName("com.njcn.oracle.bo.po." + tableName);
// } catch (ClassNotFoundException e) {
// throw new RuntimeException(e);
// }
// Method method;
// try {
// method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2);
// } catch (NoSuchMethodException e) {
// throw new RuntimeException(e);
// }
// method.setAccessible(true);
// Method finalMethod = method;
// List list1 = (List) weakReferenceData.get().stream().flatMap(po -> {
// try {
// Object invoke = finalMethod.invoke(null, po);
// Object invoke1 = invoke;
// //返回oracle转influx, flicker等表是1-1, 还有1-4, 这是判断返回是否是集合如何是集合继续扁平化
// return invoke1 instanceof List ? ((List<?>) invoke1).stream() : Stream.of(invoke1);
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }).collect(Collectors.toList());
// //插入influxdb
// influxDBBaseService.insertBatchBySlice(tableName, list1, 10000);
//// size = list1.size();
// //最后一片时修改状态
// }
// //手动执行GC
// System.gc();
// stopWatch.stop();
// jobDetailInfluxDBHours.setRowCount(size);
// jobDetailInfluxDBHours.setState(1);
// jobDetailInfluxDBHours.setDuration(stopWatch.getTotalTimeSeconds());
// jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
// } catch (Exception exception) {
// exception.printStackTrace();
// jobDetailInfluxDBHours.setState(2);
// jobDetailInfluxDBHours.setUpdateTime(LocalDateTime.now());
// jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours);
// }
// System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
// System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
// System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
//
//
// });
// }
//冀北现场存在数据更新时间不在1小时内, 会丢失数据的情况, 这边根据装置最新时间往前推1个小时查询数据
@Override
public void hourseDataBacthSysc ( DataAsynParam dataAsynParam ) {
Runtime runtime = Runtime . getRuntime ( ) ;
System . out . println ( " 开始执行前总堆内存为: " + runtime . totalMemory ( ) / ( 1024 * 1024 ) + " MB " ) ;
@@ -228,117 +363,113 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
} catch ( ClassNotFoundException e ) {
throw new RuntimeException ( e ) ;
}
//日志记录
JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService . lambdaQuery ( ) . eq ( JobDetailHoursInfluxDB : : getTableName , tableName )
//日志记录
JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService . lambdaQuery ( ) . eq ( JobDetailHoursInfluxDB : : getTableName , tableName )
. eq ( JobDetailHoursInfluxDB : : getExcuteDate , dataAsynParam . getStartDateTime ( ) )
. one ( ) ;
if ( Objects . nonNull ( jobDetailInfluxDBHours ) & & ( jobDetailInfluxDBHours . getState ( ) = = 1 | | jobDetailInfluxDBHours . getState ( ) = = 0 | | jobDetailInfluxDBHours . getState ( ) = = 2 ) ) {
//如果该指标当前时间段已执行或正在执行,直接跳出循环
return ;
}
if ( Objects . isNull ( jobDetailInfluxDBHours ) ) {
jobDetailInfluxDBHours = new JobDetailHoursInfluxDB ( tableName , dataAsynParam . getStartDateTime ( ) , 0 , 0 , LocalDateTime . now ( ) ) ;
jobDetailHoursInfluxDBService . save ( jobDetailInfluxDBHours ) ;
}
//程序监听
StopWatch stopWatch = new StopWatch ( ) ;
stopWatch . start ( ) ;
LocalDateTime startTime = dataAsynParam . getStartDateTime ( ) ;
LocalDateTime endTime = dataAsynParam . getEndDateTime ( ) ;
//查询该时区的数据,并准备入库
if ( Objects . nonNull ( jobDetailInfluxDBHours ) & & ( jobDetailInfluxDBHours . getState ( ) = = 1 | | jobDetailInfluxDBHours . getState ( ) = = 0 | | jobDetailInfluxDBHours . getState ( ) = = 2 ) ) {
//如果该指标当前时间段已执行或正在执行,直接跳出循环
return ;
}
if ( Objects . isNull ( jobDetailInfluxDBHours ) ) {
jobDetailInfluxDBHours = new JobDetailHoursInfluxDB ( tableName , dataAsynParam . getStartDateTime ( ) , 0 , 0 , LocalDateTime . now ( ) ) ;
jobDetailHoursInfluxDBService . save ( jobDetailInfluxDBHours ) ;
}
//程序监听
StopWatch stopWatch = new StopWatch ( ) ;
stopWatch . start ( ) ;
List list = new ArrayList ( Collections . emptyList ( ) ) ;
//获取监测点最新的数据时间,单监测点查询数据
List < lineTimeDto > lineTimeList = lineTimeMapper . getLineTime ( ) ;
lineTimeList . forEach ( item - > {
MigrationParam migration = new MigrationParam ( ) ;
migration . setStartTime ( startTime ) ;
migration . setEnd Time ( endTime ) ;
System . out . println ( " 执行扫描起始时间------------------------------------ " + startTime . format ( DateTimeFormatter . ofPattern ( DatePattern . NORM_DATETIME_PATTERN ) ) ) ;
System . out . println ( " 执行扫描结束时间 ------------------------------------" + endTime . format ( DateTimeFormatter . ofPattern ( DatePattern . NORM_DATETIME_PATTERN ) ) ) ;
List list = executor . queryData ( migration ) ;
System . out . println ( " 查询到的数据++++++++++++++ " + list . size ( ) ) ;
//反射獲取linid的值并把linid的值替换成mysql对应的linid, 并记录未匹配的lineid
Iterator iterator = list . iterator ( ) ;
while ( iterator . hasNext ( ) ) {
try {
Object obj = iterator . next ( ) ;
//获取
Field id = obj . getClass ( ) . getDeclaredField ( " lineid " ) ;
id . setAccessible ( true ) ; //暴力访问id
String id1 = id . g et( obj ) . toString ( ) ;
if ( ! IdMappingCache . LineIdMapping . containsKey ( id1 ) ) {
log . info ( tableName + " 表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid " + id1 ) ;
iterator . remove ( ) ;
} else {
id . set ( obj , IdMappingCache . LineIdMapping . get ( id1 ) ) ;
}
} catch ( Exception e ) {
e . printStackTrace ( ) ;
migration . setLineIds ( Collections . singletonList ( item . getLineIndex ( ) ) ) ;
migration . setStart Time ( item . getUpdateTime ( ) . minusHours ( 1 ) ) ;
migration . setEndTime ( item . getUpdateTime ( ) ) ;
System . out . println ( " 当前监测点为 ------------------------------------" + item . getLineIndex ( ) ) ;
System . out . println ( " 执行扫描起始时间------------------------------------ " + item . getUpdateTime ( ) . minusHours ( 1 ) . format ( DateTimeFormatter . ofPattern ( DatePattern . NORM_DATETIME_PATTERN ) ) ) ;
System . out . println ( " 执行扫描结束时间------------------------------------ " + item . getUpdateTime ( ) . format ( DateTimeFormatter . ofPattern ( DatePattern . NORM_DATETIME_PATTERN ) ) ) ;
list . addAll ( executor . queryData ( migration ) ) ;
} ) ;
System . out . println ( " 查询到的数据++++++++++++++ " + list . size ( ) ) ;
//反射獲取linid的值并把linid的值替换成mysql对应的linid, 并记录未匹配的lineid
Iterator iterator = list . iterator ( ) ;
while ( iterator . hasNext ( ) ) {
try {
Object obj = iterator . next ( ) ;
//获取
Field id = obj . getClass ( ) . getDeclaredField ( " lineid " ) ;
id . s etAccessible ( true ) ; //暴力访问id
String id1 = id . get ( obj ) . toString ( ) ;
if ( ! IdMappingCache . L ineIdMapping . containsKey ( id1 ) ) {
log . info ( tableName + " 表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid " + id1 ) ;
iterator . remove ( ) ;
} else {
id . set ( obj , IdMappingCache . LineIdMapping . get ( id1 ) ) ;
}
} catch ( Exception e ) {
e . printStackTrace ( ) ;
}
//采用弱引用接受, 后续手动调用gc后, 会清空该对象
WeakReference < List > weakReferenceData = new WeakReference < > ( list ) ;
int size = 0 ;
}
//采用弱引用接受, 后续手动调用gc后, 会清空该对象
WeakReference < List > weakReferenceData = new WeakReference < > ( list ) ;
int size = 0 ;
if ( CollectionUtil . isNotEmpty ( weakReferenceData . get ( ) ) ) {
size = weakReferenceData . get ( ) . size ( ) ;
}
System . out . println ( tableName + " 查到 " + size + " 条数据后总堆内存为: " + runtime . totalMemory ( ) / ( 1024 * 1024 ) + " MB " ) ;
System . out . println ( tableName + " 查到 " + size + " 条数据后已用堆内存为: " + ( runtime . totalMemory ( ) - runtime . freeMemory ( ) ) / ( 1024 * 1024 ) + " MB " ) ;
System . out . println ( tableName + " 查到 " + size + " 条数据后空闲堆内存为: " + runtime . freeMemory ( ) / ( 1024 * 1024 ) + " MB " ) ;
try {
if ( CollectionUtil . isNotEmpty ( weakReferenceData . get ( ) ) ) {
size = weakReferenceData . get ( ) . size ( ) ;
}
System . out . println ( tableName + " 查到 " + size + " 条数据后总堆内存为: " + runtime . totalMemory ( ) / ( 1024 * 1024 ) + " MB " ) ;
System . out . println ( tableName + " 查到 " + size + " 条数据后已用堆内存为: " + ( runtime . totalMemory ( ) - runtime . freeMemory ( ) ) / ( 1024 * 1024 ) + " MB " ) ;
System . out . println ( tableName + " 查到 " + size + " 条数据后空闲堆内存为: " + runtime . freeMemory ( ) / ( 1024 * 1024 ) + " MB " ) ;
try {
if ( CollectionUtil . isNotEmpty ( weakReferenceData . get ( ) ) ) {
//执行目标库的数据处理
Class < ? > clazz ;
Class < ? > clazz2 ;
//获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2
try {
clazz = Class . forName ( " com.njcn.influx.bo.po.InfluxDB " + tableName ) ;
clazz2 = Class . forName ( " com.njcn.oracle.bo.po. " + tableName ) ;
} catch ( ClassNotFoundException e ) {
throw new RuntimeException ( e ) ;
}
Method method ;
try {
method = clazz . getDeclaredMethod ( " oralceToInfluxDB " , clazz2 ) ;
} catch ( NoSuchMethodException e ) {
throw new RuntimeException ( e ) ;
}
method . setAccessible ( true ) ;
Method finalMethod = method ;
List list1 = ( List ) weakReferenceData . get ( ) . stream ( ) . flatMap ( po - > {
try {
Object invoke = finalMethod . invoke ( null , po ) ;
Object invoke1 = invoke ;
//返回oracle转influx, flicker等表是1-1, 还有1-4, 这是判断返回是否是集合如何是集合继续扁平化
return invoke1 instanceof List ? ( ( List < ? > ) invoke1 ) . stream ( ) : Stream . of ( invoke1 ) ;
} catch ( Exception e ) {
throw new RuntimeException ( e ) ;
}
} ) . collect ( Collectors . toList ( ) ) ;
//插入influxdb
influxDBBaseService . insertBatchBySlice ( tableName , list1 , 10000 ) ;
// size = list1.size();
//最后一片时修改状态
//执行目标库的数据处理
Class < ? > clazz ;
Class < ? > clazz2 ;
//获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2
try {
clazz = Class . forName ( " com.njcn.influx.bo.po.InfluxDB " + tableName ) ;
clazz2 = Class . forName ( " com.njcn.oracle.bo.po. " + tableName ) ;
} catch ( ClassNotFoundException e ) {
throw new RuntimeException ( e ) ;
}
//手动执行GC
System . gc ( ) ;
stopWatch . stop ( ) ;
jobDetailInfluxDBHours . setRowCount ( size ) ;
jobDetailInfluxDBHours . setState ( 1 ) ;
jobDetailInfluxDBHours . setDuration ( stopWatch . getTotalTimeSeconds ( ) ) ;
jobDetailHoursInfluxDBService . updateByMultiId ( jobDetailInfluxDBHours ) ;
} catch ( Exception exception ) {
exception . printStackTrace ( ) ;
jobDetailInfluxDBHours . setState ( 2 ) ;
jobDetailInfluxDBHours . setUpdateTime ( LocalDateTime . now ( ) ) ;
jobDetailHoursInfluxDBService . updateByMultiId ( jobDetailInfluxDBHours ) ;
Method method ;
try {
method = clazz . getDeclaredMethod ( " oralceToInfluxDB " , clazz2 ) ;
} catch ( NoSuchMethodException e ) {
throw new RuntimeException ( e ) ;
}
method . setAccessible ( true ) ;
Method finalMethod = method ;
List list1 = ( List ) weakReferenceData . get ( ) . stream ( ) . flatMap ( po - > {
try {
Object invoke = finalMethod . invoke ( null , po ) ;
Object invoke1 = invoke ;
//返回oracle转influx, flicker等表是1-1, 还有1-4, 这是判断返回是否是集合如何是集合继续扁平化
return invoke1 instanceof List ? ( ( List < ? > ) invoke1 ) . stream ( ) : Stream . of ( invoke1 ) ;
} catch ( Exception e ) {
throw new RuntimeException ( e ) ;
}
} ) . collect ( Collectors . toList ( ) ) ;
//插入influxdb
influxDBBaseService . insertBatchBySlice ( tableName , list1 , 10000 ) ;
//最后一片时修改状态
}
System . out . println ( " 执行后总堆内存为: " + runtime . totalMemory ( ) / ( 1024 * 1024 ) + " MB " ) ;
System . out . println ( " 执行后已用堆内存为: " + ( runtime . totalMemory ( ) - runtime . freeMemory ( ) ) / ( 1024 * 1024 ) + " MB " ) ;
System . out . println ( " 执行后空闲堆内存为: " + runtime . freeMemory ( ) / ( 1024 * 1024 ) + " MB " ) ;
//手动执行GC
System . gc ( ) ;
stopWatch . stop ( ) ;
jobDetailInfluxDBHours . setRowCount ( size ) ;
jobDetailInfluxDBHours . setState ( 1 ) ;
jobDetailInfluxDBHours . setDuration ( stopWatch . getTotalTimeSeconds ( ) ) ;
jobDetailHoursInfluxDBService . updateByMultiId ( jobDetailInfluxDBHours ) ;
} catch ( Exception exception ) {
exception . printStackTrace ( ) ;
jobDetailInfluxDBHours . setState ( 2 ) ;
jobDetailInfluxDBHours . setUpdateTime ( LocalDateTime . now ( ) ) ;
jobDetailHoursInfluxDBService . updateByMultiId ( jobDetailInfluxDBHours ) ;
}
System . out . println ( " 执行后总堆内存为: " + runtime . totalMemory ( ) / ( 1024 * 1024 ) + " MB " ) ;
System . out . println ( " 执行后已用堆内存为: " + ( runtime . totalMemory ( ) - runtime . freeMemory ( ) ) / ( 1024 * 1024 ) + " MB " ) ;
System . out . println ( " 执行后空闲堆内存为: " + runtime . freeMemory ( ) / ( 1024 * 1024 ) + " MB " ) ;
} ) ;