package com.njcn.influx.deprecated; import lombok.Data; import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; import org.influxdb.InfluxDB; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.InfluxDBFactory; import org.influxdb.dto.*; import org.influxdb.dto.Point.Builder; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** * 类的介绍: * * @author xuyang * @version 1.0.0 * @createTime 2021/11/16 10:20 */ @Slf4j @Data @Deprecated public class InfluxDbUtils { static OkHttpClient.Builder client = new OkHttpClient.Builder() .connectTimeout(1000,TimeUnit.SECONDS) .readTimeout(1000, TimeUnit.SECONDS) .writeTimeout(1000, TimeUnit.SECONDS); private static final int FRACTION_MIN_WIDTH = 0; private static final int FRACTION_MAX_WIDTH = 9; private static final boolean ADD_DECIMAL_POINT = true; private static final DateTimeFormatter RFC3339_FORMATTER = new DateTimeFormatterBuilder() .appendPattern("yyyy-MM-dd'T'HH:mm:ss") .appendFraction(ChronoField.NANO_OF_SECOND, FRACTION_MIN_WIDTH, FRACTION_MAX_WIDTH, ADD_DECIMAL_POINT) .appendZoneOrOffsetId() .toFormatter(); /**用户名*/ private String username; /**密码*/ private String password; /**链接地址*/ private String openurl; /**数据库*/ private String dbName; /**保留策略*/ private String retentionPolicy; private InfluxDB influxDB; public InfluxDbUtils(String username, String password, String url, String dbName, String retentionPolicy) { this.username = username; this.password = password; this.openurl = url; this.dbName = dbName; this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy; influxDbBuild(); } /** * 连接时序数据库 ,若不存在则创建 * * @return */ public InfluxDB influxDbBuild() { if (influxDB == null) { influxDB = InfluxDBFactory.connect(openurl, username, password,client); } try { // if (!influxDB.databaseExists(database)) { // influxDB.createDatabase(database); // } } catch (Exception e) { // 该数据库可能设置动态代理,不支持创建数据库 // e.printStackTrace(); } finally { influxDB.setRetentionPolicy(retentionPolicy); } influxDB.setLogLevel(InfluxDB.LogLevel.NONE); return influxDB; } /** * 创建数据库 * * @param dbName */ @SuppressWarnings("deprecation") public void createDB(String dbName) { influxDB.createDatabase(dbName); } /** * 删除数据库 * * @param dbName */ @SuppressWarnings("deprecation") public void deleteDB(String dbName) { influxDB.deleteDatabase(dbName); } /** * 测试连接是否正常 * * @return true 正常 */ public boolean ping() { boolean isConnected = false; Pong pong; try { pong = influxDB.ping(); if (pong != null) { isConnected = true; } } catch (Exception e) { e.printStackTrace(); } return isConnected; } /** * 创建自定义保留策略 * * @param policyName 策略名 * @param days 保存天数 * @param replication 保存副本数量 * @param isDefault 是否设为默认保留策略 */ public void createRetentionPolicy(String dataBaseName, String policyName, int days, int replication, Boolean isDefault) { String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %sd REPLICATION %s ", policyName, dataBaseName, days, replication); if (isDefault) { sql = sql + " DEFAULT"; } query(sql); } /** * 创建默认的保留策略 * * 策略名:hour,保存天数:30天,保存副本数量:1,设为默认保留策略 */ public void createDefaultRetentionPolicy() { String command = String .format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", "hour", dbName, "30d", 1); this.query(command); } /** * 查询 * * @param command 查询语句 * @return */ public QueryResult query(String command) { return influxDB.query(new Query(command, dbName)); } /** * 插入 * * @param measurement 表 * @param tags 标签 * @param fields 字段 */ public void insert(String measurement, Map tags, Map fields, long time, TimeUnit timeUnit) { Builder builder = Point.measurement(measurement); builder.tag(tags); builder.fields(fields); if (0 != time) { builder.time(time, timeUnit); } influxDB.write(dbName, retentionPolicy, builder.build()); } /** * 批量写入测点 * * @param batchPoints */ public void batchInsert(BatchPoints batchPoints, TimeUnit timeUnit) { influxDB.write(batchPoints); // influxDB.enableGzip(); // influxDB.enableBatch(2000,100,timeUnit); // influxDB.disableGzip(); // influxDB.disableBatch(); } /** * 批量写入数据 * * @param database 数据库 * @param retentionPolicy 保存策略 * @param consistency 一致性 * @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record) */ public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency,TimeUnit timeUnit, final List records) { influxDB.write(database, retentionPolicy, consistency, records); } /** * 批量写入数据 * * @param database 数据库 * @param retentionPolicy 保存策略 * @param consistency 一致性 * @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record) */ public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List records) { influxDB.write(database, retentionPolicy, consistency, records); } /** * 删除 * * @param command 删除语句 * @return 返回错误信息 */ public String deleteMeasurementData(String command) { QueryResult result = influxDB.query(new Query(command, dbName)); return result.getError(); } /** * 关闭数据库 */ public void close() { influxDB.close(); } /** * 构建Point * * @param measurement * @param time * @param fields * @return */ public Point pointBuilder(String measurement, long time, TimeUnit timeUnit, Map tags, Map fields) { Point point = Point.measurement(measurement).time(time, timeUnit).tag(tags).fields(fields).build(); return point; } //单条查询语句封装 /** * 查询结果封装到map * @param commond 单条sql语句 * @return 查询结果 */ @Deprecated public List> getResult(String commond){ List> retList = new ArrayList<>(); QueryResult queryResult = influxDB.query(new Query(commond,dbName)); List results = queryResult.getResults(); if (results==null||results.isEmpty()){ return retList; } QueryResult.Result result = results.get(0); List seriess = result.getSeries(); if (seriess==null||seriess.isEmpty()){ return retList; } QueryResult.Series series = seriess.get(0); List columns = series.getColumns(); List> values = series.getValues(); for (List columnValue :values){ Map map = new HashMap<>(1); for (int i=0;i> getResult(String commond, String type){ List> retList = new ArrayList<>(); QueryResult queryResult = influxDB.query(new Query(commond,dbName)); List results = queryResult.getResults(); if (results==null||results.isEmpty()){ return retList; } QueryResult.Result result = results.get(0); List seriess = result.getSeries(); if (seriess==null||seriess.isEmpty()){ return retList; } QueryResult.Series series = seriess.get(0); List columns = series.getColumns(); List> values = series.getValues(); for (List columnValue :values){ Map map = new HashMap<>(1); for (int i=0;i> getMapResult(String commond){ List> retList = new ArrayList<>(); QueryResult queryResult = influxDB.query(new Query(commond,dbName)); List results = queryResult.getResults(); if (results==null||results.isEmpty()){ return retList; } QueryResult.Result result = results.get(0); List seriess = result.getSeries(); if (seriess==null||seriess.isEmpty()){ return retList; } QueryResult.Series series = seriess.get(0); List columns = series.getColumns(); List> values = series.getValues(); for (List columnValue :values){ Map map = new HashMap<>(1); for (int i=0;i