This commit is contained in:
2024-01-23 13:59:54 +08:00
parent 3bb091fd9c
commit 34c97425da
4 changed files with 111 additions and 52 deletions

View File

@@ -5,6 +5,7 @@ import com.njcn.influx.core.InfluxExecutor;
import com.njcn.influx.utils.InfluxDbUtils; import com.njcn.influx.utils.InfluxDbUtils;
import lombok.Data; import lombok.Data;
import org.influxdb.InfluxDB; import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.impl.InfluxDBMapper; import org.influxdb.impl.InfluxDBMapper;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.ConfigurableApplicationContext;
@@ -42,12 +43,14 @@ public class InfluxDbConfig {
@Bean(name = "influxDbMapper") @Bean(name = "influxDbMapper")
public InfluxDBMapper influxDbMapper(InfluxDB influxDb) { public InfluxDBMapper influxDbMapper(InfluxDB influxDb) {
influxDb = InfluxDBFactory.connect(url, user, password,InfluxDbUtils.client);
influxDb.setLogLevel(InfluxDB.LogLevel.BASIC); influxDb.setLogLevel(InfluxDB.LogLevel.BASIC);
return new InfluxDBMapper(influxDb); return new InfluxDBMapper(influxDb);
} }
@Bean(name = "influxDbExecutor") @Bean(name = "influxDbExecutor")
public InfluxExecutor executor(InfluxDB influxDb, InfluxDBMapper influxDbMapper) { public InfluxExecutor executor(InfluxDB influxDb, InfluxDBMapper influxDbMapper) {
influxDb = InfluxDBFactory.connect(url, user, password,InfluxDbUtils.client);
return new InfluxExecutor(influxDb, influxDbMapper,database); return new InfluxExecutor(influxDb, influxDbMapper,database);
} }

View File

@@ -32,8 +32,8 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Data @Data
public class InfluxDbUtils { public class InfluxDbUtils {
static OkHttpClient.Builder client = new OkHttpClient.Builder() public static OkHttpClient.Builder client = new OkHttpClient.Builder()
.connectTimeout(1000,TimeUnit.SECONDS) .connectTimeout(1000, TimeUnit.SECONDS)
.readTimeout(1000, TimeUnit.SECONDS) .readTimeout(1000, TimeUnit.SECONDS)
.writeTimeout(1000, TimeUnit.SECONDS); .writeTimeout(1000, TimeUnit.SECONDS);
private static final int FRACTION_MIN_WIDTH = 0; private static final int FRACTION_MIN_WIDTH = 0;
@@ -45,15 +45,25 @@ public class InfluxDbUtils {
.appendZoneOrOffsetId() .appendZoneOrOffsetId()
.toFormatter(); .toFormatter();
/**用户名*/ /**
* 用户名
*/
private String username; private String username;
/**密码*/ /**
* 密码
*/
private String password; private String password;
/**链接地址*/ /**
* 链接地址
*/
private String openurl; private String openurl;
/**数据库*/ /**
* 数据库
*/
private String dbName; private String dbName;
/**保留策略*/ /**
* 保留策略
*/
private String retentionPolicy; private String retentionPolicy;
private InfluxDB influxDB; private InfluxDB influxDB;
@@ -75,7 +85,7 @@ public class InfluxDbUtils {
*/ */
public InfluxDB influxDbBuild() { public InfluxDB influxDbBuild() {
if (influxDB == null) { if (influxDB == null) {
influxDB = InfluxDBFactory.connect(openurl, username, password,client); influxDB = InfluxDBFactory.connect(openurl, username, password, client);
} }
try { try {
// if (!influxDB.databaseExists(database)) { // if (!influxDB.databaseExists(database)) {
@@ -92,8 +102,6 @@ public class InfluxDbUtils {
} }
/** /**
* 创建数据库 * 创建数据库
* *
@@ -153,7 +161,7 @@ public class InfluxDbUtils {
/** /**
* 创建默认的保留策略 * 创建默认的保留策略
* * <p>
* 策略名hour保存天数30天保存副本数量1,设为默认保留策略 * 策略名hour保存天数30天保存副本数量1,设为默认保留策略
*/ */
public void createDefaultRetentionPolicy() { public void createDefaultRetentionPolicy() {
@@ -192,7 +200,6 @@ public class InfluxDbUtils {
} }
/** /**
* 批量写入测点 * 批量写入测点
* *
@@ -214,7 +221,7 @@ public class InfluxDbUtils {
* @param consistency 一致性 * @param consistency 一致性
* @param records 要保存的数据调用BatchPoints.lineProtocol()可得到一条record * @param records 要保存的数据调用BatchPoints.lineProtocol()可得到一条record
*/ */
public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency,TimeUnit timeUnit, final List<String> records) { public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency, TimeUnit timeUnit, final List<String> records) {
influxDB.write(database, retentionPolicy, consistency, records); influxDB.write(database, retentionPolicy, consistency, records);
} }
@@ -264,39 +271,38 @@ public class InfluxDbUtils {
} }
//单条查询语句封装 //单条查询语句封装
/** /**
* 查询结果封装到map * 查询结果封装到map
* @param commond 单条sql语句 *
* @param commond 单条sql语句
* @return 查询结果 * @return 查询结果
*/ */
@Deprecated @Deprecated
public List<Map<String, Object>> getResult(String commond){ public List<Map<String, Object>> getResult(String commond) {
List<Map<String, Object>> retList = new ArrayList<>(); List<Map<String, Object>> retList = new ArrayList<>();
QueryResult queryResult = influxDB.query(new Query(commond,dbName)); QueryResult queryResult = influxDB.query(new Query(commond, dbName));
List<QueryResult.Result> results = queryResult.getResults(); List<QueryResult.Result> results = queryResult.getResults();
if (results==null||results.isEmpty()){ if (results == null || results.isEmpty()) {
return retList; return retList;
} }
QueryResult.Result result = results.get(0); QueryResult.Result result = results.get(0);
List<QueryResult.Series> seriess = result.getSeries(); List<QueryResult.Series> seriess = result.getSeries();
if (seriess==null||seriess.isEmpty()){ if (seriess == null || seriess.isEmpty()) {
return retList; return retList;
} }
QueryResult.Series series = seriess.get(0); QueryResult.Series series = seriess.get(0);
List<String> columns = series.getColumns(); List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues(); List<List<Object>> values = series.getValues();
for (List<Object> columnValue :values){ for (List<Object> columnValue : values) {
Map<String, Object> map = new HashMap<>(1); Map<String, Object> map = new HashMap<>(1);
for (int i=0;i<columnValue.size();i++){ for (int i = 0; i < columnValue.size(); i++) {
if(columns.get(i).equals("time")){ if (columns.get(i).equals("time")) {
long aa = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8)).getEpochSecond(); long aa = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8)).getEpochSecond();
map.put(columns.get(i), aa); map.put(columns.get(i), aa);
}else { } else {
map.put(columns.get(i),columnValue.get(i)); map.put(columns.get(i), columnValue.get(i));
} }
} }
retList.add(map); retList.add(map);
@@ -305,32 +311,32 @@ public class InfluxDbUtils {
} }
@Deprecated @Deprecated
public List<Map<String, Object>> getResult(String commond, String type){ public List<Map<String, Object>> getResult(String commond, String type) {
List<Map<String, Object>> retList = new ArrayList<>(); List<Map<String, Object>> retList = new ArrayList<>();
QueryResult queryResult = influxDB.query(new Query(commond,dbName)); QueryResult queryResult = influxDB.query(new Query(commond, dbName));
List<QueryResult.Result> results = queryResult.getResults(); List<QueryResult.Result> results = queryResult.getResults();
if (results==null||results.isEmpty()){ if (results == null || results.isEmpty()) {
return retList; return retList;
} }
QueryResult.Result result = results.get(0); QueryResult.Result result = results.get(0);
List<QueryResult.Series> seriess = result.getSeries(); List<QueryResult.Series> seriess = result.getSeries();
if (seriess==null||seriess.isEmpty()){ if (seriess == null || seriess.isEmpty()) {
return retList; return retList;
} }
QueryResult.Series series = seriess.get(0); QueryResult.Series series = seriess.get(0);
List<String> columns = series.getColumns(); List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues(); List<List<Object>> values = series.getValues();
for (List<Object> columnValue :values){ for (List<Object> columnValue : values) {
Map<String, Object> map = new HashMap<>(1); Map<String, Object> map = new HashMap<>(1);
for (int i=0;i<columnValue.size();i++){ for (int i = 0; i < columnValue.size(); i++) {
if(columns.get(i).equals("time")){ if (columns.get(i).equals("time")) {
Instant aa = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8)); Instant aa = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8));
LocalDateTime localDateTime =LocalDateTime.ofInstant(aa,ZoneId.systemDefault()); LocalDateTime localDateTime = LocalDateTime.ofInstant(aa, ZoneId.systemDefault());
String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
map.put(columns.get(i), time); map.put(columns.get(i), time);
}else { } else {
map.put(columns.get(i),columnValue.get(i)); map.put(columns.get(i), columnValue.get(i));
} }
} }
retList.add(map); retList.add(map);
@@ -339,32 +345,32 @@ public class InfluxDbUtils {
} }
public List<Map<String, Object>> getMapResult(String commond){ public List<Map<String, Object>> getMapResult(String commond) {
List<Map<String, Object>> retList = new ArrayList<>(); List<Map<String, Object>> retList = new ArrayList<>();
QueryResult queryResult = influxDB.query(new Query(commond,dbName)); QueryResult queryResult = influxDB.query(new Query(commond, dbName));
List<QueryResult.Result> results = queryResult.getResults(); List<QueryResult.Result> results = queryResult.getResults();
if (results==null||results.isEmpty()){ if (results == null || results.isEmpty()) {
return retList; return retList;
} }
QueryResult.Result result = results.get(0); QueryResult.Result result = results.get(0);
List<QueryResult.Series> seriess = result.getSeries(); List<QueryResult.Series> seriess = result.getSeries();
if (seriess==null||seriess.isEmpty()){ if (seriess == null || seriess.isEmpty()) {
return retList; return retList;
} }
QueryResult.Series series = seriess.get(0); QueryResult.Series series = seriess.get(0);
List<String> columns = series.getColumns(); List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues(); List<List<Object>> values = series.getValues();
for (List<Object> columnValue :values){ for (List<Object> columnValue : values) {
Map<String, Object> map = new HashMap<>(1); Map<String, Object> map = new HashMap<>(1);
for (int i=0;i<columnValue.size();i++){ for (int i = 0; i < columnValue.size(); i++) {
if(columns.get(i).equals("time")){ if (columns.get(i).equals("time")) {
Instant instant = Instant.from(RFC3339_FORMATTER.parse(String.valueOf(columnValue.get(i)))); Instant instant = Instant.from(RFC3339_FORMATTER.parse(String.valueOf(columnValue.get(i))));
LocalDateTime localDateTime =LocalDateTime.ofInstant(instant,ZoneId.systemDefault()); LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
map.put(columns.get(i), time); map.put(columns.get(i), time);
}else { } else {
map.put(columns.get(i),columnValue.get(i)); map.put(columns.get(i), columnValue.get(i));
} }
} }
retList.add(map); retList.add(map);
@@ -373,5 +379,4 @@ public class InfluxDbUtils {
} }
} }

View File

@@ -0,0 +1,38 @@
package com.njcn.influx.utils;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.Instant;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年07月24日 13:33
*/
@Component
public class InstantDateDeserializer extends StdDeserializer<Instant> {
public InstantDateDeserializer() {
this(null);
}
protected InstantDateDeserializer(Class<?> vc) {
super(vc);
}
@Override
public Instant deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JsonProcessingException {
String text = p.getValueAsString();
return InstantUtil.dateToInstant(DateUtil.parse(text,DatePattern.NORM_DATETIME_PATTERN));
}
}

View File

@@ -7,7 +7,9 @@ import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.TimeUnit;
/** /**
* @author hongawen * @author hongawen
@@ -31,4 +33,15 @@ public class InstantUtil {
} }
/***
* 将date转为instant 处理8小时误差
* @author hongawen
* @date 2023/7/20 15:58
* @param date 日期
* @return Instant
*/
public static Instant dateToInstant(Date date){
return date.toInstant().plusMillis(TimeUnit.HOURS.toMillis(8));
}
} }