代码调整
This commit is contained in:
@@ -99,6 +99,12 @@
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-collections4</artifactId>
|
||||
<version>4.4</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -23,7 +23,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "data_harmphasic_i")
|
||||
public class InfluxDBDataHarmPhasicI {
|
||||
public class InfluxDBDataHarmphasicI {
|
||||
|
||||
@TimeColumn
|
||||
@Column(name = "time",tag = true)
|
||||
@@ -193,14 +193,14 @@ public class InfluxDBDataHarmPhasicI {
|
||||
private Float i50;
|
||||
|
||||
|
||||
public static List<InfluxDBDataHarmPhasicI> oralceToInfluxDB(DataHarmphasicI dataHarmphasicI) {
|
||||
public static List<InfluxDBDataHarmphasicI> oralceToInfluxDB(DataHarmphasicI dataHarmphasicI) {
|
||||
if (dataHarmphasicI == null) {
|
||||
return null;
|
||||
}
|
||||
List<InfluxDBDataHarmPhasicI> influxDBDataHarmPhasicIList = new ArrayList<>();
|
||||
List<InfluxDBDataHarmphasicI> influxDBDataHarmPhasicIList = new ArrayList<>();
|
||||
List<String> valueTypeList = Stream.of("AVG", "MAX", "MIN", "CP95").collect(Collectors.toList());
|
||||
for (String valueType : valueTypeList) {
|
||||
InfluxDBDataHarmPhasicI influxDBDataHarmPhasicI = new InfluxDBDataHarmPhasicI();
|
||||
InfluxDBDataHarmphasicI influxDBDataHarmPhasicI = new InfluxDBDataHarmphasicI();
|
||||
Instant instant = dataHarmphasicI.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
|
||||
|
||||
influxDBDataHarmPhasicI.setTime(instant);
|
||||
@@ -2,7 +2,6 @@ package com.njcn.influx.bo.po;
|
||||
|
||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import com.njcn.influx.utils.InstantDateSerializer;
|
||||
import com.njcn.oracle.bo.po.DataHarmphasicI;
|
||||
import com.njcn.oracle.bo.po.DataHarmphasicV;
|
||||
import lombok.Data;
|
||||
import org.influxdb.annotation.Column;
|
||||
@@ -25,7 +24,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "data_harmphasic_v")
|
||||
public class InfluxDBDataHarmPhasicV {
|
||||
public class InfluxDBDataHarmphasicV {
|
||||
|
||||
@TimeColumn
|
||||
@Column(name = "time",tag = true)
|
||||
@@ -194,14 +193,14 @@ public class InfluxDBDataHarmPhasicV {
|
||||
@Column(name = "v_50")
|
||||
private Float v50;
|
||||
|
||||
public static List<InfluxDBDataHarmPhasicV> oralceToInfluxDB(DataHarmphasicV dataHarmphasicV) {
|
||||
public static List<InfluxDBDataHarmphasicV> oralceToInfluxDB(DataHarmphasicV dataHarmphasicV) {
|
||||
if (dataHarmphasicV == null) {
|
||||
return null;
|
||||
}
|
||||
List<InfluxDBDataHarmPhasicV> influxDBDataHarmPhasicVList = new ArrayList<>();
|
||||
List<InfluxDBDataHarmphasicV> influxDBDataHarmPhasicVList = new ArrayList<>();
|
||||
List<String> valueTypeList = Stream.of("AVG", "MAX", "MIN", "CP95").collect(Collectors.toList());
|
||||
for (String valueType : valueTypeList) {
|
||||
InfluxDBDataHarmPhasicV influxDBDataHarmPhasicV = new InfluxDBDataHarmPhasicV();
|
||||
InfluxDBDataHarmphasicV influxDBDataHarmPhasicV = new InfluxDBDataHarmphasicV();
|
||||
Instant instant = dataHarmphasicV.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
|
||||
|
||||
influxDBDataHarmPhasicV.setTime(instant);
|
||||
@@ -3,7 +3,6 @@ package com.njcn.influx.bo.po;
|
||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import com.njcn.influx.utils.InstantDateSerializer;
|
||||
import com.njcn.oracle.bo.po.DataHarmpowerP;
|
||||
import com.njcn.oracle.bo.po.DataHarmpowerP;
|
||||
import lombok.Data;
|
||||
import org.influxdb.annotation.Column;
|
||||
import org.influxdb.annotation.Measurement;
|
||||
@@ -25,7 +24,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "data_harmpower_p")
|
||||
public class InfluxDBDataHarmPowerP {
|
||||
public class InfluxDBDataHarmpowerP {
|
||||
|
||||
@TimeColumn
|
||||
@Column(name = "time",tag = true)
|
||||
@@ -203,14 +202,14 @@ public class InfluxDBDataHarmPowerP {
|
||||
@Column(name = "p_50")
|
||||
private Float p50;
|
||||
|
||||
public static List<InfluxDBDataHarmPowerP> oralceToInfluxDB(DataHarmpowerP dataHarmpowerP) {
|
||||
public static List<InfluxDBDataHarmpowerP> oralceToInfluxDB(DataHarmpowerP dataHarmpowerP) {
|
||||
if (dataHarmpowerP == null) {
|
||||
return null;
|
||||
}
|
||||
List<InfluxDBDataHarmPowerP> influxDBDataHarmPhasicVList = new ArrayList<>();
|
||||
List<InfluxDBDataHarmpowerP> influxDBDataHarmPhasicVList = new ArrayList<>();
|
||||
List<String> valueTypeList = Stream.of("AVG", "MAX", "MIN", "CP95").collect(Collectors.toList());
|
||||
for (String valueType : valueTypeList) {
|
||||
InfluxDBDataHarmPowerP influxDBDataHarmPhasicV = new InfluxDBDataHarmPowerP();
|
||||
InfluxDBDataHarmpowerP influxDBDataHarmPhasicV = new InfluxDBDataHarmpowerP();
|
||||
Instant instant = dataHarmpowerP.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
|
||||
|
||||
influxDBDataHarmPhasicV.setTime(instant);
|
||||
@@ -24,7 +24,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "data_harmpower_q")
|
||||
public class InfluxDBDataHarmPowerQ {
|
||||
public class InfluxDBDataHarmpowerQ {
|
||||
|
||||
@TimeColumn
|
||||
@Column(name = "time",tag = true)
|
||||
@@ -196,14 +196,14 @@ public class InfluxDBDataHarmPowerQ {
|
||||
@Column(name = "q_50")
|
||||
private Float q50;
|
||||
|
||||
public static List<InfluxDBDataHarmPowerQ> oralceToInfluxDB(DataHarmpowerQ dataHarmpowerQ) {
|
||||
public static List<InfluxDBDataHarmpowerQ> oralceToInfluxDB(DataHarmpowerQ dataHarmpowerQ) {
|
||||
if (dataHarmpowerQ == null) {
|
||||
return null;
|
||||
}
|
||||
List<InfluxDBDataHarmPowerQ> influxDBDataHarmPhasicVList = new ArrayList<>();
|
||||
List<InfluxDBDataHarmpowerQ> influxDBDataHarmPhasicVList = new ArrayList<>();
|
||||
List<String> valueTypeList = Stream.of("AVG", "MAX", "MIN", "CP95").collect(Collectors.toList());
|
||||
for (String valueType : valueTypeList) {
|
||||
InfluxDBDataHarmPowerQ influxDBDataHarmPhasicV = new InfluxDBDataHarmPowerQ();
|
||||
InfluxDBDataHarmpowerQ influxDBDataHarmPhasicV = new InfluxDBDataHarmpowerQ();
|
||||
Instant instant = dataHarmpowerQ.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
|
||||
|
||||
influxDBDataHarmPhasicV.setTime(instant);
|
||||
@@ -24,7 +24,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "data_harmpower_s")
|
||||
public class InfluxDBDataHarmPowerS {
|
||||
public class InfluxDBDataHarmpowerS {
|
||||
|
||||
@TimeColumn
|
||||
@Column(name = "time",tag = true)
|
||||
@@ -196,14 +196,14 @@ public class InfluxDBDataHarmPowerS {
|
||||
@Column(name = "s_50")
|
||||
private Float s50;
|
||||
|
||||
public static List<InfluxDBDataHarmPowerS> oralceToInfluxDB(DataHarmpowerS dataHarmpowerS) {
|
||||
public static List<InfluxDBDataHarmpowerS> oralceToInfluxDB(DataHarmpowerS dataHarmpowerS) {
|
||||
if (dataHarmpowerS == null) {
|
||||
return null;
|
||||
}
|
||||
List<InfluxDBDataHarmPowerS> influxDBDataHarmPhasicVList = new ArrayList<>();
|
||||
List<InfluxDBDataHarmpowerS> influxDBDataHarmPhasicVList = new ArrayList<>();
|
||||
List<String> valueTypeList = Stream.of("AVG", "MAX", "MIN", "CP95").collect(Collectors.toList());
|
||||
for (String valueType : valueTypeList) {
|
||||
InfluxDBDataHarmPowerS influxDBDataHarmPhasicV = new InfluxDBDataHarmPowerS();
|
||||
InfluxDBDataHarmpowerS influxDBDataHarmPhasicV = new InfluxDBDataHarmpowerS();
|
||||
Instant instant = dataHarmpowerS.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
|
||||
|
||||
influxDBDataHarmPhasicV.setTime(instant);
|
||||
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import com.njcn.influx.utils.InstantDateSerializer;
|
||||
import com.njcn.oracle.bo.po.DataHarmrateI;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import org.influxdb.annotation.Column;
|
||||
import org.influxdb.annotation.Measurement;
|
||||
import org.influxdb.annotation.TimeColumn;
|
||||
@@ -25,7 +24,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "data_harmrate_i")
|
||||
public class InfluxDBDataHarmRateI {
|
||||
public class InfluxDBDataHarmrateI {
|
||||
@TimeColumn
|
||||
@Column(name = "time",tag = true)
|
||||
@JsonSerialize(using = InstantDateSerializer.class)
|
||||
@@ -193,14 +192,14 @@ public class InfluxDBDataHarmRateI {
|
||||
@Column(name = "i_50")
|
||||
private Float i50;
|
||||
|
||||
public static List<InfluxDBDataHarmRateI> oralceToInfluxDB(DataHarmrateI dataHarmrateI) {
|
||||
public static List<InfluxDBDataHarmrateI> oralceToInfluxDB(DataHarmrateI dataHarmrateI) {
|
||||
if (dataHarmrateI == null) {
|
||||
return null;
|
||||
}
|
||||
List<InfluxDBDataHarmRateI> influxDBDataHarmRateIList = new ArrayList<>();
|
||||
List<InfluxDBDataHarmrateI> influxDBDataHarmRateIList = new ArrayList<>();
|
||||
List<String> valueTypeList = Stream.of("AVG", "MAX", "MIN", "CP95").collect(Collectors.toList());
|
||||
for (String valueType : valueTypeList) {
|
||||
InfluxDBDataHarmRateI influxDBDataHarmRateI = new InfluxDBDataHarmRateI();
|
||||
InfluxDBDataHarmrateI influxDBDataHarmRateI = new InfluxDBDataHarmrateI();
|
||||
Instant instant = dataHarmrateI.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
|
||||
|
||||
influxDBDataHarmRateI.setTime(instant);
|
||||
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import com.njcn.influx.utils.InstantDateSerializer;
|
||||
import com.njcn.oracle.bo.po.DataHarmrateV;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import org.influxdb.annotation.Column;
|
||||
import org.influxdb.annotation.Measurement;
|
||||
import org.influxdb.annotation.TimeColumn;
|
||||
@@ -25,7 +24,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "data_harmrate_v")
|
||||
public class InfluxDBDataHarmRateV {
|
||||
public class InfluxDBDataHarmrateV {
|
||||
@TimeColumn
|
||||
@Column(name = "time",tag = true)
|
||||
@JsonSerialize(using = InstantDateSerializer.class)
|
||||
@@ -193,14 +192,14 @@ public class InfluxDBDataHarmRateV {
|
||||
@Column(name = "v_50")
|
||||
private Float v50;
|
||||
|
||||
public static List<InfluxDBDataHarmRateV> oralceToInfluxDB(DataHarmrateV dataHarmrateV) {
|
||||
public static List<InfluxDBDataHarmrateV> oralceToInfluxDB(DataHarmrateV dataHarmrateV) {
|
||||
if (dataHarmrateV == null) {
|
||||
return null;
|
||||
}
|
||||
List<InfluxDBDataHarmRateV> influxDBDataHarmRateVList = new ArrayList<>();
|
||||
List<InfluxDBDataHarmrateV> influxDBDataHarmRateVList = new ArrayList<>();
|
||||
List<String> valueTypeList = Stream.of("AVG", "MAX", "MIN", "CP95").collect(Collectors.toList());
|
||||
for (String valueType : valueTypeList) {
|
||||
InfluxDBDataHarmRateV influxDBDataHarmRateV = new InfluxDBDataHarmRateV();
|
||||
InfluxDBDataHarmrateV influxDBDataHarmRateV = new InfluxDBDataHarmrateV();
|
||||
Instant instant = dataHarmrateV.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
|
||||
|
||||
influxDBDataHarmRateV.setTime(instant);
|
||||
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import com.njcn.influx.utils.InstantDateSerializer;
|
||||
import com.njcn.oracle.bo.po.DataInharmI;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import org.influxdb.annotation.Column;
|
||||
import org.influxdb.annotation.Measurement;
|
||||
import org.influxdb.annotation.TimeColumn;
|
||||
@@ -25,7 +24,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "data_inharm_i")
|
||||
public class InfluxDBDataInHarmI {
|
||||
public class InfluxDBDataInharmI {
|
||||
|
||||
@TimeColumn
|
||||
@Column(name = "time",tag = true)
|
||||
@@ -195,14 +194,14 @@ public class InfluxDBDataInHarmI {
|
||||
private Float i50;
|
||||
|
||||
|
||||
public static List<InfluxDBDataInHarmI> oralceToInfluxDB(DataInharmI dataInharmI) {
|
||||
public static List<InfluxDBDataInharmI> oralceToInfluxDB(DataInharmI dataInharmI) {
|
||||
if (dataInharmI == null) {
|
||||
return null;
|
||||
}
|
||||
List<InfluxDBDataInHarmI> influxDBDataInHarmIList = new ArrayList<>();
|
||||
List<InfluxDBDataInharmI> influxDBDataInHarmIList = new ArrayList<>();
|
||||
List<String> valueTypeList = Stream.of("AVG", "MAX", "MIN", "CP95").collect(Collectors.toList());
|
||||
for (String valueType : valueTypeList) {
|
||||
InfluxDBDataInHarmI influxDBDataInHarmI = new InfluxDBDataInHarmI();
|
||||
InfluxDBDataInharmI influxDBDataInHarmI = new InfluxDBDataInharmI();
|
||||
Instant instant = dataInharmI.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
|
||||
|
||||
influxDBDataInHarmI.setTime(instant);
|
||||
@@ -24,7 +24,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "data_inharm_v")
|
||||
public class InfluxDBDataInHarmV {
|
||||
public class InfluxDBDataInharmV {
|
||||
|
||||
@Column(name = "time",tag =true)
|
||||
@TimeColumn
|
||||
@@ -193,14 +193,14 @@ public class InfluxDBDataInHarmV {
|
||||
@Column(name = "v_50")
|
||||
private Float v50;
|
||||
|
||||
public static List<InfluxDBDataInHarmV> oralceToInfluxDB(DataInharmV dataInharmV) {
|
||||
public static List<InfluxDBDataInharmV> oralceToInfluxDB(DataInharmV dataInharmV) {
|
||||
if (dataInharmV == null) {
|
||||
return null;
|
||||
}
|
||||
List<InfluxDBDataInHarmV> influxDBDataInHarmVList = new ArrayList<>();
|
||||
List<InfluxDBDataInharmV> influxDBDataInHarmVList = new ArrayList<>();
|
||||
List<String> valueTypeList = Stream.of("AVG", "MAX", "MIN", "CP95").collect(Collectors.toList());
|
||||
for (String valueType : valueTypeList) {
|
||||
InfluxDBDataInHarmV influxDBDataInHarmV = new InfluxDBDataInHarmV();
|
||||
InfluxDBDataInharmV influxDBDataInHarmV = new InfluxDBDataInharmV();
|
||||
Instant instant = dataInharmV.getTimeid().atZone(ZoneId.systemDefault()).toInstant();
|
||||
|
||||
influxDBDataInHarmV.setTime(instant);
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataFlicker;
|
||||
|
||||
public interface InfluxDBDataFlickerMapper extends InfluxDbBaseMapper<InfluxDBDataFlicker> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataFluc;
|
||||
|
||||
public interface InfluxDBDataFlucMapper extends InfluxDbBaseMapper<InfluxDBDataFluc> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataHarmphasicI;
|
||||
|
||||
public interface InfluxDBDataHarmphasicIMapper extends InfluxDbBaseMapper<InfluxDBDataHarmphasicI> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataHarmphasicV;
|
||||
|
||||
public interface InfluxDBDataHarmphasicVMapper extends InfluxDbBaseMapper<InfluxDBDataHarmphasicV> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataHarmpowerP;
|
||||
|
||||
public interface InfluxDBDataHarmpowerPMapper extends InfluxDbBaseMapper<InfluxDBDataHarmpowerP> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataHarmpowerQ;
|
||||
|
||||
public interface InfluxDBDataHarmpowerQMapper extends InfluxDbBaseMapper<InfluxDBDataHarmpowerQ> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataHarmpowerS;
|
||||
|
||||
public interface InfluxDBDataHarmpowerSMapper extends InfluxDbBaseMapper<InfluxDBDataHarmpowerS> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataHarmrateI;
|
||||
|
||||
public interface InfluxDBDataHarmrateIMapper extends InfluxDbBaseMapper<InfluxDBDataHarmrateI> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataHarmrateV;
|
||||
|
||||
public interface InfluxDBDataHarmrateVMapper extends InfluxDbBaseMapper<InfluxDBDataHarmrateV> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataI;
|
||||
|
||||
public interface InfluxDBDataIMapper extends InfluxDbBaseMapper<InfluxDBDataI> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataInharmI;
|
||||
|
||||
public interface InfluxDBDataInharmIMapper extends InfluxDbBaseMapper<InfluxDBDataInharmI> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataInharmV;
|
||||
|
||||
public interface InfluxDBDataInharmVMapper extends InfluxDbBaseMapper<InfluxDBDataInharmV> {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataPlt;
|
||||
|
||||
public interface InfluxDBDataPltMapper extends InfluxDbBaseMapper<InfluxDBDataPlt> {
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBDataV;
|
||||
|
||||
public interface InfluxDBDataVMapper extends InfluxDbBaseMapper<InfluxDBDataV> {
|
||||
|
||||
}
|
||||
@@ -19,6 +19,6 @@ public interface InfluxDBBaseService<T> {
|
||||
* @param data 数据集合
|
||||
* @param size 分片的尺寸
|
||||
*/
|
||||
void insertBatchBySlice(List<T> data, int size);
|
||||
void insertBatchBySlice(String tableName,List<T> data, int size);
|
||||
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
package com.njcn.influx.service.impl;
|
||||
|
||||
import cn.hutool.core.collection.ListUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.service.InfluxDBBaseService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.collections4.ListUtils;
|
||||
import org.influxdb.InfluxDB;
|
||||
import org.influxdb.annotation.Measurement;
|
||||
import org.influxdb.dto.BatchPoints;
|
||||
import org.influxdb.dto.Point;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@@ -25,37 +25,55 @@ import java.util.List;
|
||||
public class InfluxDBBaseServiceImpl<T> implements InfluxDBBaseService<T> {
|
||||
|
||||
private final InfluxDB influxDb;
|
||||
|
||||
private final static String PACKAGE_PREFIX = "com.njcn.influx.imapper.InfluxDB";
|
||||
private final static String PACKAGE_SUFFIX = "Mapper";
|
||||
|
||||
@Value("${spring.influx.database}")
|
||||
private String database;
|
||||
private String database;
|
||||
|
||||
@Override
|
||||
public void insertBatchBySlice(List<T> data, int size) {
|
||||
public void insertBatchBySlice(String tableName, List<T> data, int size) {
|
||||
int totalCount = data.size();
|
||||
int idxLimit = Math.min(size, totalCount);
|
||||
List<List<T>> partition = ListUtil.partition(data, idxLimit);
|
||||
partition.forEach(temp->{
|
||||
if (data.size() > 0) {
|
||||
|
||||
Object firstObj = data.get(0);
|
||||
Class<?> domainClass = firstObj.getClass();
|
||||
List<Point> pointList = new ArrayList<>();
|
||||
for (Object o : data) {
|
||||
Point point = Point
|
||||
.measurementByPOJO(domainClass)
|
||||
.addFieldsFromPOJO(o)
|
||||
.build();
|
||||
pointList.add(point);
|
||||
}
|
||||
//获取数据库名和rp
|
||||
Measurement measurement = firstObj.getClass().getAnnotation(Measurement.class);
|
||||
String retentionPolicy = measurement.retentionPolicy();
|
||||
BatchPoints batchPoints = BatchPoints
|
||||
.builder()
|
||||
.points(pointList)
|
||||
.retentionPolicy(retentionPolicy).build();
|
||||
influxDb.setDatabase(database);
|
||||
influxDb.write(batchPoints);
|
||||
List<List<T>> partition = ListUtils.partition(data, idxLimit);
|
||||
//根据表名找到mapper,执行批量插入
|
||||
InfluxDbBaseMapper mapper;
|
||||
try {
|
||||
mapper = (InfluxDbBaseMapper) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX));
|
||||
System.out.println(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX);
|
||||
for (List<T> sliceList : partition) {
|
||||
List<T> sublistAsOriginalListType = new ArrayList<>(sliceList);
|
||||
mapper.insertBatch(sublistAsOriginalListType);
|
||||
}
|
||||
});
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
//
|
||||
//
|
||||
// partition.forEach(temp -> {
|
||||
// if (!data.isEmpty()) {
|
||||
// Object firstObj = data.get(0);
|
||||
// Class<?> domainClass = firstObj.getClass();
|
||||
// List<Point> pointList = new ArrayList<>();
|
||||
// for (Object o : data) {
|
||||
// Point point = Point
|
||||
// .measurementByPOJO(domainClass)
|
||||
// .addFieldsFromPOJO(o)
|
||||
// .build();
|
||||
// pointList.add(point);
|
||||
// }
|
||||
// //获取数据库名和rp
|
||||
// Measurement measurement = firstObj.getClass().getAnnotation(Measurement.class);
|
||||
// String retentionPolicy = measurement.retentionPolicy();
|
||||
// BatchPoints batchPoints = BatchPoints
|
||||
// .builder()
|
||||
// .points(pointList)
|
||||
// .retentionPolicy(retentionPolicy).build();
|
||||
// influxDb.setDatabase(database);
|
||||
// influxDb.write(batchPoints);
|
||||
// }
|
||||
// });
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -46,10 +46,12 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
private final static String PACKAGE_SUFFIX = "ServiceImpl";
|
||||
|
||||
private final JobDetailInfluxDBService jobDetailInfluxDBService;
|
||||
|
||||
private final InfluxDBBaseServiceImpl influxDBBaseService;
|
||||
|
||||
@Value("${business.slice:2}")
|
||||
private int slice;
|
||||
|
||||
@Override
|
||||
@Async
|
||||
public void dataBacthSysc(DataAsynParam dataAsynParam) {
|
||||
@@ -96,39 +98,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
jobDetailInfluxDB.setTableName(tableName);
|
||||
jobDetailInfluxDB.setExcuteDate(date);
|
||||
jobDetailInfluxDB = jobDetailInfluxDBService.select(jobDetailInfluxDB);
|
||||
if (Objects.nonNull(jobDetailInfluxDB)) {
|
||||
if (jobDetailInfluxDB.getState() == 2 && i == 0 && size != 0) {
|
||||
//第一片执行时返现是失败的,则再次执行
|
||||
jobDetailInfluxDB.setState(0);
|
||||
jobDetailInfluxDB.setRowCount(size);
|
||||
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
|
||||
jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB);
|
||||
} else if (jobDetailInfluxDB.getState() == 0 && i > 0 && jobDetailInfluxDB.getRowCount() + size != 0) {
|
||||
// 处理中,后续时间片的处理,累加记录数
|
||||
jobDetailInfluxDB.setState(0);
|
||||
jobDetailInfluxDB.setRowCount(jobDetailInfluxDB.getRowCount() + size);
|
||||
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
|
||||
jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB);
|
||||
} else {
|
||||
System.gc();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (size > 0) {
|
||||
jobDetailInfluxDB = new JobDetailInfluxDB();
|
||||
jobDetailInfluxDB.setTableName(tableName);
|
||||
jobDetailInfluxDB.setExcuteDate(date);
|
||||
jobDetailInfluxDB.setState(0);
|
||||
jobDetailInfluxDB.setRowCount(size);
|
||||
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
|
||||
jobDetailInfluxDBService.save(jobDetailInfluxDB);
|
||||
}
|
||||
}
|
||||
try {
|
||||
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
|
||||
//执行目标库的数据处理
|
||||
Class<?> clazz = null;
|
||||
Class<?> clazz2 = null;
|
||||
Class<?> clazz;
|
||||
Class<?> clazz2;
|
||||
//获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2
|
||||
try {
|
||||
clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + tableName);
|
||||
@@ -138,28 +112,55 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
}
|
||||
Method method = null;
|
||||
try {
|
||||
method = clazz.getDeclaredMethod("oralceToInfluxDB",clazz2);
|
||||
method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
method.setAccessible(true);
|
||||
Method finalMethod = method;
|
||||
List list1 =(List) weakReferenceData.get().stream().flatMap(po -> {
|
||||
List list1 = (List) weakReferenceData.get().stream().flatMap(po -> {
|
||||
try {
|
||||
|
||||
Object invoke = finalMethod.invoke(null,po);
|
||||
Object invoke = finalMethod.invoke(null, po);
|
||||
Object invoke1 = invoke;
|
||||
//返回oralce转inflaux,flicker等表是1-1,还有1-4,这是判断返回是否是集合如何是集合继续扁平化
|
||||
//返回oracle转influx,flicker等表是1-1,还有1-4,这是判断返回是否是集合如何是集合继续扁平化
|
||||
return invoke1 instanceof List ? ((List<?>) invoke1).stream() : Stream.of(invoke1);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}).map(item-> (Object) item).collect(Collectors.toList());
|
||||
}).collect(Collectors.toList());
|
||||
//插入influxdb
|
||||
influxDBBaseService.insertBatchBySlice(list1,5000);
|
||||
influxDBBaseService.insertBatchBySlice(tableName, list1, 10000);
|
||||
size = list1.size();
|
||||
//最后一片时修改状态
|
||||
}
|
||||
if (Objects.nonNull(jobDetailInfluxDB)) {
|
||||
if (jobDetailInfluxDB.getState() == 2 && i == 0 && size != 0) {
|
||||
//第一片执行时返现是失败的,则再次执行
|
||||
jobDetailInfluxDB.setState(0);
|
||||
jobDetailInfluxDB.setRowCount(size);
|
||||
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
|
||||
jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB);
|
||||
} else if (jobDetailInfluxDB.getState() == 0 && i > 0 && jobDetailInfluxDB.getRowCount() + size != 0) {
|
||||
// 处理中,后续时间片的处理,累加记录数
|
||||
jobDetailInfluxDB.setState(0);
|
||||
jobDetailInfluxDB.setRowCount(jobDetailInfluxDB.getRowCount() + size);
|
||||
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
|
||||
jobDetailInfluxDBService.updateByMultiId(jobDetailInfluxDB);
|
||||
} else {
|
||||
System.gc();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (size > 0) {
|
||||
jobDetailInfluxDB = new JobDetailInfluxDB();
|
||||
jobDetailInfluxDB.setTableName(tableName);
|
||||
jobDetailInfluxDB.setExcuteDate(date);
|
||||
jobDetailInfluxDB.setState(0);
|
||||
jobDetailInfluxDB.setRowCount(size);
|
||||
jobDetailInfluxDB.setUpdateTime(LocalDateTime.now());
|
||||
jobDetailInfluxDBService.save(jobDetailInfluxDB);
|
||||
}
|
||||
}
|
||||
if (i + 1 == slice && Objects.nonNull(jobDetailInfluxDB)) {
|
||||
stopWatch.stop();
|
||||
jobDetailInfluxDB.setState(1);
|
||||
|
||||
@@ -6,6 +6,7 @@ import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.security.servlet.SecurityFilterAutoConfiguration;
|
||||
import org.springframework.context.annotation.DependsOn;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
|
||||
/**
|
||||
@@ -16,11 +17,12 @@ import org.springframework.scheduling.annotation.EnableAsync;
|
||||
*/
|
||||
@Slf4j
|
||||
@EnableAsync
|
||||
@DependsOn("proxyMapperRegister")
|
||||
@MapperScan("com.njcn.**.mapper")
|
||||
@SpringBootApplication(scanBasePackages = "com.njcn",exclude = {SecurityAutoConfiguration.class, SecurityFilterAutoConfiguration.class})
|
||||
@SpringBootApplication(scanBasePackages = "com.njcn", exclude = {SecurityAutoConfiguration.class, SecurityFilterAutoConfiguration.class})
|
||||
public class InfluxDataApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(InfluxDataApplication.class, args);
|
||||
}
|
||||
|
||||
|
||||
@@ -7,10 +7,10 @@ server:
|
||||
spring:
|
||||
#influxDB内容配置
|
||||
influx:
|
||||
url: http://192.168.1.24:8086
|
||||
url: http://192.168.1.81:18086
|
||||
user: admin
|
||||
password: 123456
|
||||
database: test
|
||||
database: pqsbase
|
||||
mapper-location: com.njcn.influx.imapper
|
||||
application:
|
||||
name: oracle-data
|
||||
@@ -68,8 +68,8 @@ spring:
|
||||
datasource:
|
||||
master:
|
||||
url: jdbc:oracle:thin:@192.168.1.51:1521:pqsbase
|
||||
username: pqsadmin
|
||||
password: Pqsadmin123
|
||||
username: pqsadmin_hn
|
||||
password: pqsadmin
|
||||
driver-class-name: oracle.jdbc.driver.OracleDriver
|
||||
# target:
|
||||
# url: jdbc:oracle:thin:@192.168.1.51:1521:pqsbase
|
||||
|
||||
@@ -56,9 +56,9 @@ spring:
|
||||
strict: false
|
||||
datasource:
|
||||
master:
|
||||
url: jdbc:oracle:thin:@192.168.1.101:1521:pqsbase
|
||||
username: pqsadmin
|
||||
password: Pqsadmin123
|
||||
url: jdbc:oracle:thin:@192.168.1.51:1521:pqsbase
|
||||
username: pqsadmin_hn
|
||||
password: pqsadmin
|
||||
driver-class-name: oracle.jdbc.driver.OracleDriver
|
||||
target:
|
||||
url: jdbc:oracle:thin:@192.168.1.51:1521:pqsbase
|
||||
|
||||
Reference in New Issue
Block a user