package com.njcn.influx.utils; import lombok.Data; import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; import org.influxdb.InfluxDB; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.InfluxDBFactory; import org.influxdb.dto.*; import org.influxdb.dto.Point.Builder; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** * 类的介绍: * * @author xuyang * @version 1.0.0 * @createTime 2021/11/16 10:20 */ @Slf4j @Data public class InfluxDbUtils { public static OkHttpClient.Builder client = new OkHttpClient.Builder() .connectTimeout(1000, TimeUnit.SECONDS) .readTimeout(1000, TimeUnit.SECONDS) .writeTimeout(1000, TimeUnit.SECONDS); private static final int FRACTION_MIN_WIDTH = 0; private static final int FRACTION_MAX_WIDTH = 9; private static final boolean ADD_DECIMAL_POINT = true; private static final DateTimeFormatter RFC3339_FORMATTER = new DateTimeFormatterBuilder() .appendPattern("yyyy-MM-dd'T'HH:mm:ss") .appendFraction(ChronoField.NANO_OF_SECOND, FRACTION_MIN_WIDTH, FRACTION_MAX_WIDTH, ADD_DECIMAL_POINT) .appendZoneOrOffsetId() .toFormatter(); /** * 用户名 */ private String username; /** * 密码 */ private String password; /** * 链接地址 */ private String openurl; /** * 数据库 */ private String dbName; /** * 保留策略 */ private String retentionPolicy; private InfluxDB influxDB; public InfluxDbUtils(String username, String password, String url, String dbName, String retentionPolicy) { this.username = username; this.password = password; this.openurl = url; this.dbName = dbName; this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy; influxDbBuild(); } /** * 连接时序数据库 ,若不存在则创建 * * @return */ public InfluxDB influxDbBuild() { if (influxDB == null) { influxDB = InfluxDBFactory.connect(openurl, username, password, client); } try { // if (!influxDB.databaseExists(database)) { // influxDB.createDatabase(database); // } } catch (Exception e) { // 该数据库可能设置动态代理,不支持创建数据库 // e.printStackTrace(); } finally { influxDB.setRetentionPolicy(retentionPolicy); } influxDB.setLogLevel(InfluxDB.LogLevel.NONE); return influxDB; } /** * 创建数据库 * * @param dbName */ @SuppressWarnings("deprecation") public void createDB(String dbName) { influxDB.createDatabase(dbName); } /** * 删除数据库 * * @param dbName */ @SuppressWarnings("deprecation") public void deleteDB(String dbName) { influxDB.deleteDatabase(dbName); } /** * 测试连接是否正常 * * @return true 正常 */ public boolean ping() { boolean isConnected = false; Pong pong; try { pong = influxDB.ping(); if (pong != null) { isConnected = true; } } catch (Exception e) { e.printStackTrace(); } return isConnected; } /** * 创建自定义保留策略 * * @param policyName 策略名 * @param days 保存天数 * @param replication 保存副本数量 * @param isDefault 是否设为默认保留策略 */ public void createRetentionPolicy(String dataBaseName, String policyName, int days, int replication, Boolean isDefault) { String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %sd REPLICATION %s ", policyName, dataBaseName, days, replication); if (isDefault) { sql = sql + " DEFAULT"; } query(sql); } /** * 创建默认的保留策略 *

* 策略名:hour,保存天数:30天,保存副本数量:1,设为默认保留策略 */ public void createDefaultRetentionPolicy() { String command = String .format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", "hour", dbName, "30d", 1); this.query(command); } /** * 查询 * * @param command 查询语句 * @return */ public QueryResult query(String command) { return influxDB.query(new Query(command, dbName)); } /** * 插入 * * @param measurement 表 * @param tags 标签 * @param fields 字段 */ public void insert(String measurement, Map tags, Map fields, long time, TimeUnit timeUnit) { Builder builder = Point.measurement(measurement); builder.tag(tags); builder.fields(fields); if (0 != time) { builder.time(time, timeUnit); } influxDB.write(dbName, retentionPolicy, builder.build()); } /** * 批量写入测点 * * @param batchPoints */ public void batchInsert(BatchPoints batchPoints, TimeUnit timeUnit) { influxDB.write(batchPoints); // influxDB.enableGzip(); // influxDB.enableBatch(2000,100,timeUnit); // influxDB.disableGzip(); // influxDB.disableBatch(); } /** * 批量写入数据 * * @param database 数据库 * @param retentionPolicy 保存策略 * @param consistency 一致性 * @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record) */ public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency, TimeUnit timeUnit, final List records) { influxDB.write(database, retentionPolicy, consistency, records); } /** * 批量写入数据 * * @param database 数据库 * @param retentionPolicy 保存策略 * @param consistency 一致性 * @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record) */ public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List records) { influxDB.write(database, retentionPolicy, consistency, records); } /** * 删除 * * @param command 删除语句 * @return 返回错误信息 */ public String deleteMeasurementData(String command) { QueryResult result = influxDB.query(new Query(command, dbName)); return result.getError(); } /** * 关闭数据库 */ public void close() { influxDB.close(); } /** * 构建Point * * @param measurement * @param time * @param fields * @return */ public Point pointBuilder(String measurement, long time, TimeUnit timeUnit, Map tags, Map fields) { Point point = Point.measurement(measurement).time(time, timeUnit).tag(tags).fields(fields).build(); return point; } //单条查询语句封装 /** * 查询结果封装到map * * @param commond 单条sql语句 * @return 查询结果 */ @Deprecated public List> getResult(String commond) { List> retList = new ArrayList<>(); QueryResult queryResult = influxDB.query(new Query(commond, dbName)); List results = queryResult.getResults(); if (results == null || results.isEmpty()) { return retList; } QueryResult.Result result = results.get(0); List seriess = result.getSeries(); if (seriess == null || seriess.isEmpty()) { return retList; } QueryResult.Series series = seriess.get(0); List columns = series.getColumns(); List> values = series.getValues(); for (List columnValue : values) { Map map = new HashMap<>(1); for (int i = 0; i < columnValue.size(); i++) { if (columns.get(i).equals("time")) { long aa = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8)).getEpochSecond(); map.put(columns.get(i), aa); } else { map.put(columns.get(i), columnValue.get(i)); } } retList.add(map); } return retList; } @Deprecated public List> getResult(String commond, String type) { List> retList = new ArrayList<>(); QueryResult queryResult = influxDB.query(new Query(commond, dbName)); List results = queryResult.getResults(); if (results == null || results.isEmpty()) { return retList; } QueryResult.Result result = results.get(0); List seriess = result.getSeries(); if (seriess == null || seriess.isEmpty()) { return retList; } QueryResult.Series series = seriess.get(0); List columns = series.getColumns(); List> values = series.getValues(); for (List columnValue : values) { Map map = new HashMap<>(1); for (int i = 0; i < columnValue.size(); i++) { if (columns.get(i).equals("time")) { Instant aa = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8)); LocalDateTime localDateTime = LocalDateTime.ofInstant(aa, ZoneId.systemDefault()); String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); map.put(columns.get(i), time); } else { map.put(columns.get(i), columnValue.get(i)); } } retList.add(map); } return retList; } public List> getMapResult(String commond) { List> retList = new ArrayList<>(); QueryResult queryResult = influxDB.query(new Query(commond, dbName)); List results = queryResult.getResults(); if (results == null || results.isEmpty()) { return retList; } QueryResult.Result result = results.get(0); List seriess = result.getSeries(); if (seriess == null || seriess.isEmpty()) { return retList; } QueryResult.Series series = seriess.get(0); List columns = series.getColumns(); List> values = series.getValues(); for (List columnValue : values) { Map map = new HashMap<>(1); for (int i = 0; i < columnValue.size(); i++) { if (columns.get(i).equals("time")) { Instant instant = Instant.from(RFC3339_FORMATTER.parse(String.valueOf(columnValue.get(i)))); LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); map.put(columns.get(i), time); } else { map.put(columns.get(i), columnValue.get(i)); } } retList.add(map); } return retList; } }