完成责任量化功能

This commit is contained in:
2023-07-26 11:20:12 +08:00
parent ae00d7671d
commit c3f8592160
104 changed files with 4709 additions and 1670 deletions

View File

@@ -1,6 +1,9 @@
package com.njcn.influx.imapper;
import com.njcn.influx.ano.Param;
import com.njcn.influx.ano.Select;
import com.njcn.influx.base.InfluxDbBaseMapper;
import com.njcn.influx.pojo.bo.HarmonicHistoryData;
import com.njcn.influx.pojo.dto.StatisticalDataDTO;
import com.njcn.influx.pojo.po.PowerQualityData;
import com.njcn.influx.query.InfluxQueryWrapper;
@@ -17,8 +20,6 @@ import java.util.List;
public interface CommonMapper extends InfluxDbBaseMapper<PowerQualityData> {
List<StatisticalDataDTO> getStatistical(InfluxQueryWrapper influxQueryWrapper);
StatisticalDataDTO getLineRtData(InfluxQueryWrapper influxQueryWrapper);
List<StatisticalDataDTO> getDeviceRtData(InfluxQueryWrapper influxQueryWrapper);
@@ -26,4 +27,13 @@ public interface CommonMapper extends InfluxDbBaseMapper<PowerQualityData> {
List<StatisticalDataDTO> getDeviceRtDataByTime(InfluxQueryWrapper influxQueryWrapper);
StatisticalDataDTO getLineHistoryData(InfluxQueryWrapper influxQueryWrapper);
@Select(value = "#{sql}",resultType = StatisticalDataDTO.class)
StatisticalDataDTO selectBySql(@Param("sql") StringBuilder sql);
@Select(value = "#{sql}",resultType = StatisticalDataDTO.class)
List<?> selectLimitTargetBySql(@Param("sql")String sql);
@Select(value = "#{sql}",resultType = HarmonicHistoryData.class)
List<HarmonicHistoryData> getHistoryResult(@Param("sql")String sql);
}

View File

@@ -0,0 +1,20 @@
package com.njcn.influx.imapper;
import com.njcn.influx.base.InfluxDbBaseMapper;
import com.njcn.influx.pojo.dto.HarmData;
import com.njcn.influx.pojo.po.DataHarmRateV;
import com.njcn.influx.query.InfluxQueryWrapper;
import java.util.List;
/**
 * InfluxDB mapper for harmonic-voltage content-rate data (the {@code DataHarmRateV}
 * entity; presumably measurement "data_harmrate_v" — confirm against the entity's
 * table annotation).
 *
 * @author hongawen
 * @version 1.0.0
 * @date 2023-07-17 11:03
 */
public interface DataHarmRateVMapper extends InfluxDbBaseMapper<DataHarmRateV> {
    /**
     * Executes the query described by the wrapper and returns a single aggregated row.
     * The name suggests a mean over the whole time range — TODO confirm against the
     * SQL generated from {@code InfluxQueryWrapper}.
     *
     * @param influxQueryWrapper query conditions (filters, time range, aggregation)
     * @return one aggregated {@code DataHarmRateV} row
     */
    DataHarmRateV getMeanAllTimesData(InfluxQueryWrapper influxQueryWrapper);

    /**
     * Returns the history points (time + value) for the harmonic voltage rate
     * matching the wrapper's conditions.
     *
     * @param influxQueryWrapper query conditions
     * @return list of time/value points
     */
    List<HarmData> getHarmRateVHistoryData(InfluxQueryWrapper influxQueryWrapper);
}

View File

@@ -0,0 +1,20 @@
package com.njcn.influx.imapper;
import com.njcn.influx.base.InfluxDbBaseMapper;
import com.njcn.influx.pojo.dto.HarmData;
import com.njcn.influx.pojo.po.DataI;
import com.njcn.influx.query.InfluxQueryWrapper;
import java.util.List;
/**
 * InfluxDB mapper for current data (the {@code DataI} entity; presumably
 * measurement "data_i" — confirm against the entity's table annotation).
 *
 * @author hongawen
 * @version 1.0.0
 * @date 2023-07-17 11:05
 */
public interface IDataIMapper extends InfluxDbBaseMapper<DataI> {
    /**
     * Executes the query described by the wrapper and returns a single aggregated row.
     * The name suggests a mean over the whole time range — TODO confirm against the
     * SQL generated from {@code InfluxQueryWrapper}.
     *
     * @param influxQueryWrapper query conditions (filters, time range, aggregation)
     * @return one aggregated {@code DataI} row
     */
    DataI getMeanAllTimesData(InfluxQueryWrapper influxQueryWrapper);

    /**
     * Returns the history points (time + value) for current data matching the
     * wrapper's conditions.
     *
     * @param influxQueryWrapper query conditions
     * @return list of time/value points
     */
    List<HarmData> getIHistoryData(InfluxQueryWrapper influxQueryWrapper);
}

View File

@@ -0,0 +1,31 @@
package com.njcn.influx.pojo.bo;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.njcn.common.utils.serializer.InstantDateSerializer;
import lombok.Data;
import org.influxdb.annotation.Column;
import java.io.Serializable;
import java.time.Instant;
/**
 * Row model for harmonic history query results: one timestamp plus the values
 * for the A/B/C phases.
 *
 * @author hongawen
 * @version 1.0.0
 * @date 2023-07-17 16:25
 */
@Data
public class HarmonicHistoryData implements Serializable {
    // Sample timestamp; rendered as a formatted date string in JSON responses.
    @JsonSerialize(using = InstantDateSerializer.class)
    private Instant time;
    // Phase identifier, mapped from the InfluxDB "phasic_type" tag/column.
    @Column(name = "phasic_type")
    private String phasicType;
    // Per-phase values. No @Column mapping here — presumably populated through
    // SQL aliases (aValue/bValue/cValue); TODO confirm against the query passed
    // to CommonMapper.getHistoryResult.
    private Float aValue;
    private Float bValue;
    private Float cValue;
}

View File

@@ -0,0 +1,154 @@
package com.njcn.influx.pojo.constant;
/**
 * Constants for InfluxDB measurement (table) names, tag/field names and common
 * literal values used when building queries.
 *
 * @author hongawen
 * @version 1.0.0
 * @date 2022-10-14 14:02
 */
public interface InfluxDBTableConstant {
    /** Voltage fluctuation & flicker table. */
    String DATA_FLICKER = "data_flicker";
    /** Voltage fluctuation table. */
    String DATA_FLUC = "data_fluc";
    /** Harmonic current phase-angle table. */
    String DATA_HARM_PHASIC_I = "data_harmphasic_i";
    /** Harmonic voltage phase-angle table. */
    String DATA_HARM_PHASIC_V = "data_harmphasic_v";
    /** Active power table. */
    String DATA_HARM_POWER_P = "data_harmpower_p";
    /** Reactive power table. */
    String DATA_HARM_POWER_Q = "data_harmpower_q";
    /** Apparent power table. */
    String DATA_HARM_POWER_S = "data_harmpower_s";
    /** Harmonic current content-rate table. */
    String DATA_HARM_RATE_I = "data_harmrate_i";
    /** Harmonic voltage content-rate table. */
    String DATA_HARM_RATE_V = "data_harmrate_v";
    /** Current table. */
    String DATA_I = "data_i";
    /** Current inter-harmonic amplitude table. */
    String DATA_IN_HARM_I = "data_inharm_i";
    /** Voltage inter-harmonic amplitude table. */
    String DATA_IN_HARM_V = "data_inharm_v";
    /** Long-term flicker table. */
    String DATA_PLT = "data_plt";
    /** Voltage table. */
    String DATA_V = "data_v";
    /** Time column. */
    String TIME = "time";
    /** Monitored line id tag. */
    String LINE_ID = "line_id";
    /** Device id tag. */
    String DEV_ID = "dev_id";
    /** Phase type tag. */
    String PHASIC_TYPE = "phasic_type";
    /** Indicator (value) type tag. */
    String VALUE_TYPE = "value_type";
    /**
     * Statistic result column / select alias.
     * BUG FIX: this was previously "value_type", duplicating {@link #VALUE_TYPE}.
     * Callers use it as the alias for the statistic value column (it replaced the
     * literal "value" in CommonServiceImpl#getDeviceRtDataByTime), so it must be "value".
     */
    String VALUE = "value";
    /** Statistic maximum alias. */
    String MAX_VALUE = "maxValue";
    /** Statistic minimum alias. */
    String MIN_VALUE = "minValue";
    /** Data quality flag tag. */
    String QUALITY_FLAG = "quality_flag";
    /** CP95 (95th percentile) statistic label. */
    String CP95 = "CP95";
    /**
     * Phase labels; "M" marks data without a specific phase.
     */
    String NO_PHASE = "M";
    String PHASE_TYPE_A = "A";
    String PHASE_TYPE_AB = "AB";
    String PHASE_TYPE_B = "B";
    String PHASE_TYPE_BC = "BC";
    String PHASE_TYPE_C = "C";
    String PHASE_TYPE_C0 = "C0";
    String PHASE_TYPE_C1 = "C1";
    String PHASE_TYPE_C2 = "C2";
    String PHASE_TYPE_CA = "CA";
    String PHASE_TYPE_T = "T";
    /**
     * Suffix appended to a date string to get the start of that day.
     */
    String START_TIME = " 00:00:00";
    /**
     * Suffix appended to a date string to get the end of that day.
     */
    String END_TIME = " 23:59:59";
}

View File

@@ -0,0 +1,32 @@
package com.njcn.influx.pojo.dto;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.njcn.common.utils.serializer.InstantDateDeserializer;
import com.njcn.common.utils.serializer.InstantDateSerializer;
import com.njcn.influx.ano.IgnoreData;
import lombok.Data;
import org.influxdb.annotation.Column;
import java.io.Serializable;
import java.time.Instant;
/**
 * One time-series point: a timestamp and its numeric value.
 *
 * @author hongawen
 * @version 1.0.0
 * @date 2023-07-19 09:43
 */
@Data
public class HarmData implements Serializable{
    // Sample timestamp, mapped from the InfluxDB "time" column; serialized to a
    // formatted date string in JSON and parsed back on deserialization.
    @Column(name = "time")
    @JsonSerialize(using = InstantDateSerializer.class)
    @JsonDeserialize(using = InstantDateDeserializer.class)
    private Instant time;
    // Point value. @IgnoreData is a project annotation — presumably excludes this
    // field from some generic column handling; TODO confirm its semantics.
    @IgnoreData(value = true)
    private Float value;
}

View File

@@ -0,0 +1,24 @@
package com.njcn.influx.pojo.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
 * History query payload: the list of time/value points plus an over-limit figure.
 *
 * @author hongawen
 * @version 1.0.0
 * @date 2023-07-19 09:40
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HarmHistoryDataDTO implements Serializable {
    // History points. The initializer only takes effect via the no-args constructor;
    // the @AllArgsConstructor path stores whatever list (possibly null) the caller passes.
    private List<HarmData> historyData = new ArrayList<>();
    // Over-limit figure — presumably a rate or count of samples exceeding the limit;
    // TODO confirm units against the producer.
    private float overLimit;
}

View File

@@ -12,54 +12,60 @@ import java.util.List;
* @version V1.0.0
*/
public interface CommonService {
List<StatisticalDataDTO> commonquery(String lineId, String tableName, String columnName);
/**
* 根据条件获取监测点数据
* @param lineId 监测点Id
* @param tableName 表名
* @param columnName 字段
* @param phasic 相别
* @param dataType 数据类型
* @return
*/
StatisticalDataDTO getLineRtData(String lineId, String tableName, String columnName, String phasic, String dataType);
/**
* 根据条件获取监测点数据
*
* @param lineId 监测点Id
* @param tableName
* @param columnName 字段名
* @param phasic 相别
* @param dataType 数据类型
* @return
*/
StatisticalDataDTO getLineRtData(String lineId, String tableName, String columnName, String phasic, String dataType);
/**
* 根据条件获取监测点数据
* @param lineIds 监测点Id
* @param tableName 表名
* @param columnName 字段
* @param phasic 相别
* @param dataType 数据类型
* @return
*/
List<StatisticalDataDTO> getDeviceRtData(List<String> lineIds, String tableName, String columnName, String phasic, String dataType);
/**
* @Description: getDeviceRtDataByTime
* @param lineIds 监测点Id
* @param tableName 表名
* @param columnName 字段名
* @param phasic 相别
* @param dataType 数据类型
*@param startTime start time
* @param endTime end time
* @return: java.util.List<com.njcn.influx.pojo.dto.StatisticalDataDTO>
* @Author: clam
* @Date: 2023/6/13
*/
List<StatisticalDataDTO> getDeviceRtDataByTime(List<String> lineIds, String tableName, String columnName, String phasic, String dataType,String startTime, String endTime);
/**
* 根据条件获取监测点数据
*
* @param lineIds 监测点Id
* @param tableName
* @param columnName 字段名
* @param phasic 相别
* @param dataType 数据类型
*/
List<StatisticalDataDTO> getDeviceRtData(List<String> lineIds, String tableName, String columnName, String phasic, String dataType);
/**
* 根据条件获取监测点时间范围内的最大最小值
* @param lineId 监测点Id
* @param tableName 表
* @param columnName 字段名
* @param phasic 相别
* @param dataType 数据类型
* @return
*/
StatisticalDataDTO getLineHistoryData(String lineId, String tableName, String columnName, String startTime, String endTime);
/**
* @param lineIds 监测点Id
* @param tableName 表名
* @param columnName 字段
* @param phasic 相别
* @param dataType 数据类型
* @param startTime start time
* @param endTime end time
* @Description: getDeviceRtDataByTime
* @return: java.util.List<com.njcn.influx.pojo.dto.StatisticalDataDTO>
* @Author: clam
* @Date: 2023/6/13
*/
List<StatisticalDataDTO> getDeviceRtDataByTime(List<String> lineIds, String tableName, String columnName, String phasic, String dataType, String startTime, String endTime);
/**
* 根据条件获取监测点时间范围内的最大最小值
*
* @param lineId 监测点Id
* @param tableName 表名
* @param columnName 字段名
*/
StatisticalDataDTO getLineHistoryData(String lineId, String tableName, String columnName, String startTime, String endTime);
/***
* 当表名、字段、统计方式均不确定时代码拼接好sql
* @author hongawen
* @date 2023/7/14 15:23
* @param sql influx-sql语句
* @return StatisticalDataDTO
*/
StatisticalDataDTO selectBySql(StringBuilder sql);
}

View File

@@ -0,0 +1,13 @@
package com.njcn.influx.service;
import com.njcn.influx.pojo.po.DataHarmRateV;
import com.njcn.influx.query.InfluxQueryWrapper;
/**
 * Service contract for harmonic-voltage content-rate queries, backed by
 * {@code DataHarmRateVMapper}.
 *
 * @author hongawen
 * @version 1.0.0
 * @date 2023-07-17 11:01
 */
public interface DataHarmRateVService {
    /**
     * Executes the query described by the wrapper and returns a single aggregated row.
     *
     * @param influxQueryWrapper query conditions (filters, time range, aggregation)
     * @return one aggregated {@code DataHarmRateV} row
     */
    DataHarmRateV getMeanAllTimesData(InfluxQueryWrapper influxQueryWrapper);
}

View File

@@ -0,0 +1,14 @@
package com.njcn.influx.service;
import com.njcn.influx.pojo.po.DataI;
import com.njcn.influx.query.InfluxQueryWrapper;
/**
 * Service contract for current-data queries, backed by {@code IDataIMapper}.
 *
 * @author hongawen
 * @version 1.0.0
 * @date 2023-07-17 11:04
 */
public interface IDataIService {
    /**
     * Executes the query described by the wrapper and returns a single aggregated row.
     *
     * @param influxQueryWrapper query conditions (filters, time range, aggregation)
     * @return one aggregated {@code DataI} row
     */
    DataI getMeanAllTimesData(InfluxQueryWrapper influxQueryWrapper);
}

View File

@@ -1,15 +1,8 @@
package com.njcn.influx.service.impl;
import com.njcn.common.pojo.constant.PatternRegex;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.common.utils.PubUtils;
import com.njcn.influx.ano.Select;
import com.njcn.influx.imapper.CommonMapper;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influx.pojo.dto.StatisticalDataDTO;
import com.njcn.influx.pojo.po.HarmonicRatioData;
import com.njcn.influx.query.InfluxQueryWrapper;
import com.njcn.influx.service.CommonService;
import com.njcn.influx.utils.ReflectUitl;
@@ -29,23 +22,9 @@ import java.util.List;
@Service
@RequiredArgsConstructor
public class CommonServiceImpl implements CommonService {
private final CommonMapper commonMapper;
@Override
public List<StatisticalDataDTO> commonquery(String lineId ,String tableName, String columnName) {
HashMap<String, Class<?>> entityClassesByAnnotation = ReflectUitl.getEntityClassesByAnnotation();
Class<?> aClass = entityClassesByAnnotation.get(tableName);
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(aClass,StatisticalDataDTO.class);
influxQueryWrapper.eq("LineId",lineId).
last(columnName,"statisticalData").
groupBy(StatisticalDataDTO::getLineId, StatisticalDataDTO::getValueType, StatisticalDataDTO::getPhaseType);
List<StatisticalDataDTO> statistical = commonMapper.getStatistical(influxQueryWrapper);
return statistical;
}
@Override
public StatisticalDataDTO getLineRtData(String lineId, String tableName, String columnName, String phasic, String dataType) {
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(tableName,StatisticalDataDTO.class);
@@ -53,9 +32,9 @@ public class CommonServiceImpl implements CommonService {
.select(StatisticalDataDTO::getPhaseType)
.select(StatisticalDataDTO::getValueType)
.last(columnName)
.eq("line_id",lineId)
.eq("phasic_type",phasic)
.eq("value_type",dataType);
.eq(InfluxDBTableConstant.LINE_ID,lineId)
.eq(InfluxDBTableConstant.PHASIC_TYPE,phasic)
.eq(InfluxDBTableConstant.VALUE_TYPE,dataType);
return commonMapper.getLineRtData(influxQueryWrapper);
}
@@ -66,9 +45,9 @@ public class CommonServiceImpl implements CommonService {
.select(StatisticalDataDTO::getPhaseType)
.select(StatisticalDataDTO::getValueType)
.last(columnName)
.or("line_id",lineIds)
.eq("phasic_type",phasic)
.eq("value_type",dataType).groupBy("line_id");
.or(InfluxDBTableConstant.LINE_ID,lineIds)
.eq(InfluxDBTableConstant.PHASIC_TYPE,phasic)
.eq(InfluxDBTableConstant.VALUE_TYPE,dataType).groupBy(InfluxDBTableConstant.LINE_ID);
return commonMapper.getDeviceRtData(influxQueryWrapper);
}
@@ -78,21 +57,26 @@ public class CommonServiceImpl implements CommonService {
influxQueryWrapper.select(StatisticalDataDTO::getLineId)
.select(StatisticalDataDTO::getPhaseType)
.select(StatisticalDataDTO::getValueType)
.select(columnName,"value")
.or("line_id",lineIds)
.eq("phasic_type",phasic)
.between("time", startTime, endTime)
.eq("value_type",dataType);
.select(columnName,InfluxDBTableConstant.VALUE)
.or(InfluxDBTableConstant.LINE_ID,lineIds)
.eq(InfluxDBTableConstant.PHASIC_TYPE,phasic)
.between(InfluxDBTableConstant.TIME, startTime, endTime)
.eq(InfluxDBTableConstant.VALUE_TYPE,dataType);
return commonMapper.getDeviceRtDataByTime(influxQueryWrapper);
}
@Override
public StatisticalDataDTO getLineHistoryData(String lineId, String tableName, String columnName, String startTime, String endTime) {
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(tableName,StatisticalDataDTO.class);
influxQueryWrapper.max(columnName,"maxValue")
.min(columnName,"minValue")
.eq("line_id",lineId)
.between("time", startTime, endTime);
influxQueryWrapper.max(columnName,InfluxDBTableConstant.MAX_VALUE)
.min(columnName,InfluxDBTableConstant.MIN_VALUE)
.eq(InfluxDBTableConstant.LINE_ID,lineId)
.between(InfluxDBTableConstant.TIME, startTime, endTime);
return commonMapper.getLineHistoryData(influxQueryWrapper);
}
@Override
public StatisticalDataDTO selectBySql(StringBuilder sql) {
return commonMapper.selectBySql(sql);
}
}

View File

@@ -7,9 +7,6 @@ import com.njcn.influx.pojo.dto.DataFlickerDTO;
import com.njcn.influx.pojo.po.DataFlicker;
import com.njcn.influx.query.InfluxQueryWrapper;
import com.njcn.influx.service.DataFlickerService;
import com.njcn.influx.utils.InfluxDbUtil;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@@ -30,49 +27,6 @@ public class DataFlickerServiceImpl implements DataFlickerService {
@Override
public List<DataFlickerDTO> getDataFlicker(String lineIndex, String startTime, String endTime) {
InfluxDbUtil influxDbUtils = new InfluxDbUtil("admin", "123456", "http://192.168.1.16:8086", "pqsbase_sjzx", "autogen");
List<DataFlickerDTO> result = new ArrayList<>();
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("line_id='").append(lineIndex).append("' and ");
//最小值
String sql1 = "select min(fluc) AS fluc,min(plt) AS plt,min(pst) AS pst from data_flicker where " + stringBuilder + " time >= '" + startTime + "' and time <= '" + endTime + "' group by line_id,phasic_type,quality_flag tz('Asia/Shanghai')";
QueryResult sqlResult1 = influxDbUtils.query(sql1);
InfluxDBResultMapper resultMapper1 = new InfluxDBResultMapper();
List<DataFlickerDTO> list1 = resultMapper1.toPOJO(sqlResult1, DataFlickerDTO.class);
list1.forEach(item -> {
item.setValueType("MIN");
});
//最大值
String sql2 = "select max(fluc) AS fluc,max(plt) AS plt,max(pst) AS pst from data_flicker where " + stringBuilder + " time >= '" + startTime + "' and time <= '" + endTime + "' group by line_id,phasic_type,quality_flag tz('Asia/Shanghai')";
QueryResult sqlResult2 = influxDbUtils.query(sql2);
InfluxDBResultMapper resultMapper2 = new InfluxDBResultMapper();
List<DataFlickerDTO> list2 = resultMapper2.toPOJO(sqlResult2, DataFlickerDTO.class);
list2.forEach(item -> {
item.setValueType("MAX");
});
//平均值
String sql3 = "select mean(fluc) AS fluc,mean(plt) AS plt,mean(pst) AS pst from data_flicker where " + stringBuilder + " time >= '" + startTime + "' and time <= '" + endTime + "' group by line_id,phasic_type,quality_flag tz('Asia/Shanghai')";
QueryResult sqlResult3 = influxDbUtils.query(sql3);
InfluxDBResultMapper resultMapper3 = new InfluxDBResultMapper();
List<DataFlickerDTO> list3 = resultMapper3.toPOJO(sqlResult3, DataFlickerDTO.class);
list3.forEach(item -> {
item.setValueType("AVG");
});
//CP95值
String sql4 = "select percentile(fluc,95) AS fluc,percentile(plt,95) AS plt,percentile(pst,95) AS pst from data_flicker where " + stringBuilder + " time >= '" + startTime + "' and time <= '" + endTime + "' group by line_id,phasic_type,quality_flag tz('Asia/Shanghai')";
QueryResult sqlResult4 = influxDbUtils.query(sql4);
InfluxDBResultMapper resultMapper4 = new InfluxDBResultMapper();
List<DataFlickerDTO> list4 = resultMapper4.toPOJO(sqlResult4, DataFlickerDTO.class);
list4.forEach(item -> {
item.setValueType("CP95");
});
result.addAll(list1);
result.addAll(list2);
result.addAll(list3);
result.addAll(list4);
/////改造前↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑改造后↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//最小值
List<DataFlickerDTO> result1 ;
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataFlicker.class, DataFlickerDTO.class);
@@ -125,16 +79,14 @@ public class DataFlickerServiceImpl implements DataFlickerService {
result4.forEach(item -> {
item.setValueType("CP95");
});
List<DataFlickerDTO> result5 = new ArrayList<>();
result5.addAll(result1);
result5.addAll(result2);
result5.addAll(result3);
result5.addAll(result4);
List<DataFlickerDTO> result = new ArrayList<>();
result.addAll(result1);
result.addAll(result2);
result.addAll(result3);
result.addAll(result4);
ObjectMapper objectMapper = new ObjectMapper();
try {
System.out.println(objectMapper.writeValueAsString(result));
System.out.println(objectMapper.writeValueAsString(result5));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}

View File

@@ -0,0 +1,26 @@
package com.njcn.influx.service.impl;
import com.njcn.influx.imapper.DataHarmRateVMapper;
import com.njcn.influx.pojo.po.DataHarmRateV;
import com.njcn.influx.query.InfluxQueryWrapper;
import com.njcn.influx.service.DataHarmRateVService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
 * Default {@link DataHarmRateVService} implementation: a thin delegate to the
 * InfluxDB mapper layer.
 *
 * @author hongawen
 * @version 1.0.0
 * @date 2023-07-17 11:02
 */
@Service
public class DataHarmRateVServiceImpl implements DataHarmRateVService {

    /** Mapper that runs the actual InfluxDB queries. */
    private final DataHarmRateVMapper mapper;

    public DataHarmRateVServiceImpl(DataHarmRateVMapper mapper) {
        this.mapper = mapper;
    }

    /** {@inheritDoc} */
    @Override
    public DataHarmRateV getMeanAllTimesData(InfluxQueryWrapper influxQueryWrapper) {
        return mapper.getMeanAllTimesData(influxQueryWrapper);
    }
}

View File

@@ -0,0 +1,25 @@
package com.njcn.influx.service.impl;
import com.njcn.influx.imapper.IDataIMapper;
import com.njcn.influx.pojo.po.DataI;
import com.njcn.influx.query.InfluxQueryWrapper;
import com.njcn.influx.service.IDataIService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
 * Default {@link IDataIService} implementation: a thin delegate to the
 * InfluxDB mapper layer.
 *
 * @author hongawen
 * @version 1.0.0
 * @date 2023-07-17 11:05
 */
@Service
public class IDataIServiceImpl implements IDataIService {

    /** Mapper that runs the actual InfluxDB queries. */
    private final IDataIMapper mapper;

    public IDataIServiceImpl(IDataIMapper mapper) {
        this.mapper = mapper;
    }

    /** {@inheritDoc} */
    @Override
    public DataI getMeanAllTimesData(InfluxQueryWrapper influxQueryWrapper) {
        return mapper.getMeanAllTimesData(influxQueryWrapper);
    }
}

View File

@@ -1,376 +0,0 @@
package com.njcn.influx.utils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.*;
import org.influxdb.dto.Point.Builder;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2021/11/16 10:20
*/
@Slf4j
@Data
public class InfluxDbUtil {
static OkHttpClient.Builder client = new OkHttpClient.Builder()
.connectTimeout(1000,TimeUnit.SECONDS)
.readTimeout(1000, TimeUnit.SECONDS)
.writeTimeout(1000, TimeUnit.SECONDS);
private static final int FRACTION_MIN_WIDTH = 0;
private static final int FRACTION_MAX_WIDTH = 9;
private static final boolean ADD_DECIMAL_POINT = true;
private static final DateTimeFormatter RFC3339_FORMATTER = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd'T'HH:mm:ss")
.appendFraction(ChronoField.NANO_OF_SECOND, FRACTION_MIN_WIDTH, FRACTION_MAX_WIDTH, ADD_DECIMAL_POINT)
.appendZoneOrOffsetId()
.toFormatter();
/**用户名*/
private String username;
/**密码*/
private String password;
/**链接地址*/
private String openurl;
/**数据库*/
private String dbName;
/**保留策略*/
private String retentionPolicy;
private InfluxDB influxDB;
public InfluxDbUtil(String username, String password, String url, String dbName, String retentionPolicy) {
this.username = username;
this.password = password;
this.openurl = url;
this.dbName = dbName;
this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;
influxDbBuild();
}
/**
* 连接时序数据库 ,若不存在则创建
*
* @return
*/
public InfluxDB influxDbBuild() {
if (influxDB == null) {
influxDB = InfluxDBFactory.connect(openurl, username, password,client);
}
try {
// if (!influxDB.databaseExists(database)) {
// influxDB.createDatabase(database);
// }
} catch (Exception e) {
// 该数据库可能设置动态代理,不支持创建数据库
// e.printStackTrace();
} finally {
influxDB.setRetentionPolicy(retentionPolicy);
}
influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
return influxDB;
}
/**
* 创建数据库
*
* @param dbName
*/
@SuppressWarnings("deprecation")
public void createDB(String dbName) {
influxDB.createDatabase(dbName);
}
/**
* 删除数据库
*
* @param dbName
*/
@SuppressWarnings("deprecation")
public void deleteDB(String dbName) {
influxDB.deleteDatabase(dbName);
}
/**
* 测试连接是否正常
*
* @return true 正常
*/
public boolean ping() {
boolean isConnected = false;
Pong pong;
try {
pong = influxDB.ping();
if (pong != null) {
isConnected = true;
}
} catch (Exception e) {
e.printStackTrace();
}
return isConnected;
}
/**
* 创建自定义保留策略
*
* @param policyName 策略名
* @param days 保存天数
* @param replication 保存副本数量
* @param isDefault 是否设为默认保留策略
*/
public void createRetentionPolicy(String dataBaseName, String policyName, int days, int replication,
Boolean isDefault) {
String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %sd REPLICATION %s ", policyName,
dataBaseName, days, replication);
if (isDefault) {
sql = sql + " DEFAULT";
}
query(sql);
}
/**
* 创建默认的保留策略
*
* 策略名hour保存天数30天保存副本数量1,设为默认保留策略
*/
public void createDefaultRetentionPolicy() {
String command = String
.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", "hour", dbName,
"30d", 1);
this.query(command);
}
/**
* 查询
*
* @param command 查询语句
* @return
*/
public QueryResult query(String command) {
return influxDB.query(new Query(command, dbName));
}
/**
* 插入
*
* @param measurement 表
* @param tags 标签
* @param fields 字段
*/
public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,
TimeUnit timeUnit) {
Builder builder = Point.measurement(measurement);
builder.tag(tags);
builder.fields(fields);
if (0 != time) {
builder.time(time, timeUnit);
}
influxDB.write(dbName, retentionPolicy, builder.build());
}
/**
* 批量写入测点
*
* @param batchPoints
*/
public void batchInsert(BatchPoints batchPoints, TimeUnit timeUnit) {
influxDB.write(batchPoints);
// influxDB.enableGzip();
// influxDB.enableBatch(2000,100,timeUnit);
// influxDB.disableGzip();
// influxDB.disableBatch();
}
/**
* 批量写入数据
*
* @param database 数据库
* @param retentionPolicy 保存策略
* @param consistency 一致性
* @param records 要保存的数据调用BatchPoints.lineProtocol()可得到一条record
*/
public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency,TimeUnit timeUnit, final List<String> records) {
influxDB.write(database, retentionPolicy, consistency, records);
}
/**
* 批量写入数据
*
* @param database 数据库
* @param retentionPolicy 保存策略
* @param consistency 一致性
* @param records 要保存的数据调用BatchPoints.lineProtocol()可得到一条record
*/
public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List<String> records) {
influxDB.write(database, retentionPolicy, consistency, records);
}
/**
* 删除
*
* @param command 删除语句
* @return 返回错误信息
*/
public String deleteMeasurementData(String command) {
QueryResult result = influxDB.query(new Query(command, dbName));
return result.getError();
}
/**
* 关闭数据库
*/
public void close() {
influxDB.close();
}
/**
* 构建Point
*
* @param measurement
* @param time
* @param fields
* @return
*/
public Point pointBuilder(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags,
Map<String, Object> fields) {
Point point = Point.measurement(measurement).time(time, timeUnit).tag(tags).fields(fields).build();
return point;
}
//单条查询语句封装
/**
* 查询结果封装到map
* @param commond 单条sql语句
* @return 查询结果
*/
@Deprecated
public List<Map<String, Object>> getResult(String commond){
List<Map<String, Object>> retList = new ArrayList<>();
QueryResult queryResult = influxDB.query(new Query(commond,dbName));
List<QueryResult.Result> results = queryResult.getResults();
if (results==null||results.isEmpty()){
return retList;
}
QueryResult.Result result = results.get(0);
List<QueryResult.Series> seriess = result.getSeries();
if (seriess==null||seriess.isEmpty()){
return retList;
}
QueryResult.Series series = seriess.get(0);
List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues();
for (List<Object> columnValue :values){
Map<String, Object> map = new HashMap<>(1);
for (int i=0;i<columnValue.size();i++){
if(columns.get(i).equals("time")){
long aa = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8)).getEpochSecond();
map.put(columns.get(i), aa);
}else {
map.put(columns.get(i),columnValue.get(i));
}
}
retList.add(map);
}
return retList;
}
@Deprecated
public List<Map<String, Object>> getResult(String commond, String type){
List<Map<String, Object>> retList = new ArrayList<>();
QueryResult queryResult = influxDB.query(new Query(commond,dbName));
List<QueryResult.Result> results = queryResult.getResults();
if (results==null||results.isEmpty()){
return retList;
}
QueryResult.Result result = results.get(0);
List<QueryResult.Series> seriess = result.getSeries();
if (seriess==null||seriess.isEmpty()){
return retList;
}
QueryResult.Series series = seriess.get(0);
List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues();
for (List<Object> columnValue :values){
Map<String, Object> map = new HashMap<>(1);
for (int i=0;i<columnValue.size();i++){
if(columns.get(i).equals("time")){
Instant aa = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8));
LocalDateTime localDateTime =LocalDateTime.ofInstant(aa,ZoneId.systemDefault());
String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
map.put(columns.get(i), time);
}else {
map.put(columns.get(i),columnValue.get(i));
}
}
retList.add(map);
}
return retList;
}
public List<Map<String, Object>> getMapResult(String commond){
List<Map<String, Object>> retList = new ArrayList<>();
QueryResult queryResult = influxDB.query(new Query(commond,dbName));
List<QueryResult.Result> results = queryResult.getResults();
if (results==null||results.isEmpty()){
return retList;
}
QueryResult.Result result = results.get(0);
List<QueryResult.Series> seriess = result.getSeries();
if (seriess==null||seriess.isEmpty()){
return retList;
}
QueryResult.Series series = seriess.get(0);
List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues();
for (List<Object> columnValue :values){
Map<String, Object> map = new HashMap<>(1);
for (int i=0;i<columnValue.size();i++){
if(columns.get(i).equals("time")){
Instant instant = Instant.from(RFC3339_FORMATTER.parse(String.valueOf(columnValue.get(i))));
LocalDateTime localDateTime =LocalDateTime.ofInstant(instant,ZoneId.systemDefault());
String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
map.put(columns.get(i), time);
}else {
map.put(columns.get(i),columnValue.get(i));
}
}
retList.add(map);
}
return retList;
}
}