微调
This commit is contained in:
@@ -2,6 +2,8 @@ package com.njcn.influx.config;
|
|||||||
|
|
||||||
import com.njcn.influx.base.ProxyMapperRegister;
|
import com.njcn.influx.base.ProxyMapperRegister;
|
||||||
import com.njcn.influx.core.InfluxExecutor;
|
import com.njcn.influx.core.InfluxExecutor;
|
||||||
|
import com.njcn.influx.utils.InfluxDbUtils;
|
||||||
|
import lombok.Data;
|
||||||
import org.influxdb.InfluxDB;
|
import org.influxdb.InfluxDB;
|
||||||
import org.influxdb.impl.InfluxDBMapper;
|
import org.influxdb.impl.InfluxDBMapper;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
@@ -15,6 +17,7 @@ import org.springframework.core.io.ResourceLoader;
|
|||||||
* @author hongawen
|
* @author hongawen
|
||||||
* @version 1.0.0
|
* @version 1.0.0
|
||||||
*/
|
*/
|
||||||
|
@Data
|
||||||
@Configuration
|
@Configuration
|
||||||
public class InfluxDbConfig {
|
public class InfluxDbConfig {
|
||||||
|
|
||||||
@@ -22,12 +25,21 @@ public class InfluxDbConfig {
|
|||||||
/***
|
/***
|
||||||
* influxMapper存放路径
|
* influxMapper存放路径
|
||||||
*/
|
*/
|
||||||
@Value("${spring.influx.mapper-location}")
|
@Value("${spring.influx.mapper-location:com.njcn.influx.imapper}")
|
||||||
private String mapperLocation;
|
private String mapperLocation;
|
||||||
|
|
||||||
@Value("${spring.influx.database}")
|
@Value("${spring.influx.database}")
|
||||||
private String database;
|
private String database;
|
||||||
|
|
||||||
|
@Value("${spring.influx.url}")
|
||||||
|
private String url;
|
||||||
|
|
||||||
|
@Value("${spring.influx.user}")
|
||||||
|
private String user;
|
||||||
|
|
||||||
|
@Value("${spring.influx.password}")
|
||||||
|
private String password;
|
||||||
|
|
||||||
@Bean(name = "influxDbMapper")
|
@Bean(name = "influxDbMapper")
|
||||||
public InfluxDBMapper influxDbMapper(InfluxDB influxDb) {
|
public InfluxDBMapper influxDbMapper(InfluxDB influxDb) {
|
||||||
influxDb.setLogLevel(InfluxDB.LogLevel.BASIC);
|
influxDb.setLogLevel(InfluxDB.LogLevel.BASIC);
|
||||||
@@ -44,4 +56,10 @@ public class InfluxDbConfig {
|
|||||||
return new ProxyMapperRegister(mapperLocation, applicationContext, resourceLoader);
|
return new ProxyMapperRegister(mapperLocation, applicationContext, resourceLoader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public InfluxDbUtils influxDbUtils() {
|
||||||
|
return new InfluxDbUtils(user, password, url, database, "autogen");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package com.njcn.influx.core;
|
package com.njcn.influx.core;
|
||||||
|
|
||||||
import cn.hutool.core.collection.CollectionUtil;
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
import cn.hutool.core.text.StrPool;
|
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import com.njcn.influx.ano.IgnoreData;
|
import com.njcn.influx.ano.IgnoreData;
|
||||||
import com.njcn.influx.constant.InfluxDbSqlConstant;
|
import com.njcn.influx.constant.InfluxDbSqlConstant;
|
||||||
@@ -10,7 +9,6 @@ import com.njcn.influx.utils.InstantUtil;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import org.influxdb.InfluxDB;
|
import org.influxdb.InfluxDB;
|
||||||
import org.influxdb.annotation.Column;
|
|
||||||
import org.influxdb.annotation.Measurement;
|
import org.influxdb.annotation.Measurement;
|
||||||
import org.influxdb.dto.BatchPoints;
|
import org.influxdb.dto.BatchPoints;
|
||||||
import org.influxdb.dto.Point;
|
import org.influxdb.dto.Point;
|
||||||
@@ -19,8 +17,6 @@ import org.influxdb.dto.QueryResult;
|
|||||||
import org.influxdb.impl.InfluxDBMapper;
|
import org.influxdb.impl.InfluxDBMapper;
|
||||||
|
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.lang.reflect.Method;
|
|
||||||
import java.math.BigDecimal;
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
|
|||||||
377
src/main/java/com/njcn/influx/utils/InfluxDbUtils.java
Normal file
377
src/main/java/com/njcn/influx/utils/InfluxDbUtils.java
Normal file
@@ -0,0 +1,377 @@
|
|||||||
|
package com.njcn.influx.utils;
|
||||||
|
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import okhttp3.OkHttpClient;
|
||||||
|
import org.influxdb.InfluxDB;
|
||||||
|
import org.influxdb.InfluxDB.ConsistencyLevel;
|
||||||
|
import org.influxdb.InfluxDBFactory;
|
||||||
|
import org.influxdb.dto.*;
|
||||||
|
import org.influxdb.dto.Point.Builder;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.ZoneId;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.time.format.DateTimeFormatterBuilder;
|
||||||
|
import java.time.temporal.ChronoField;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 类的介绍:
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @version 1.0.0
|
||||||
|
* @createTime 2021/11/16 10:20
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Data
|
||||||
|
public class InfluxDbUtils {
|
||||||
|
static OkHttpClient.Builder client = new OkHttpClient.Builder()
|
||||||
|
.connectTimeout(1000,TimeUnit.SECONDS)
|
||||||
|
.readTimeout(1000, TimeUnit.SECONDS)
|
||||||
|
.writeTimeout(1000, TimeUnit.SECONDS);
|
||||||
|
private static final int FRACTION_MIN_WIDTH = 0;
|
||||||
|
private static final int FRACTION_MAX_WIDTH = 9;
|
||||||
|
private static final boolean ADD_DECIMAL_POINT = true;
|
||||||
|
private static final DateTimeFormatter RFC3339_FORMATTER = new DateTimeFormatterBuilder()
|
||||||
|
.appendPattern("yyyy-MM-dd'T'HH:mm:ss")
|
||||||
|
.appendFraction(ChronoField.NANO_OF_SECOND, FRACTION_MIN_WIDTH, FRACTION_MAX_WIDTH, ADD_DECIMAL_POINT)
|
||||||
|
.appendZoneOrOffsetId()
|
||||||
|
.toFormatter();
|
||||||
|
|
||||||
|
/**用户名*/
|
||||||
|
private String username;
|
||||||
|
/**密码*/
|
||||||
|
private String password;
|
||||||
|
/**链接地址*/
|
||||||
|
private String openurl;
|
||||||
|
/**数据库*/
|
||||||
|
private String dbName;
|
||||||
|
/**保留策略*/
|
||||||
|
private String retentionPolicy;
|
||||||
|
|
||||||
|
private InfluxDB influxDB;
|
||||||
|
|
||||||
|
public InfluxDbUtils(String username, String password, String url, String dbName, String retentionPolicy) {
|
||||||
|
this.username = username;
|
||||||
|
this.password = password;
|
||||||
|
this.openurl = url;
|
||||||
|
this.dbName = dbName;
|
||||||
|
this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;
|
||||||
|
influxDbBuild();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接时序数据库 ,若不存在则创建
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public InfluxDB influxDbBuild() {
|
||||||
|
if (influxDB == null) {
|
||||||
|
influxDB = InfluxDBFactory.connect(openurl, username, password,client);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// if (!influxDB.databaseExists(database)) {
|
||||||
|
// influxDB.createDatabase(database);
|
||||||
|
// }
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 该数据库可能设置动态代理,不支持创建数据库
|
||||||
|
// e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
influxDB.setRetentionPolicy(retentionPolicy);
|
||||||
|
}
|
||||||
|
influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
|
||||||
|
return influxDB;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建数据库
|
||||||
|
*
|
||||||
|
* @param dbName
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public void createDB(String dbName) {
|
||||||
|
influxDB.createDatabase(dbName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除数据库
|
||||||
|
*
|
||||||
|
* @param dbName
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public void deleteDB(String dbName) {
|
||||||
|
influxDB.deleteDatabase(dbName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 测试连接是否正常
|
||||||
|
*
|
||||||
|
* @return true 正常
|
||||||
|
*/
|
||||||
|
public boolean ping() {
|
||||||
|
boolean isConnected = false;
|
||||||
|
Pong pong;
|
||||||
|
try {
|
||||||
|
pong = influxDB.ping();
|
||||||
|
if (pong != null) {
|
||||||
|
isConnected = true;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return isConnected;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建自定义保留策略
|
||||||
|
*
|
||||||
|
* @param policyName 策略名
|
||||||
|
* @param days 保存天数
|
||||||
|
* @param replication 保存副本数量
|
||||||
|
* @param isDefault 是否设为默认保留策略
|
||||||
|
*/
|
||||||
|
public void createRetentionPolicy(String dataBaseName, String policyName, int days, int replication,
|
||||||
|
Boolean isDefault) {
|
||||||
|
String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %sd REPLICATION %s ", policyName,
|
||||||
|
dataBaseName, days, replication);
|
||||||
|
if (isDefault) {
|
||||||
|
sql = sql + " DEFAULT";
|
||||||
|
}
|
||||||
|
query(sql);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建默认的保留策略
|
||||||
|
*
|
||||||
|
* 策略名:hour,保存天数:30天,保存副本数量:1,设为默认保留策略
|
||||||
|
*/
|
||||||
|
public void createDefaultRetentionPolicy() {
|
||||||
|
String command = String
|
||||||
|
.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", "hour", dbName,
|
||||||
|
"30d", 1);
|
||||||
|
this.query(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询
|
||||||
|
*
|
||||||
|
* @param command 查询语句
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public QueryResult query(String command) {
|
||||||
|
return influxDB.query(new Query(command, dbName));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 插入
|
||||||
|
*
|
||||||
|
* @param measurement 表
|
||||||
|
* @param tags 标签
|
||||||
|
* @param fields 字段
|
||||||
|
*/
|
||||||
|
public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,
|
||||||
|
TimeUnit timeUnit) {
|
||||||
|
Builder builder = Point.measurement(measurement);
|
||||||
|
builder.tag(tags);
|
||||||
|
builder.fields(fields);
|
||||||
|
if (0 != time) {
|
||||||
|
builder.time(time, timeUnit);
|
||||||
|
}
|
||||||
|
influxDB.write(dbName, retentionPolicy, builder.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 批量写入测点
|
||||||
|
*
|
||||||
|
* @param batchPoints
|
||||||
|
*/
|
||||||
|
public void batchInsert(BatchPoints batchPoints, TimeUnit timeUnit) {
|
||||||
|
influxDB.write(batchPoints);
|
||||||
|
// influxDB.enableGzip();
|
||||||
|
// influxDB.enableBatch(2000,100,timeUnit);
|
||||||
|
// influxDB.disableGzip();
|
||||||
|
// influxDB.disableBatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 批量写入数据
|
||||||
|
*
|
||||||
|
* @param database 数据库
|
||||||
|
* @param retentionPolicy 保存策略
|
||||||
|
* @param consistency 一致性
|
||||||
|
* @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)
|
||||||
|
*/
|
||||||
|
public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency,TimeUnit timeUnit, final List<String> records) {
|
||||||
|
influxDB.write(database, retentionPolicy, consistency, records);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 批量写入数据
|
||||||
|
*
|
||||||
|
* @param database 数据库
|
||||||
|
* @param retentionPolicy 保存策略
|
||||||
|
* @param consistency 一致性
|
||||||
|
* @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)
|
||||||
|
*/
|
||||||
|
public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List<String> records) {
|
||||||
|
influxDB.write(database, retentionPolicy, consistency, records);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除
|
||||||
|
*
|
||||||
|
* @param command 删除语句
|
||||||
|
* @return 返回错误信息
|
||||||
|
*/
|
||||||
|
public String deleteMeasurementData(String command) {
|
||||||
|
QueryResult result = influxDB.query(new Query(command, dbName));
|
||||||
|
return result.getError();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 关闭数据库
|
||||||
|
*/
|
||||||
|
public void close() {
|
||||||
|
influxDB.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构建Point
|
||||||
|
*
|
||||||
|
* @param measurement
|
||||||
|
* @param time
|
||||||
|
* @param fields
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public Point pointBuilder(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags,
|
||||||
|
Map<String, Object> fields) {
|
||||||
|
Point point = Point.measurement(measurement).time(time, timeUnit).tag(tags).fields(fields).build();
|
||||||
|
return point;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
//单条查询语句封装
|
||||||
|
/**
|
||||||
|
* 查询结果封装到map
|
||||||
|
* @param commond 单条sql语句
|
||||||
|
* @return 查询结果
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
public List<Map<String, Object>> getResult(String commond){
|
||||||
|
List<Map<String, Object>> retList = new ArrayList<>();
|
||||||
|
QueryResult queryResult = influxDB.query(new Query(commond,dbName));
|
||||||
|
List<QueryResult.Result> results = queryResult.getResults();
|
||||||
|
if (results==null||results.isEmpty()){
|
||||||
|
return retList;
|
||||||
|
}
|
||||||
|
QueryResult.Result result = results.get(0);
|
||||||
|
List<QueryResult.Series> seriess = result.getSeries();
|
||||||
|
if (seriess==null||seriess.isEmpty()){
|
||||||
|
return retList;
|
||||||
|
}
|
||||||
|
QueryResult.Series series = seriess.get(0);
|
||||||
|
List<String> columns = series.getColumns();
|
||||||
|
List<List<Object>> values = series.getValues();
|
||||||
|
for (List<Object> columnValue :values){
|
||||||
|
Map<String, Object> map = new HashMap<>(1);
|
||||||
|
for (int i=0;i<columnValue.size();i++){
|
||||||
|
if(columns.get(i).equals("time")){
|
||||||
|
long aa = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8)).getEpochSecond();
|
||||||
|
map.put(columns.get(i), aa);
|
||||||
|
}else {
|
||||||
|
map.put(columns.get(i),columnValue.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
retList.add(map);
|
||||||
|
}
|
||||||
|
return retList;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
public List<Map<String, Object>> getResult(String commond, String type){
|
||||||
|
|
||||||
|
List<Map<String, Object>> retList = new ArrayList<>();
|
||||||
|
QueryResult queryResult = influxDB.query(new Query(commond,dbName));
|
||||||
|
List<QueryResult.Result> results = queryResult.getResults();
|
||||||
|
if (results==null||results.isEmpty()){
|
||||||
|
return retList;
|
||||||
|
}
|
||||||
|
QueryResult.Result result = results.get(0);
|
||||||
|
List<QueryResult.Series> seriess = result.getSeries();
|
||||||
|
if (seriess==null||seriess.isEmpty()){
|
||||||
|
return retList;
|
||||||
|
}
|
||||||
|
QueryResult.Series series = seriess.get(0);
|
||||||
|
List<String> columns = series.getColumns();
|
||||||
|
List<List<Object>> values = series.getValues();
|
||||||
|
for (List<Object> columnValue :values){
|
||||||
|
Map<String, Object> map = new HashMap<>(1);
|
||||||
|
for (int i=0;i<columnValue.size();i++){
|
||||||
|
if(columns.get(i).equals("time")){
|
||||||
|
Instant aa = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8));
|
||||||
|
LocalDateTime localDateTime =LocalDateTime.ofInstant(aa,ZoneId.systemDefault());
|
||||||
|
String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
|
||||||
|
map.put(columns.get(i), time);
|
||||||
|
}else {
|
||||||
|
map.put(columns.get(i),columnValue.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
retList.add(map);
|
||||||
|
}
|
||||||
|
return retList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public List<Map<String, Object>> getMapResult(String commond){
|
||||||
|
|
||||||
|
List<Map<String, Object>> retList = new ArrayList<>();
|
||||||
|
QueryResult queryResult = influxDB.query(new Query(commond,dbName));
|
||||||
|
List<QueryResult.Result> results = queryResult.getResults();
|
||||||
|
if (results==null||results.isEmpty()){
|
||||||
|
return retList;
|
||||||
|
}
|
||||||
|
QueryResult.Result result = results.get(0);
|
||||||
|
List<QueryResult.Series> seriess = result.getSeries();
|
||||||
|
if (seriess==null||seriess.isEmpty()){
|
||||||
|
return retList;
|
||||||
|
}
|
||||||
|
QueryResult.Series series = seriess.get(0);
|
||||||
|
List<String> columns = series.getColumns();
|
||||||
|
List<List<Object>> values = series.getValues();
|
||||||
|
for (List<Object> columnValue :values){
|
||||||
|
Map<String, Object> map = new HashMap<>(1);
|
||||||
|
for (int i=0;i<columnValue.size();i++){
|
||||||
|
if(columns.get(i).equals("time")){
|
||||||
|
Instant instant = Instant.from(RFC3339_FORMATTER.parse(String.valueOf(columnValue.get(i))));
|
||||||
|
LocalDateTime localDateTime =LocalDateTime.ofInstant(instant,ZoneId.systemDefault());
|
||||||
|
String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
|
||||||
|
map.put(columns.get(i), time);
|
||||||
|
}else {
|
||||||
|
map.put(columns.get(i),columnValue.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
retList.add(map);
|
||||||
|
}
|
||||||
|
return retList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user