1. Oracle-to-InfluxDB sync code; monitoring-point operation-interruption status

wr
2024-09-29 16:22:34 +08:00
parent 53cc3c85e3
commit 401d2a4e97
10 changed files with 288 additions and 29 deletions


@@ -28,7 +28,9 @@ public enum TableEnum {
    DATAINHARMV("DataInharmV", "Voltage interharmonic amplitude data table", 4),
    DATAI("DataI", "Harmonic current amplitude data table", 4),
    DATAPLT("DataPlt", "Long-term flicker (Plt) data table", 1),
    DATAV("DataV", "Harmonic voltage amplitude data table", 4),
    COMINFORMATION("ComInfoRmation", "Monitoring point status data", 4),
    ;
    private final String code;


@@ -0,0 +1,53 @@
package com.njcn.influx.bo.po;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.njcn.influx.utils.InstantDateSerializer;
import com.njcn.oracle.bo.po.ComInfoRmation;
import lombok.Data;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import org.influxdb.annotation.TimeColumn;
import java.time.Instant;
import java.time.ZoneId;
/**
 * Class description: InfluxDB measurement mapping for monitoring-point communication status (pqs_communicate).
 *
 * @author xuyang
 * @version 1.0.0
 * @createTime 2022/7/12 9:55
 */
@Data
@Measurement(name = "pqs_communicate")
public class InfluxDBComInfoRmation {

    @TimeColumn
    @Column(name = "time")
    @JsonSerialize(using = InstantDateSerializer.class)
    private Instant time;

    @Column(name = "dev_id")
    private String devId;

    @Column(name = "description")
    private String description;

    @Column(name = "type")
    private Integer type;

    /**
     * Converts an Oracle PQS_COMINFORMATION row into an InfluxDB point; returns null for null input.
     */
    public static InfluxDBComInfoRmation oralceToInfluxDB(ComInfoRmation comInfoRmation) {
        if (comInfoRmation == null) {
            return null;
        }
        InfluxDBComInfoRmation influxDBDataCommunicate = new InfluxDBComInfoRmation();
        Instant instant = comInfoRmation.getUpdateTime().atZone(ZoneId.systemDefault()).toInstant();
        influxDBDataCommunicate.setTime(instant);
        influxDBDataCommunicate.setDevId(comInfoRmation.getLineIndex());
        influxDBDataCommunicate.setDescription(comInfoRmation.getDescription());
        influxDBDataCommunicate.setType(comInfoRmation.getType());
        return influxDBDataCommunicate;
    }
}
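
A minimal, illustrative conversion sketch (not part of the commit): it builds one PQS_COMINFORMATION row with made-up values and runs it through oralceToInfluxDB. Note that dev_id still carries the raw LINE_INDEX at this point; the sync service remaps it to the MySQL devId later.

import java.time.LocalDateTime;
import com.njcn.influx.bo.po.InfluxDBComInfoRmation;
import com.njcn.oracle.bo.po.ComInfoRmation;

public class ComInfoRmationConversionDemo {
    public static void main(String[] args) {
        ComInfoRmation row = new ComInfoRmation();               // Lombok @Data supplies the setters
        row.setUpdateTime(LocalDateTime.of(2024, 9, 29, 16, 0)); // UPDATETIME
        row.setLineIndex("12345");                               // LINE_INDEX (made-up value)
        row.setType(1);
        row.setDescription("communication interrupted");

        InfluxDBComInfoRmation point = InfluxDBComInfoRmation.oralceToInfluxDB(row);
        // time   -> UPDATETIME converted to an Instant in the JVM's default time zone
        // dev_id -> still the raw LINE_INDEX; remapped to the MySQL devId by the sync service
        System.out.println(point.getTime() + " " + point.getDevId() + " " + point.getType());
    }
}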


@@ -0,0 +1,7 @@
package com.njcn.influx.imapper;
import com.njcn.influx.base.InfluxDbBaseMapper;
import com.njcn.influx.bo.po.InfluxDBComInfoRmation;
public interface InfluxDBComInfoRmationMapper extends InfluxDbBaseMapper<InfluxDBComInfoRmation> {
}


@@ -1,9 +1,11 @@
package com.njcn.influx.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.njcn.influx.bo.param.TableEnum;
import com.njcn.influx.bo.po.JobDetailHoursInfluxDB;
@@ -245,36 +247,68 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        List list = new ArrayList(Collections.emptyList());
        if ("ComInfoRmation".equals(tableName)) {
            MigrationParam migration = new MigrationParam();
            migration.setStartTime(dataAsynParam.getStartDateTime());
            migration.setEndTime(dataAsynParam.getEndDateTime());
            System.out.println("Scan start time ------------------------------------" + dataAsynParam.getStartDateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
            System.out.println("Scan end time ------------------------------------" + dataAsynParam.getEndDateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
            list.addAll(executor.queryData(migration));
            System.out.println("Records fetched ++++++++++++++" + list.size());
            // Reflectively read lineIndex, replace it with the matching MySQL devId, and drop/log unmatched records
            Iterator iterator = list.iterator();
            while (iterator.hasNext()) {
                try {
                    Object obj = iterator.next();
                    Field id = obj.getClass().getDeclaredField("lineIndex");
                    id.setAccessible(true); // force access to the private field
                    Object o = id.get(obj);
                    if (ObjectUtil.isNotNull(o)) {
                        int index = Integer.parseInt(o.toString()) / 10;
                        if (!IdMappingCache.DevIdMapping.containsKey(index + "")) {
                            log.info(tableName + " table --- Oracle-to-InfluxDB sync found no matching devId in MySQL for " + index);
                            iterator.remove();
                        } else {
                            id.set(obj, IdMappingCache.DevIdMapping.get(index + ""));
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } else {
            // Get each monitoring point's latest data time and query the points one by one
            List<lineTimeDto> lineTimeList = lineTimeMapper.getLineTime();
            lineTimeList.forEach(item -> {
                MigrationParam migration = new MigrationParam();
                migration.setLineIds(Collections.singletonList(item.getLineIndex()));
                migration.setStartTime(item.getUpdateTime().minusHours(2));
                migration.setEndTime(item.getUpdateTime());
                System.out.println("Current monitoring point ------------------------------------" + item.getLineIndex());
                System.out.println("Scan start time ------------------------------------" + item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
                System.out.println("Scan end time ------------------------------------" + item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
                list.addAll(executor.queryData(migration));
            });
            System.out.println("Records fetched ++++++++++++++" + list.size());
            // Reflectively read lineid, replace it with the matching MySQL lineid, and drop/log unmatched records
            Iterator iterator = list.iterator();
            while (iterator.hasNext()) {
                try {
                    Object obj = iterator.next();
                    Field id = obj.getClass().getDeclaredField("lineid");
                    id.setAccessible(true); // force access to the private field
                    String id1 = id.get(obj).toString();
                    if (!IdMappingCache.LineIdMapping.containsKey(id1)) {
                        log.info(tableName + " table --- Oracle-to-InfluxDB sync found no matching lineid in MySQL for " + id1);
                        iterator.remove();
                    } else {
                        id.set(obj, IdMappingCache.LineIdMapping.get(id1));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        // Held via a weak reference; the object is cleared after a later manual GC call
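
A hedged sketch of the lineIndex-to-devId remapping used in the ComInfoRmation branch above. The assumption that LINE_INDEX carries the device id in all but its last digit (so that integer division by 10 recovers the IdMappingCache.DevIdMapping key) is inferred from the code, not stated in the commit; the map below is a stand-in for the real cache.

import java.util.HashMap;
import java.util.Map;

public class DevIdRemapDemo {
    public static void main(String[] args) {
        Map<String, String> devIdMapping = new HashMap<>();  // stand-in for IdMappingCache.DevIdMapping
        devIdMapping.put("1234", "MYSQL-DEV-0001");           // hypothetical mapping entry

        String lineIndex = "12345";                           // raw Oracle LINE_INDEX
        String key = String.valueOf(Integer.parseInt(lineIndex) / 10); // "1234"
        if (devIdMapping.containsKey(key)) {
            System.out.println("dev_id written to InfluxDB: " + devIdMapping.get(key));
        } else {
            System.out.println("no matching devId; the record would be removed from the batch");
        }
    }
}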
@@ -351,7 +385,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
        DataAsynParam dataAsynParam1 = new DataAsynParam();
        dataAsynParam1.setEndDateTime(startDateTime1.minusHours(-1).minusSeconds(1));
        dataAsynParam1.setStartDateTime(startDateTime1);
        if (CollUtil.isEmpty(dataAsynParam.getTableNames())) {
            dataAsynParam1.setTableNames(TableEnum.getExecutableTypes());
        } else {
            dataAsynParam1.setTableNames(dataAsynParam.getTableNames());
        }
        log.info("Processing data for time " + startDateTime1);
        this.hourseDataBacthSysc(dataAsynParam1);
    }


@@ -2,6 +2,7 @@ package com.njcn.influx.controller;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.StrUtil;
import com.njcn.influx.service.OracleEventDetailToMysqlService;
import com.njcn.influx.service.OracleMonitorStatusToMysqlService;
import com.njcn.influx.service.OracleToInfluxDBService;

@@ -18,6 +19,7 @@ import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
/**
@@ -59,6 +61,22 @@ public class OracleToInfluxDBController {
        return true; // HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
    }
    @GetMapping("/dataSyncTable")
    @ApiOperation("Data sync for a single table")
    public Boolean dataSyncTable(@RequestParam("startDateTime") String startDateTime,
                                 @RequestParam("endDateTime") String endDateTime,
                                 @RequestParam("tableName") String tableName) {
        DataAsynParam dataAsynParam = new DataAsynParam();
        dataAsynParam.setStartDateTime(LocalDateTimeUtil.parse(startDateTime, DatePattern.NORM_DATETIME_PATTERN));
        dataAsynParam.setEndDateTime(LocalDateTimeUtil.parse(endDateTime, DatePattern.NORM_DATETIME_PATTERN));
        if (StrUtil.isNotBlank(tableName)) {
            dataAsynParam.setTableNames(Collections.singletonList(tableName));
        }
        oracleToInfluxDBService.AsyncData(dataAsynParam);
        return true; // HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
    }
    @PostMapping("/oneMonitorDataTransport")
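
An illustrative call to the new GET /dataSyncTable endpoint (not part of the commit). The base URL is a placeholder, since the controller's class-level request mapping is not shown in this diff, and the timestamps must follow DatePattern.NORM_DATETIME_PATTERN, i.e. yyyy-MM-dd HH:mm:ss.

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class DataSyncTableCallDemo {
    public static void main(String[] args) throws Exception {
        String start = URLEncoder.encode("2024-09-29 00:00:00", StandardCharsets.UTF_8);
        String end = URLEncoder.encode("2024-09-29 01:00:00", StandardCharsets.UTF_8);
        String url = "http://localhost:8080/dataSyncTable"    // placeholder host/port and path prefix
                + "?startDateTime=" + start
                + "&endDateTime=" + end
                + "&tableName=ComInfoRmation";                 // sync only the new status table

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(HttpRequest.newBuilder(URI.create(url)).GET().build(),
                        HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body()); // expected body: true
    }
}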


@@ -0,0 +1,39 @@
package com.njcn.oracle.bo.po;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* @author wr
 * @description Monitoring-point communication status record mapped to the Oracle table PQS_COMINFORMATION
* @date 2024/9/25 11:01
*/
@Data
@TableName("PQS_COMINFORMATION")
public class ComInfoRmation implements Serializable {
private static final long serialVersionUID = 1L;
@TableField("UPDATETIME")
private LocalDateTime updateTime;
@TableField("LINE_INDEX")
private String lineIndex;
@TableField("TYPE")
private Integer type;
@TableField("DESCRIPTION")
private String description;
@TableField("REMARK")
private String remark;
@TableField("STATE")
private Integer state;
}


@@ -0,0 +1,16 @@
package com.njcn.oracle.mapper;
import com.njcn.oracle.bo.po.ComInfoRmation;
import com.njcn.oracle.mybatis.mapper.BatchBaseMapper;
/**
* <p>
 * Mapper interface
* </p>
*
* @author hongawen
* @since 2023-12-28
*/
public interface ComInfoRmationMapper extends BatchBaseMapper<ComInfoRmation> {
}


@@ -0,0 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.njcn.oracle.mapper.ComInfoRmationMapper">
</mapper>


@@ -0,0 +1,16 @@
package com.njcn.oracle.service;
import com.njcn.oracle.bo.po.ComInfoRmation;
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
/**
 * @Description: Monitoring point status data
* @Author: wr
* @Date: 2024/9/25 13:52
*/
public interface IComInfoRmationService extends IReplenishMybatisService<ComInfoRmation> {
}


@@ -0,0 +1,65 @@
package com.njcn.oracle.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.oracle.bo.param.MigrationParam;
import com.njcn.oracle.bo.po.ComInfoRmation;
import com.njcn.oracle.mapper.ComInfoRmationMapper;
import com.njcn.oracle.mybatis.service.impl.ReplenishMybatisServiceImpl;
import com.njcn.oracle.service.IComInfoRmationService;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import java.util.List;
/**
 * @Description: PQS_COMINFORMATION migration service implementation
* @Author: wr
* @Date: 2024/9/25 13:58
*/
@Service
public class ComInfoRmationServiceImpl extends ReplenishMybatisServiceImpl<ComInfoRmationMapper, ComInfoRmation> implements IComInfoRmationService {
@Override
public List<ComInfoRmation> queryData(MigrationParam migrationParam) {
        // Query records within the given time range
LambdaQueryWrapper<ComInfoRmation> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.between(ComInfoRmation::getUpdateTime, migrationParam.getStartTime(), migrationParam.getEndTime());
if (CollectionUtil.isNotEmpty(migrationParam.getLineIds())) {
lambdaQueryWrapper.in(ComInfoRmation::getLineIndex, migrationParam.getLineIds());
}
return this.baseMapper.selectList(lambdaQueryWrapper);
}
@Override
@DS("target")
public void clearTargetData(MigrationParam migrationParam) {
LambdaQueryWrapper<ComInfoRmation> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.between(ComInfoRmation::getUpdateTime, migrationParam.getStartTime(), migrationParam.getEndTime());
if (CollectionUtil.isNotEmpty(migrationParam.getLineIds())) {
lambdaQueryWrapper.in(ComInfoRmation::getLineIndex, migrationParam.getLineIds());
}
this.baseMapper.delete(lambdaQueryWrapper);
}
    /**
     * Defaults to slices of 500; if the entity has more than 20 fields, it is recommended to override this
     * method and adjust to slices of 1000. Do not forget the @DS annotation when overriding.
     *
     * @param data the record collection
     */
@Override
@DS("target")
public void insertBatchByDB(List<ComInfoRmation> data) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
this.insertBatchBySlice(data, 100);
stopWatch.stop();
        System.out.printf("pq_ComInfoRmation total: %d records, elapsed time %f seconds.%n", data.size(), stopWatch.getTotalTimeSeconds());
}
}
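
An illustrative wiring sketch (not part of the commit) showing how the three methods above could move one two-hour window of PQS_COMINFORMATION rows; how the service instance is obtained (Spring injection, SpringUtil lookup, ...) is an assumption and left out here.

import java.time.LocalDateTime;
import java.util.List;
import com.njcn.oracle.bo.param.MigrationParam;
import com.njcn.oracle.bo.po.ComInfoRmation;
import com.njcn.oracle.service.impl.ComInfoRmationServiceImpl;

public class ComInfoRmationMigrationDemo {
    public static void migrateWindow(ComInfoRmationServiceImpl service, LocalDateTime end) {
        MigrationParam window = new MigrationParam();
        window.setStartTime(end.minusHours(2));
        window.setEndTime(end);
        // no setLineIds(...): the lineIndex filter in queryData is optional

        List<ComInfoRmation> rows = service.queryData(window); // read from the source datasource
        service.clearTargetData(window);                       // make the window re-runnable on "target"
        service.insertBatchByDB(rows);                         // sliced insert, 100 records per batch
    }
}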