oralce同步到influxDB

This commit is contained in:
hzj
2024-01-17 19:01:51 +08:00
parent 75ead46195
commit c45ff6d110
51 changed files with 238 additions and 66 deletions

View File

@@ -104,6 +104,12 @@
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<!--mysql驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>

View File

@@ -12,28 +12,31 @@ import java.util.stream.Collectors;
* @version V1.0.0
*/
public enum TableEnum {
DATAFLICKER("DataFlicker","电压闪变数据表"),
DATAFLUC("DataFluc","电压波动数据表"),
DATAHARMPHASICI("DataHarmphasicI","谐波电流角度数据表"),
DATAHARMPHASICV("DataHarmphasicV","谐波电压角度数据表"),
DATAHARMPOWERP("DataHarmpowerP","有功功率数据表"),
DATAHARMPOWERQ("DataHarmpowerQ","无功功率数据表"),
DATAHARMPOWERS("DataHarmpowerS","视在功率数据表"),
DATAHARMRATEI("DataHarmrateI","谐波电流含有率数据表"),
DATAHARMRATEV("DataHarmrateV","谐波电压含有率数据表"),
DATAINHARMI("DataInharmI","电流简谐波幅值数据表"),
DATAINHARMV("DataInharmV","电压间谐波幅值数据表"),
DATAI("DataI","谐波电流幅值数据表"),
DATAPLT("DataPlt","长时闪变数据表"),
DATAV("DataV","谐波电压幅值数据表");
DATAFLICKER("DataFlicker","电压闪变数据表", 1),
DATAFLUC("DataFluc","电压波动数据表", 1),
DATAHARMPHASICI("DataHarmphasicI","谐波电流角度数据表", 4),
DATAHARMPHASICV("DataHarmphasicV","谐波电压角度数据表", 4),
DATAHARMPOWERP("DataHarmpowerP","有功功率数据表", 4),
DATAHARMPOWERQ("DataHarmpowerQ","无功功率数据表", 4),
DATAHARMPOWERS("DataHarmpowerS","视在功率数据表", 4),
DATAHARMRATEI("DataHarmrateI","谐波电流含有率数据表", 4),
DATAHARMRATEV("DataHarmrateV","谐波电压含有率数据表", 4),
DATAINHARMI("DataInharmI","电流简谐波幅值数据表", 4),
DATAINHARMV("DataInharmV","电压间谐波幅值数据表", 4),
DATAI("DataI","谐波电流幅值数据表", 4),
DATAPLT("DataPlt","长时闪变数据表", 1),
DATAV("DataV","谐波电压幅值数据表", 4);
private final String code;
private final String value;
TableEnum(String code, String value) {
private final Integer multiple;
TableEnum(String code, String value, Integer multiple) {
this.code = code;
this.value = value;
this.multiple = multiple;
}
/**
@@ -62,4 +65,13 @@ public enum TableEnum {
return null;
}
public static Integer getMultipleByCode(String code) {
for (TableEnum item : TableEnum.values()) {
if (item.code.equals(code)) {
return item.multiple;
}
}
return null;
}
}

View File

@@ -55,7 +55,7 @@ public class InfluxDBDataFlicker {
Instant instant = dataFlicker.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataFlicker.setTime(instant);
influxDBDataFlicker.setLineId(dataFlicker.getLineid()+"");
influxDBDataFlicker.setLineId(dataFlicker.getLineid());
influxDBDataFlicker.setPhaseType(dataFlicker.getPhasicType());
influxDBDataFlicker.setFluc(dataFlicker.getFluc());
influxDBDataFlicker.setPlt(dataFlicker.getPst());

View File

@@ -52,7 +52,7 @@ public class InfluxDBDataFluc {
Instant instant = dataFluc.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataFluc.setTime(instant);
influxDBDataFluc.setLineId(dataFluc.getLineid()+"");
influxDBDataFluc.setLineId(dataFluc.getLineid());
influxDBDataFluc.setPhaseType(dataFluc.getPhasicType());
influxDBDataFluc.setFluc(dataFluc.getFluc());
influxDBDataFluc.setFluccf(dataFluc.getFluccf());

View File

@@ -204,7 +204,7 @@ public class InfluxDBDataHarmphasicI {
Instant instant = dataHarmphasicI.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataHarmPhasicI.setTime(instant);
influxDBDataHarmPhasicI.setLineId(dataHarmphasicI.getLineid()+"");
influxDBDataHarmPhasicI.setLineId(dataHarmphasicI.getLineid());
influxDBDataHarmPhasicI.setPhaseType(dataHarmphasicI.getPhasicType());
influxDBDataHarmPhasicI.setQualityFlag(dataHarmphasicI.getQualityflag()+"");
influxDBDataHarmPhasicI.setValueType(valueType);

View File

@@ -204,7 +204,7 @@ public class InfluxDBDataHarmphasicV {
Instant instant = dataHarmphasicV.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataHarmPhasicV.setTime(instant);
influxDBDataHarmPhasicV.setLineId(dataHarmphasicV.getLineid()+"");
influxDBDataHarmPhasicV.setLineId(dataHarmphasicV.getLineid());
influxDBDataHarmPhasicV.setPhaseType(dataHarmphasicV.getPhasicType());
influxDBDataHarmPhasicV.setQualityFlag(dataHarmphasicV.getQualityflag()+"");
influxDBDataHarmPhasicV.setValueType(valueType);

View File

@@ -213,7 +213,7 @@ public class InfluxDBDataHarmpowerP {
Instant instant = dataHarmpowerP.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataHarmPhasicV.setTime(instant);
influxDBDataHarmPhasicV.setLineId(dataHarmpowerP.getLineid()+"");
influxDBDataHarmPhasicV.setLineId(dataHarmpowerP.getLineid());
influxDBDataHarmPhasicV.setPhaseType(dataHarmpowerP.getPhasicType());
influxDBDataHarmPhasicV.setQualityFlag(dataHarmpowerP.getQualityflag()+"");
influxDBDataHarmPhasicV.setValueType(valueType);

View File

@@ -207,7 +207,7 @@ public class InfluxDBDataHarmpowerQ {
Instant instant = dataHarmpowerQ.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataHarmPhasicV.setTime(instant);
influxDBDataHarmPhasicV.setLineId(dataHarmpowerQ.getLineid()+"");
influxDBDataHarmPhasicV.setLineId(dataHarmpowerQ.getLineid());
influxDBDataHarmPhasicV.setPhaseType(dataHarmpowerQ.getPhasicType());
influxDBDataHarmPhasicV.setQualityFlag(dataHarmpowerQ.getQualityflag()+"");
influxDBDataHarmPhasicV.setValueType(valueType);

View File

@@ -207,7 +207,7 @@ public class InfluxDBDataHarmpowerS {
Instant instant = dataHarmpowerS.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataHarmPhasicV.setTime(instant);
influxDBDataHarmPhasicV.setLineId(dataHarmpowerS.getLineid()+"");
influxDBDataHarmPhasicV.setLineId(dataHarmpowerS.getLineid());
influxDBDataHarmPhasicV.setPhaseType(dataHarmpowerS.getPhasicType());
influxDBDataHarmPhasicV.setQualityFlag(dataHarmpowerS.getQualityflag()+"");
influxDBDataHarmPhasicV.setValueType(valueType);

View File

@@ -203,7 +203,7 @@ public class InfluxDBDataHarmrateI {
Instant instant = dataHarmrateI.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataHarmRateI.setTime(instant);
influxDBDataHarmRateI.setLineId(dataHarmrateI.getLineid()+"");
influxDBDataHarmRateI.setLineId(dataHarmrateI.getLineid());
influxDBDataHarmRateI.setPhaseType(dataHarmrateI.getPhasicType());
influxDBDataHarmRateI.setQualityFlag(dataHarmrateI.getQualityflag()+"");
influxDBDataHarmRateI.setValueType(valueType);

View File

@@ -203,7 +203,7 @@ public class InfluxDBDataHarmrateV {
Instant instant = dataHarmrateV.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataHarmRateV.setTime(instant);
influxDBDataHarmRateV.setLineId(dataHarmrateV.getLineid()+"");
influxDBDataHarmRateV.setLineId(dataHarmrateV.getLineid());
influxDBDataHarmRateV.setPhaseType(dataHarmrateV.getPhasicType());
influxDBDataHarmRateV.setQualityFlag(dataHarmrateV.getQualityflag()+"");
influxDBDataHarmRateV.setValueType(valueType);

View File

@@ -222,7 +222,7 @@ public class InfluxDBDataI {
Instant instant = dataI.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataI.setTime(instant);
influxDBDataI.setLineId(dataI.getLineid()+"");
influxDBDataI.setLineId(dataI.getLineid());
influxDBDataI.setPhaseType(dataI.getPhasicType());
influxDBDataI.setQualityFlag(dataI.getQualityflag()+"");
influxDBDataI.setValueType(valueType);

View File

@@ -205,7 +205,7 @@ public class InfluxDBDataInharmI {
Instant instant = dataInharmI.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataInHarmI.setTime(instant);
influxDBDataInHarmI.setLineId(dataInharmI.getLineid()+"");
influxDBDataInHarmI.setLineId(dataInharmI.getLineid());
influxDBDataInHarmI.setPhaseType(dataInharmI.getPhasicType());
influxDBDataInHarmI.setQualityFlag(dataInharmI.getQualityflag()+"");
influxDBDataInHarmI.setValueType(valueType);

View File

@@ -204,7 +204,7 @@ public class InfluxDBDataInharmV {
Instant instant = dataInharmV.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataInHarmV.setTime(instant);
influxDBDataInHarmV.setLineId(dataInharmV.getLineid()+"");
influxDBDataInHarmV.setLineId(dataInharmV.getLineid());
influxDBDataInHarmV.setPhaseType(dataInharmV.getPhasicType());
influxDBDataInHarmV.setQualityFlag(dataInharmV.getQualityflag()+"");
influxDBDataInHarmV.setValueType(valueType);

View File

@@ -48,7 +48,7 @@ public class InfluxDBDataPlt {
Instant instant = dataPlt.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataPlt.setTime(instant);
influxDBDataPlt.setLineId(dataPlt.getLineid()+"");
influxDBDataPlt.setLineId(dataPlt.getLineid());
influxDBDataPlt.setPhaseType(dataPlt.getPhasicType());
influxDBDataPlt.setPlt(dataPlt.getPlt());
influxDBDataPlt.setQualityFlag(dataPlt.getQualityflag()+"");

View File

@@ -243,7 +243,7 @@ public class InfluxDBDataV {
Instant instant = dataV.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataV.setTime(instant);
influxDBDataV.setLineId(dataV.getLineid()+"");
influxDBDataV.setLineId(dataV.getLineid());
influxDBDataV.setPhasicType(dataV.getPhasicType());
influxDBDataV.setQualityFlag(dataV.getQualityflag()+"");
influxDBDataV.setValueType(valueType);

View File

@@ -0,0 +1,23 @@
package com.njcn.influx.bo.po;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/**
* Description:
* Date: 2024/1/17 14:39【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
@TableName(value = "pq_line_bak")
public class PqLineBak {
@TableId(value = "id")
private String id;
@TableField(value = "line_id")
private String lineId;
}

View File

@@ -0,0 +1,45 @@
package com.njcn.influx.config;
import com.njcn.influx.bo.po.PqLineBak;
import com.njcn.influx.mapper.PqLineBakMapper;
import com.njcn.influx.service.PqLineBakService;
import io.swagger.v3.oas.annotations.servers.Server;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Description:
* Date: 2024/1/17 14:36【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
public class IdMappingCache {
@Autowired
private PqLineBakService pqLineBakService;
public static Map<String, String> IdMapping = new HashMap<>();
@PostConstruct
public void init() {
List<PqLineBak> resultList = pqLineBakService.list();
for (PqLineBak row : resultList) {
String id = row.getId();
String line_id = row.getLineId();
IdMapping.put(line_id,id );
}
}
public String getDataById(String id) {
return IdMapping.get(id);
}
}

View File

@@ -0,0 +1,17 @@
package com.njcn.influx.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.github.jeffreyning.mybatisplus.base.MppBaseMapper;
import com.njcn.influx.bo.po.JobDetailInfluxDB;
import com.njcn.influx.bo.po.PqLineBak;
/**
*
* Description:
* Date: 2024/1/8 12:27【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface PqLineBakMapper extends BaseMapper<PqLineBak> {
}

View File

@@ -0,0 +1,19 @@
package com.njcn.influx.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.influx.bo.po.PqLineBak;
import java.util.List;
/**
* Description:
* Date: 2024/1/15 11:38【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface PqLineBakService extends IService<PqLineBak> {
}

View File

@@ -5,27 +5,31 @@ import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.extra.spring.SpringUtil;
import com.njcn.influx.bo.param.TableEnum;
import com.njcn.influx.bo.po.JobDetailInfluxDB;
import com.njcn.influx.config.IdMappingCache;
import com.njcn.influx.service.JobDetailInfluxDBService;
import com.njcn.influx.service.OracleToInfluxDBService;
import com.njcn.oracle.bo.param.DataAsynParam;
import com.njcn.oracle.bo.param.MigrationParam;
import com.njcn.oracle.bo.po.JobDetail;
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
import com.njcn.oracle.service.JobDetailService;
import com.njcn.oracle.util.LocalDateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -39,6 +43,7 @@ import java.util.stream.Stream;
@Service
@RequiredArgsConstructor
@Slf4j
public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
@@ -49,6 +54,8 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
private final InfluxDBBaseServiceImpl influxDBBaseService;
private final IdMappingCache idMappingCache;
@Value("${business.slice:2}")
private int slice;
@@ -94,8 +101,31 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
MigrationParam migration = new MigrationParam();
migration.setStartTime(startTime);
migration.setEndTime(endTime);
List list = executor.queryData(migration);
//反射獲取linid的值并把linid的值替换成mysql对应的linid并记录未匹配的lineid
Iterator iterator = list.iterator();
while (iterator.hasNext()) {
try{
Object obj = iterator.next();
//获取
Field id = obj.getClass().getDeclaredField("lineid");
id.setAccessible(true); //暴力访问id
String id1 = id.get(obj).toString();
if (!IdMappingCache.IdMapping.containsKey(id1)){
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
iterator.remove();
}else {
id.set(obj, IdMappingCache.IdMapping.get(id1));
}
}catch (Exception e){
e.printStackTrace();
}
}
//采用弱引用接受后续手动调用gc后会清空该对象
WeakReference<List> weakReferenceData = new WeakReference<>(executor.queryData(migration));
WeakReference<List> weakReferenceData = new WeakReference<>(list);
int size = 0;
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
size = weakReferenceData.get().size();
@@ -143,7 +173,7 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
if (i == 0) {
jobDetailInfluxDB.setRowCount(0);
}
jobDetailInfluxDB.setRowCount(jobDetailInfluxDB.getRowCount() + size);
jobDetailInfluxDB.setRowCount(jobDetailInfluxDB.getRowCount() + size* TableEnum.getMultipleByCode(tableName));
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB);
if (i + 1 == slice && Objects.nonNull(jobDetailInfluxDB)) {

View File

@@ -0,0 +1,20 @@
package com.njcn.influx.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.influx.bo.po.PqLineBak;
import com.njcn.influx.mapper.PqLineBakMapper;
import com.njcn.influx.service.PqLineBakService;
import org.springframework.stereotype.Service;
/**
* Description:
* Date: 2024/1/17 15:56【需求编号】
*
* @author clam
* @version V1.0.0
*/
@DS("target")
@Service
public class PqLineBakServiceImpl extends ServiceImpl<PqLineBakMapper, PqLineBak> implements PqLineBakService {
}