diff --git a/src/main/java/com/njcn/influx/config/InfluxDbConfig.java b/src/main/java/com/njcn/influx/config/InfluxDbConfig.java
index d5c12ca..0192019 100644
--- a/src/main/java/com/njcn/influx/config/InfluxDbConfig.java
+++ b/src/main/java/com/njcn/influx/config/InfluxDbConfig.java
@@ -2,6 +2,8 @@ package com.njcn.influx.config;
 
 import com.njcn.influx.base.ProxyMapperRegister;
 import com.njcn.influx.core.InfluxExecutor;
+import com.njcn.influx.utils.InfluxDbUtils;
+import lombok.Data;
 import org.influxdb.InfluxDB;
 import org.influxdb.impl.InfluxDBMapper;
 import org.springframework.beans.factory.annotation.Value;
@@ -15,6 +17,7 @@ import org.springframework.core.io.ResourceLoader;
  * @author hongawen
  * @version 1.0.0
  */
+@Data
 @Configuration
 public class InfluxDbConfig {
 
@@ -22,12 +25,21 @@ public class InfluxDbConfig {
     /***
      * Location (package) of the influx mapper interfaces
      */
-    @Value("${spring.influx.mapper-location}")
+    @Value("${spring.influx.mapper-location:com.njcn.influx.imapper}")
     private String mapperLocation;
 
     @Value("${spring.influx.database}")
    private String database;
 
+    @Value("${spring.influx.url}")
+    private String url;
+
+    @Value("${spring.influx.user}")
+    private String user;
+
+    @Value("${spring.influx.password}")
+    private String password;
+
     @Bean(name = "influxDbMapper")
     public InfluxDBMapper influxDbMapper(InfluxDB influxDb) {
         influxDb.setLogLevel(InfluxDB.LogLevel.BASIC);
@@ -44,4 +56,10 @@ public class InfluxDbConfig {
         return new ProxyMapperRegister(mapperLocation, applicationContext, resourceLoader);
     }
 
+
+    @Bean
+    public InfluxDbUtils influxDbUtils() {
+        return new InfluxDbUtils(user, password, url, database, "autogen");
+    }
+
 }
diff --git a/src/main/java/com/njcn/influx/core/InfluxExecutor.java b/src/main/java/com/njcn/influx/core/InfluxExecutor.java
index 2d0a7e5..934e4f7 100644
--- a/src/main/java/com/njcn/influx/core/InfluxExecutor.java
+++ b/src/main/java/com/njcn/influx/core/InfluxExecutor.java
@@ -1,7 +1,6 @@
 package com.njcn.influx.core;
 
 import cn.hutool.core.collection.CollectionUtil;
-import cn.hutool.core.text.StrPool;
 import cn.hutool.core.util.ObjectUtil;
 import com.njcn.influx.ano.IgnoreData;
 import com.njcn.influx.constant.InfluxDbSqlConstant;
@@ -10,7 +9,6 @@ import com.njcn.influx.utils.InstantUtil;
 import lombok.AllArgsConstructor;
 import lombok.NoArgsConstructor;
 import org.influxdb.InfluxDB;
-import org.influxdb.annotation.Column;
 import org.influxdb.annotation.Measurement;
 import org.influxdb.dto.BatchPoints;
 import org.influxdb.dto.Point;
@@ -19,8 +17,6 @@ import org.influxdb.dto.QueryResult;
 import org.influxdb.impl.InfluxDBMapper;
 
 import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.math.BigDecimal;
 import java.time.Instant;
 import java.util.*;
 
diff --git a/src/main/java/com/njcn/influx/utils/InfluxDbUtils.java b/src/main/java/com/njcn/influx/utils/InfluxDbUtils.java
new file mode 100644
index 0000000..3e02395
--- /dev/null
+++ b/src/main/java/com/njcn/influx/utils/InfluxDbUtils.java
@@ -0,0 +1,377 @@
+package com.njcn.influx.utils;
+
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.OkHttpClient;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDB.ConsistencyLevel;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.*;
+import org.influxdb.dto.Point.Builder;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * InfluxDB utility class: connection management, retention policies, and read/write helpers.
+ *
+ * @author xuyang
+ * @version 1.0.0
+ * @createTime 2021/11/16 10:20
+ */
+@Slf4j
+@Data
+public class InfluxDbUtils {
+    static OkHttpClient.Builder client = new OkHttpClient.Builder()
+            .connectTimeout(1000, TimeUnit.SECONDS)
+            .readTimeout(1000, TimeUnit.SECONDS)
+            .writeTimeout(1000, TimeUnit.SECONDS);
+    private static final int FRACTION_MIN_WIDTH = 0;
+    private static final int FRACTION_MAX_WIDTH = 9;
+    private static final boolean ADD_DECIMAL_POINT = true;
+    private static final DateTimeFormatter RFC3339_FORMATTER = new DateTimeFormatterBuilder()
+            .appendPattern("yyyy-MM-dd'T'HH:mm:ss")
+            .appendFraction(ChronoField.NANO_OF_SECOND, FRACTION_MIN_WIDTH, FRACTION_MAX_WIDTH, ADD_DECIMAL_POINT)
+            .appendZoneOrOffsetId()
+            .toFormatter();
+
+    /** username */
+    private String username;
+    /** password */
+    private String password;
+    /** connection URL */
+    private String openurl;
+    /** database name */
+    private String dbName;
+    /** retention policy */
+    private String retentionPolicy;
+
+    private InfluxDB influxDB;
+
+    public InfluxDbUtils(String username, String password, String url, String dbName, String retentionPolicy) {
+        this.username = username;
+        this.password = password;
+        this.openurl = url;
+        this.dbName = dbName;
+        this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;
+        influxDbBuild();
+    }
+
+
+    /**
+     * Connect to the time-series database; create it if it does not exist.
+     *
+     * @return the InfluxDB connection
+     */
+    public InfluxDB influxDbBuild() {
+        if (influxDB == null) {
+            influxDB = InfluxDBFactory.connect(openurl, username, password, client);
+        }
+        try {
+            // if (!influxDB.databaseExists(database)) {
+            //     influxDB.createDatabase(database);
+            // }
+        } catch (Exception e) {
+            // The server may sit behind a dynamic proxy that does not allow creating databases.
+            // e.printStackTrace();
+        } finally {
+            influxDB.setRetentionPolicy(retentionPolicy);
+        }
+        influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
+        return influxDB;
+    }
+
+
+
+
+    /**
+     * Create a database.
+     *
+     * @param dbName database name
+     */
+    @SuppressWarnings("deprecation")
+    public void createDB(String dbName) {
+        influxDB.createDatabase(dbName);
+    }
+
+    /**
+     * Delete a database.
+     *
+     * @param dbName database name
+     */
+    @SuppressWarnings("deprecation")
+    public void deleteDB(String dbName) {
+        influxDB.deleteDatabase(dbName);
+    }
+
+    /**
+     * Test whether the connection is alive.
+     *
+     * @return true if connected
+     */
+    public boolean ping() {
+        boolean isConnected = false;
+        Pong pong;
+        try {
+            pong = influxDB.ping();
+            if (pong != null) {
+                isConnected = true;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return isConnected;
+    }
+
+    /**
+     * Create a custom retention policy.
+     *
+     * @param dataBaseName database name
+     * @param policyName   policy name
+     * @param days         number of days to keep data
+     * @param replication  number of replicas
+     * @param isDefault    whether to make it the default retention policy
+     */
+    public void createRetentionPolicy(String dataBaseName, String policyName, int days, int replication,
+                                      Boolean isDefault) {
+        String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %sd REPLICATION %s ", policyName,
+                dataBaseName, days, replication);
+        if (isDefault) {
+            sql = sql + " DEFAULT";
+        }
+        query(sql);
+    }
+
+    /**
+     * Create the default retention policy.
+     *
+     * Policy name: hour, duration: 30 days, replication: 1, set as the default policy.
+     */
+    public void createDefaultRetentionPolicy() {
+        String command = String
+                .format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", "hour", dbName,
+                        "30d", 1);
+        this.query(command);
+    }
+
+    /**
+     * Query.
+     *
+     * @param command query statement
+     * @return query result
+     */
+    public QueryResult query(String command) {
+        return influxDB.query(new Query(command, dbName));
+    }
+
+    /**
+     * Insert a single point.
+     *
+     * @param measurement measurement name
+     * @param tags        tags
+     * @param fields      fields
+     * @param time        timestamp (0 lets the server assign the time)
+     * @param timeUnit    time unit of the timestamp
+     */
+    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,
+                       TimeUnit timeUnit) {
+        Builder builder = Point.measurement(measurement);
+        builder.tag(tags);
+        builder.fields(fields);
+        if (0 != time) {
+            builder.time(time, timeUnit);
+        }
+        influxDB.write(dbName, retentionPolicy, builder.build());
+    }
+
+
+
+    /**
+     * Batch-write points.
+     *
+     * @param batchPoints points to write
+     */
+    public void batchInsert(BatchPoints batchPoints, TimeUnit timeUnit) {
+        influxDB.write(batchPoints);
+        // influxDB.enableGzip();
+        // influxDB.enableBatch(2000, 100, timeUnit);
+        // influxDB.disableGzip();
+        // influxDB.disableBatch();
+    }
+
+    /**
+     * Batch-write records.
+     *
+     * @param database        database
+     * @param retentionPolicy retention policy
+     * @param consistency     consistency level
+     * @param records         records to write (BatchPoints.lineProtocol() yields one record)
+     */
+    public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency, TimeUnit timeUnit, final List<String> records) {
+        influxDB.write(database, retentionPolicy, consistency, timeUnit, records);
+    }
+
+    /**
+     * Batch-write records.
+     *
+     * @param database        database
+     * @param retentionPolicy retention policy
+     * @param consistency     consistency level
+     * @param records         records to write (BatchPoints.lineProtocol() yields one record)
+     */
+    public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List<String> records) {
+        influxDB.write(database, retentionPolicy, consistency, records);
+    }
+
+
+    /**
+     * Delete.
+     *
+     * @param command delete statement
+     * @return the error message, if any
+     */
+    public String deleteMeasurementData(String command) {
+        QueryResult result = influxDB.query(new Query(command, dbName));
+        return result.getError();
+    }
+
+    /**
+     * Close the connection.
+     */
+    public void close() {
+        influxDB.close();
+    }
+
+    /**
+     * Build a Point.
+     *
+     * @param measurement measurement name
+     * @param time        timestamp
+     * @param timeUnit    time unit of the timestamp
+     * @param tags        tags
+     * @param fields      fields
+     * @return the built point
+     */
+    public Point pointBuilder(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags,
+                              Map<String, Object> fields) {
+        Point point = Point.measurement(measurement).time(time, timeUnit).tag(tags).fields(fields).build();
+        return point;
+    }
+
+
+
+
+
+    // Wrappers around single query statements
+    /**
+     * Wrap the query result into a list of maps.
+     *
+     * @param commond a single InfluxQL statement
+     * @return query result
+     */
+    @Deprecated
+    public List<Map<String, Object>> getResult(String commond) {
+        List<Map<String, Object>> retList = new ArrayList<>();
+        QueryResult queryResult = influxDB.query(new Query(commond, dbName));
+        List<QueryResult.Result> results = queryResult.getResults();
+        if (results == null || results.isEmpty()) {
+            return retList;
+        }
+        QueryResult.Result result = results.get(0);
+        List<QueryResult.Series> seriess = result.getSeries();
+        if (seriess == null || seriess.isEmpty()) {
+            return retList;
+        }
+        QueryResult.Series series = seriess.get(0);
+        List<String> columns = series.getColumns();
+        List<List<Object>> values = series.getValues();
+        for (List<Object> columnValue : values) {
+            Map<String, Object> map = new HashMap<>(1);
+            for (int i = 0; i < columns.size(); i++) {
+                map.put(columns.get(i), columnValue.get(i));
+            }
+            retList.add(map);
+        }
+        return retList;
+    }
+
+    public List<Map<String, Object>> getResult(String commond, String type) {
+
+        List<Map<String, Object>> retList = new ArrayList<>();
+        QueryResult queryResult = influxDB.query(new Query(commond, dbName));
+        List<QueryResult.Result> results = queryResult.getResults();
+        if (results == null || results.isEmpty()) {
+            return retList;
+        }
+        QueryResult.Result result = results.get(0);
+        List<QueryResult.Series> seriess = result.getSeries();
+        if (seriess == null || seriess.isEmpty()) {
+            return retList;
+        }
+        QueryResult.Series series = seriess.get(0);
+        List<String> columns = series.getColumns();
+        List<List<Object>> values = series.getValues();
+        for (List<Object> columnValue : values) {
+            Map<String, Object> map = new HashMap<>(1);
+            for (int i = 0; i < columns.size(); i++) {
+                map.put(columns.get(i), columnValue.get(i));
+            }
+            retList.add(map);
+        }
+        return retList;
+    }
+
+    public List<Map<String, Object>> getMapResult(String commond) {
+
+        List<Map<String, Object>> retList = new ArrayList<>();
+        QueryResult queryResult = influxDB.query(new Query(commond, dbName));
+        List<QueryResult.Result> results = queryResult.getResults();
+        if (results == null || results.isEmpty()) {
+            return retList;
+        }
+        QueryResult.Result result = results.get(0);
+        List<QueryResult.Series> seriess = result.getSeries();
+        if (seriess == null || seriess.isEmpty()) {
+            return retList;
+        }
+        QueryResult.Series series = seriess.get(0);
+        List<String> columns = series.getColumns();
+        List<List<Object>> values = series.getValues();
+        for (List<Object> columnValue : values) {
+            Map<String, Object> map = new HashMap<>(1);
+            for (int i = 0; i < columns.size(); i++) {
+                map.put(columns.get(i), columnValue.get(i));
+            }
+            retList.add(map);
+        }
+        return retList;
+    }
+}
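
Usage sketch (illustrative, not part of the patch): assuming the InfluxDbUtils bean registered by InfluxDbConfig#influxDbUtils() above and the spring.influx.* properties it reads, a Spring component could write and read points as shown below. The class name InfluxDbUtilsExample, the measurement "cpu_load", and the tag/field names are hypothetical.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.njcn.influx.utils.InfluxDbUtils;

@Component
public class InfluxDbUtilsExample {

    // Bean provided by InfluxDbConfig#influxDbUtils() in this patch.
    @Autowired
    private InfluxDbUtils influxDbUtils;

    public void writeAndRead() {
        // Hypothetical measurement, tag, and field names.
        Map<String, String> tags = new HashMap<>();
        tags.put("host", "server01");
        Map<String, Object> fields = new HashMap<>();
        fields.put("value", 0.64);

        // Write one point with an explicit millisecond timestamp.
        influxDbUtils.insert("cpu_load", tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS);

        // Read rows back as column-name -> value maps.
        List<Map<String, Object>> rows = influxDbUtils.getMapResult("SELECT * FROM cpu_load LIMIT 10");
        rows.forEach(System.out::println);
    }
}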