From b20a1dadf7858c190e1d435d9225b6c2e2c8e469 Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Mon, 19 Feb 2024 15:51:23 +0800 Subject: [PATCH] =?UTF-8?q?oracle=E5=90=8C=E6=AD=A5=E5=88=B0influxdb?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=AF=8F=E5=B0=8F=E6=97=B6=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E6=89=B9=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../influx/bo/po/JobDetailHoursInfluxDB.java | 71 ++++++++++ .../mapper/JobDetailHoursInfluxDBMapper.java | 15 ++ .../JobDetailHoursInfluxDBService.java | 18 +++ .../service/OracleToInfluxDBService.java | 2 + .../JobDetailHoursInfluxDBServiceImpl.java | 20 +++ .../impl/OracleToInfluxDBServiceImpl.java | 133 ++++++++++++++++++ .../njcn/influx/job/OracleToInfluxDBJob.java | 23 +++ .../src/main/resources/application.yml | 24 +--- 8 files changed, 289 insertions(+), 17 deletions(-) create mode 100644 influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/JobDetailHoursInfluxDB.java create mode 100644 influx-data/influx-source/src/main/java/com/njcn/influx/mapper/JobDetailHoursInfluxDBMapper.java create mode 100644 influx-data/influx-source/src/main/java/com/njcn/influx/service/JobDetailHoursInfluxDBService.java create mode 100644 influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/JobDetailHoursInfluxDBServiceImpl.java diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/JobDetailHoursInfluxDB.java b/influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/JobDetailHoursInfluxDB.java new file mode 100644 index 0000000..d380cc8 --- /dev/null +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/JobDetailHoursInfluxDB.java @@ -0,0 +1,71 @@ +package com.njcn.influx.bo.po; + +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import com.github.jeffreyning.mybatisplus.anno.MppMultiId; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** + * + * Description: + * Date: 2024/1/23 14:14【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +@TableName(value = "JOB_DETAIL_HOURS_INFLUXDB") +@NoArgsConstructor +@AllArgsConstructor +public class JobDetailHoursInfluxDB { + /** + * 指标表名 + */ + @MppMultiId(value = "TABLE_NAME") + private String tableName; + + /** + * 执行日期 + */ + @MppMultiId(value = "EXCUTE_DATE") + private LocalDateTime excuteDate; + + /** + * 记录数 + */ + @TableField(value = "\"ROW_COUNT\"") + private Integer rowCount; + + /** + * 状态(0-执行中、1-成功、2-失败) + */ + @TableField(value = "\"STATE\"") + private Integer state; + + @TableField(value = "UPDATE_TIME") + private LocalDateTime updateTime; + + /** + * 消耗时长 + */ + @TableField(value = "DURATION") + private Double duration; + + public JobDetailHoursInfluxDB(String tableName, LocalDateTime excuteDate, Integer state, Integer rowCount, LocalDateTime updateTime) { + this.tableName = tableName; + this.excuteDate = excuteDate; + this.state = state; + this.rowCount = rowCount; + this.updateTime = updateTime; + } + + public JobDetailHoursInfluxDB(String tableName, LocalDateTime excuteDate, Integer rowCount) { + this.tableName = tableName; + this.excuteDate = excuteDate; + this.rowCount = rowCount; + } +} \ No newline at end of file diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/mapper/JobDetailHoursInfluxDBMapper.java b/influx-data/influx-source/src/main/java/com/njcn/influx/mapper/JobDetailHoursInfluxDBMapper.java new file mode 100644 index 0000000..908553a --- /dev/null +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/mapper/JobDetailHoursInfluxDBMapper.java @@ -0,0 +1,15 @@ +package com.njcn.influx.mapper; + +import com.github.jeffreyning.mybatisplus.base.MppBaseMapper; +import com.njcn.influx.bo.po.JobDetailHoursInfluxDB; + +/** + * + * Description: + * Date: 2024/1/23 14:14【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface JobDetailHoursInfluxDBMapper extends MppBaseMapper { +} \ No newline at end of file diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/JobDetailHoursInfluxDBService.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/JobDetailHoursInfluxDBService.java new file mode 100644 index 0000000..af61639 --- /dev/null +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/JobDetailHoursInfluxDBService.java @@ -0,0 +1,18 @@ +package com.njcn.influx.service; + +import com.github.jeffreyning.mybatisplus.service.IMppService; +import com.njcn.influx.bo.po.JobDetailHoursInfluxDB; +import com.njcn.oracle.bo.po.JobDetailHours; + +/** +* +* Description: +* Date: 2024/1/23 14:14【需求编号】 +* +* @author clam +* @version V1.0.0 +*/ +public interface JobDetailHoursInfluxDBService extends IMppService { + + +} diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/OracleToInfluxDBService.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/OracleToInfluxDBService.java index 641add8..9d7de4f 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/OracleToInfluxDBService.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/OracleToInfluxDBService.java @@ -4,4 +4,6 @@ import com.njcn.oracle.bo.param.DataAsynParam; public interface OracleToInfluxDBService { void dataBacthSysc(DataAsynParam dataAsynParam); + + void hourseDataBacthSysc(DataAsynParam dataAsynParam); } diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/JobDetailHoursInfluxDBServiceImpl.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/JobDetailHoursInfluxDBServiceImpl.java new file mode 100644 index 0000000..ed2f5fa --- /dev/null +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/JobDetailHoursInfluxDBServiceImpl.java @@ -0,0 +1,20 @@ +package com.njcn.influx.service.impl; + +import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; +import com.njcn.influx.bo.po.JobDetailHoursInfluxDB; +import com.njcn.influx.mapper.JobDetailHoursInfluxDBMapper; +import com.njcn.influx.service.JobDetailHoursInfluxDBService; +import com.njcn.oracle.bo.po.JobDetailHours; +import com.njcn.oracle.mybatis.mapper.JobDetailHoursMapper; +import org.springframework.stereotype.Service; + +/** + * Description: + * Date: 2024/2/19 15:22【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Service +public class JobDetailHoursInfluxDBServiceImpl extends MppServiceImpl implements JobDetailHoursInfluxDBService { +} diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java index 9f1ad63..8a356a5 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java @@ -6,17 +6,21 @@ import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.text.StrPool; import cn.hutool.extra.spring.SpringUtil; import com.njcn.influx.bo.param.TableEnum; +import com.njcn.influx.bo.po.JobDetailHoursInfluxDB; import com.njcn.influx.bo.po.JobDetailInfluxDB; import com.njcn.influx.bo.po.JobHistoryLogInfluxdb; import com.njcn.influx.config.IdMappingCache; +import com.njcn.influx.service.JobDetailHoursInfluxDBService; import com.njcn.influx.service.JobDetailInfluxDBService; import com.njcn.influx.service.JobHistoryLogInfluxdbService; import com.njcn.influx.service.OracleToInfluxDBService; import com.njcn.oracle.bo.param.DataAsynParam; import com.njcn.oracle.bo.param.MigrationParam; import com.njcn.oracle.bo.param.ServiceTypeEnum; +import com.njcn.oracle.bo.po.JobDetailHours; import com.njcn.oracle.bo.po.JobHistoryLog; import com.njcn.oracle.mybatis.service.IReplenishMybatisService; +import com.njcn.oracle.service.JobDetailHoursService; import com.njcn.oracle.util.LocalDateUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -62,6 +66,8 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { private final JobHistoryLogInfluxdbService jobHistoryLogInfluxdbService; + private final JobDetailHoursInfluxDBService jobDetailHoursInfluxDBService; + @Value("${business.slice:2}") private int slice; @@ -207,4 +213,131 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { } }); } + //按小时来同步数据 + @Override + public void hourseDataBacthSysc(DataAsynParam dataAsynParam) { + Runtime runtime = Runtime.getRuntime(); + System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); + System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); + System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); + //目前且作为2片,后续将该属性提取到配置文件中 + List tableNames = dataAsynParam.getTableNames(); + //嵌套循环,先循环指标,再循环日期 + tableNames.stream().forEach(tableName -> { + IReplenishMybatisService executor; + try { + executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX)); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + + //日志记录 + JobDetailHoursInfluxDB jobDetailInfluxDBHours = jobDetailHoursInfluxDBService.lambdaQuery().eq(JobDetailHoursInfluxDB::getTableName, tableName) + .eq(JobDetailHoursInfluxDB::getExcuteDate, dataAsynParam.getStartDateTime()) + .one(); + if (Objects.nonNull(jobDetailInfluxDBHours) && (jobDetailInfluxDBHours.getState() == 1 || jobDetailInfluxDBHours.getState() == 0|| jobDetailInfluxDBHours.getState() == 2)) { + //如果该指标当前时间段已执行或正在执行,直接跳出循环 + return; + } + if (Objects.isNull(jobDetailInfluxDBHours)) { + jobDetailInfluxDBHours = new JobDetailHoursInfluxDB(tableName, dataAsynParam.getStartDateTime(), 0, 0, LocalDateTime.now()); + jobDetailHoursInfluxDBService.save(jobDetailInfluxDBHours); + } + //程序监听 + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + LocalDateTime startTime = dataAsynParam.getStartDateTime(); + LocalDateTime endTime = dataAsynParam.getStartDateTime(); + //查询该时区的数据,并准备入库 + MigrationParam migration = new MigrationParam(); + migration.setStartTime(startTime); + migration.setEndTime(endTime); + List list = executor.queryData(migration); + //反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid + Iterator iterator = list.iterator(); + while (iterator.hasNext()) { + try{ + Object obj = iterator.next(); + //获取 + Field id = obj.getClass().getDeclaredField("lineid"); + id.setAccessible(true); //暴力访问id + String id1 = id.get(obj).toString(); + if (!IdMappingCache.IdMapping.containsKey(id1)){ + log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1); + iterator.remove(); + }else { + id.set(obj, IdMappingCache.IdMapping.get(id1)); + } + }catch (Exception e){ + e.printStackTrace(); + } + + + } + + //采用弱引用接受,后续手动调用gc后,会清空该对象 + WeakReference weakReferenceData = new WeakReference<>(list); + int size = 0; + if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { + size = weakReferenceData.get().size(); + } + System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); + System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); + System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); + try { + if (CollectionUtil.isNotEmpty(weakReferenceData.get())) { + //执行目标库的数据处理 + Class clazz; + Class clazz2; + //获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2 + try { + clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + tableName); + clazz2 = Class.forName("com.njcn.oracle.bo.po." + tableName); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + Method method; + try { + method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + method.setAccessible(true); + Method finalMethod = method; + List list1 = (List) weakReferenceData.get().stream().flatMap(po -> { + try { + Object invoke = finalMethod.invoke(null, po); + Object invoke1 = invoke; + //返回oracle转influx,flicker等表是1-1,还有1-4,这是判断返回是否是集合如何是集合继续扁平化 + return invoke1 instanceof List ? ((List) invoke1).stream() : Stream.of(invoke1); + } catch (Exception e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + //插入influxdb + influxDBBaseService.insertBatchBySlice(tableName, list1, 10000); +// size = list1.size(); + //最后一片时修改状态 + } + //手动执行GC + System.gc(); + stopWatch.stop(); + jobDetailInfluxDBHours.setRowCount(size); + jobDetailInfluxDBHours.setState(1); + jobDetailInfluxDBHours.setDuration(stopWatch.getTotalTimeSeconds()); + jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours); + } catch (Exception exception) { + exception.printStackTrace(); + jobDetailInfluxDBHours.setState(2); + jobDetailInfluxDBHours.setUpdateTime(LocalDateTime.now()); + jobDetailHoursInfluxDBService.updateByMultiId(jobDetailInfluxDBHours); + } + System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); + System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); + System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB"); + + + }); + } } diff --git a/influx-data/influx-target/src/main/java/com/njcn/influx/job/OracleToInfluxDBJob.java b/influx-data/influx-target/src/main/java/com/njcn/influx/job/OracleToInfluxDBJob.java index f5e8c80..a834352 100644 --- a/influx-data/influx-target/src/main/java/com/njcn/influx/job/OracleToInfluxDBJob.java +++ b/influx-data/influx-target/src/main/java/com/njcn/influx/job/OracleToInfluxDBJob.java @@ -11,6 +11,8 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; /** * Description: @@ -36,4 +38,25 @@ public class OracleToInfluxDBJob { dataAsynParam.setExcuteType(2); oracleToInfluxDBService.dataBacthSysc(dataAsynParam); } + + //每小时03分钟时执行上一个小时的数据同步 + @Scheduled(cron="0 3 * * * ?") + public void executeHours() { + DataAsynParam dataAsynParam = new DataAsynParam(); + // 获取当前时间 + LocalDateTime now = LocalDateTime.now(); + + // 减去一个小时 + LocalDateTime oneHourAgo = now.minusHours(1); + + // 将分钟和秒设置为0 + LocalDateTime result = oneHourAgo.truncatedTo(ChronoUnit.HOURS); + // 加上59分钟59秒 + LocalDateTime modifiedResult = result.plusMinutes(59).plusSeconds(59); + dataAsynParam.setStartDateTime(result); + dataAsynParam.setEndDateTime(modifiedResult); + dataAsynParam.setTableNames(TableEnum.getExecutableTypes()); + dataAsynParam.setExcuteType(2); + oracleToInfluxDBService.hourseDataBacthSysc(dataAsynParam); + } } diff --git a/influx-data/influx-target/src/main/resources/application.yml b/influx-data/influx-target/src/main/resources/application.yml index 6849a40..0c1e55b 100644 --- a/influx-data/influx-target/src/main/resources/application.yml +++ b/influx-data/influx-target/src/main/resources/application.yml @@ -7,14 +7,10 @@ server: spring: #influxDB内容配置 influx: - url: http://25.36.232.36:8086 + url: http://192.168.1.102:8086 user: admin - password: admin - database: pqsbase_hbcs -# url: http://192.168.1.81:18086 -# user: admin -# password: 123456 -# database: pqsbase + password: 123456 + database: pqsbase_sjzx mapper-location: com.njcn.influx.imapper application: name: oracle-influx @@ -71,20 +67,14 @@ spring: strict: false datasource: master: - url: jdbc:oracle:thin:@10.122.32.73:11521/dwxb + url: jdbc:oracle:thin:@192.168.1.101:1521:pqsbase username: pqsadmin - password: pqsadmin_123 -# url: jdbc:oracle:thin:@192.168.1.51:1521:pqsbase -# username: pqsadmin_hn -# password: pqsadmin + password: Pqsadmin123 driver-class-name: oracle.jdbc.driver.OracleDriver target: - url: jdbc:mysql://25.36.232.37:13306/pmsinfo?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT + url: jdbc:mysql://192.168.1.102:13306/pqsinfo?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT username: root - password: Huawei12# -# url: jdbc:mysql://192.168.1.24:13306/pqsinfo_pq?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT -# username: root -# password: njcnpqs + password: njcnpqs driver-class-name: com.mysql.cj.jdbc.Driver #mybatis配置信息 mybatis-plus: