Initialization

2022-06-21 20:47:46 +08:00
parent b666a24a98
commit 59da3376c1
1246 changed files with 129600 additions and 0 deletions

com/njcn/influxdb/config/InfluxDbConfig.java

@@ -0,0 +1,34 @@
package com.njcn.influxdb.config;
import com.njcn.influxdb.utils.InfluxDbUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * InfluxDB connection configuration: binds the connection properties and exposes the utility bean.
 *
 * @author xuyang
 * @version 1.0.0
 * @createTime 2021/12/10 10:48
 */
@Configuration
public class InfluxDbConfig {
@Value("${spring.influx.url:}")
private String influxDBUrl;
@Value("${spring.influx.user:}")
private String userName;
@Value("${spring.influx.password:}")
private String password;
@Value("${spring.influx.database:}")
private String database;
@Bean
public InfluxDbUtils influxDbUtils() {
return new InfluxDbUtils(userName, password, influxDBUrl, database, "autogen");
}
}
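
For reference, the @Value bindings above read standard Spring Boot properties. A minimal application.yml sketch — the host and credentials below are placeholders, not values from this commit; the database name follows the demo in test.java:

spring:
  influx:
    url: http://localhost:8086
    user: root
    password: change-me
    database: pqsbase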

com/njcn/influxdb/utils/InfluxDbUtils.java

@@ -0,0 +1,324 @@
package com.njcn.influxdb.utils;
import cn.hutool.core.util.StrUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.*;
import org.influxdb.dto.Point.Builder;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * InfluxDB utility: wraps connection setup, retention-policy management, reads, and writes.
 *
 * @author xuyang
 * @version 1.0.0
 * @createTime 2021/11/16 10:20
 */
@Slf4j
@Data
public class InfluxDbUtils {
/** Username */
private String username;
/** Password */
private String password;
/** Connection URL */
private String openurl;
/** Database name */
private String dbName;
/** Retention policy */
private String retentionPolicy;
private InfluxDB influxDB;
public InfluxDbUtils(String username, String password, String url, String dbName, String retentionPolicy) {
this.username = username;
this.password = password;
this.openurl = url;
this.dbName = dbName;
this.retentionPolicy = StrUtil.isBlank(retentionPolicy) ? "autogen" : retentionPolicy;
influxDbBuild();
}
/**
 * Connect to the time-series database. Creating the database on the fly is
 * disabled below because the server may sit behind a proxy that forbids it.
 *
 * @return the shared InfluxDB client
 */
public InfluxDB influxDbBuild() {
if (influxDB == null) {
influxDB = InfluxDBFactory.connect(openurl, username, password);
}
try {
// if (!influxDB.databaseExists(dbName)) {
// influxDB.createDatabase(dbName);
// }
} catch (Exception e) {
// The server may be behind a dynamic proxy that does not support creating databases.
// e.printStackTrace();
} finally {
influxDB.setRetentionPolicy(retentionPolicy);
}
influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
return influxDB;
}
/**
 * Create a database.
 *
 * @param dbName database name
 */
@SuppressWarnings("deprecation")
public void createDB(String dbName) {
influxDB.createDatabase(dbName);
}
/**
 * Delete a database.
 *
 * @param dbName database name
 */
@SuppressWarnings("deprecation")
public void deleteDB(String dbName) {
influxDB.deleteDatabase(dbName);
}
/**
 * Test whether the connection is alive.
 *
 * @return true if the server responds
 */
public boolean ping() {
boolean isConnected = false;
Pong pong;
try {
pong = influxDB.ping();
if (pong != null) {
isConnected = true;
}
} catch (Exception e) {
log.error("InfluxDB ping failed", e);
}
return isConnected;
}
/**
 * Create a custom retention policy.
 *
 * @param dataBaseName database name
 * @param policyName policy name
 * @param days retention duration in days
 * @param replication number of replicas
 * @param isDefault whether to make this the default retention policy
 */
public void createRetentionPolicy(String dataBaseName, String policyName, int days, int replication,
Boolean isDefault) {
String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %sd REPLICATION %s ", policyName,
dataBaseName, days, replication);
if (isDefault) {
sql = sql + " DEFAULT";
}
query(sql);
}
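// For example (illustrative names), createRetentionPolicy("pqsbase", "one_week", 7, 1, true)
// issues: CREATE RETENTION POLICY "one_week" ON "pqsbase" DURATION 7d REPLICATION 1 DEFAULT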
/**
 * Create the default retention policy.
 *
 * Policy name "hour", duration 30 days, replication 1, set as the default policy.
 */
public void createDefaultRetentionPolicy() {
String command = String
.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", "hour", dbName,
"30d", 1);
this.query(command);
}
/**
 * Run a query.
 *
 * @param command the query statement
 * @return the raw query result
 */
public QueryResult query(String command) {
return influxDB.query(new Query(command, dbName));
}
/**
 * Insert a single point.
 *
 * @param measurement measurement (table)
 * @param tags tag set
 * @param fields field set
 * @param time timestamp; 0 lets the server assign one
 * @param timeUnit unit of the timestamp
 */
public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,
TimeUnit timeUnit) {
Builder builder = Point.measurement(measurement);
builder.tag(tags);
builder.fields(fields);
if (0 != time) {
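// Timezone hack: add 8 h (28,800,000 ms) so the UTC timestamp InfluxDB stores reads as Beijing time.
// This only lines up when timeUnit is MILLISECONDS; getResult() subtracts the same offset on read.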
time = time + 28800000;
builder.time(time, timeUnit);
}
influxDB.write(dbName, retentionPolicy, builder.build());
}
/**
 * Batch-write points.
 *
 * @param batchPoints the points to write
 * @param timeUnit unused; kept for signature compatibility
 */
public void batchInsert(BatchPoints batchPoints, TimeUnit timeUnit) {
influxDB.write(batchPoints);
// influxDB.enableGzip();
// influxDB.enableBatch(2000,100,timeUnit);
// influxDB.disableGzip();
// influxDB.disableBatch();
}
/**
 * Batch-write records.
 *
 * @param database database name
 * @param retentionPolicy retention policy
 * @param consistency consistency level
 * @param timeUnit unused; kept for signature compatibility
 * @param records the data to save; BatchPoints.lineProtocol() yields one record
 */
public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency,TimeUnit timeUnit, final List<String> records) {
influxDB.write(database, retentionPolicy, consistency, records);
}
/**
 * Batch-write records.
 *
 * @param database database name
 * @param retentionPolicy retention policy
 * @param consistency consistency level
 * @param records the data to save; BatchPoints.lineProtocol() yields one record
 */
public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List<String> records) {
influxDB.write(database, retentionPolicy, consistency, records);
}
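// Each record is one line of InfluxDB line protocol, e.g. (hypothetical values):
// data_V,LineID=4,Phasic_Type=A RMS=4.1111,RMS_AB=4.1111 1639100000000000000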
/**
 * Delete measurement data.
 *
 * @param command the delete statement
 * @return the error message, or null on success
 */
public String deleteMeasurementData(String command) {
QueryResult result = influxDB.query(new Query(command, dbName));
return result.getError();
}
/**
 * Close the database connection.
 */
public void close() {
influxDB.close();
}
/**
 * Build a Point.
 *
 * @param measurement measurement (table)
 * @param time timestamp
 * @param timeUnit unit of the timestamp
 * @param tags tag set
 * @param fields field set
 * @return the built point
 */
public Point pointBuilder(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags,
Map<String, Object> fields) {
return Point.measurement(measurement).time(time, timeUnit).tag(tags).fields(fields).build();
}
/**
 * Run a single query and flatten the result into a list of maps;
 * the "time" column is converted to epoch seconds.
 *
 * @param command a single query statement
 * @return the flattened rows
 */
public List<Map<String, Object>> getResult(String command) {
List<Map<String, Object>> retList = new ArrayList<>();
QueryResult queryResult = influxDB.query(new Query(command, dbName));
List<QueryResult.Result> results = queryResult.getResults();
if (results == null || results.isEmpty()) {
return retList;
}
QueryResult.Result result = results.get(0);
List<QueryResult.Series> seriesList = result.getSeries();
if (seriesList == null || seriesList.isEmpty()) {
return retList;
}
QueryResult.Series series = seriesList.get(0);
List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues();
for (List<Object> columnValue : values) {
Map<String, Object> map = new HashMap<>(columns.size());
for (int i = 0; i < columnValue.size(); i++) {
if (columns.get(i).equals("time")) {
// Undo the +8 h offset applied in insert(), then convert to epoch seconds.
long epochSecond = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8)).getEpochSecond();
map.put(columns.get(i), epochSecond);
} else {
map.put(columns.get(i), columnValue.get(i));
}
}
retList.add(map);
}
return retList;
}
/**
 * Variant of {@link #getResult(String)} that formats "time" as yyyy-MM-dd HH:mm:ss.
 *
 * @param command a single query statement
 * @param type unused; only distinguishes the overload
 * @return the flattened rows
 */
public List<Map<String, Object>> getResult(String command, String type) {
List<Map<String, Object>> retList = new ArrayList<>();
QueryResult queryResult = influxDB.query(new Query(command, dbName));
List<QueryResult.Result> results = queryResult.getResults();
if (results == null || results.isEmpty()) {
return retList;
}
QueryResult.Result result = results.get(0);
List<QueryResult.Series> seriesList = result.getSeries();
if (seriesList == null || seriesList.isEmpty()) {
return retList;
}
QueryResult.Series series = seriesList.get(0);
List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues();
for (List<Object> columnValue : values) {
Map<String, Object> map = new HashMap<>(columns.size());
for (int i = 0; i < columnValue.size(); i++) {
if (columns.get(i).equals("time")) {
// Undo the +8 h offset applied in insert(), then format in the system zone.
Instant instant = Instant.parse(columnValue.get(i).toString()).minusMillis(TimeUnit.HOURS.toMillis(8));
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String time = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
map.put(columns.get(i), time);
} else {
map.put(columns.get(i), columnValue.get(i));
}
}
retList.add(map);
}
return retList;
}
}
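
Taken together, a typical bootstrap with this utility looks like the sketch below. This is a hypothetical sequence, not code from the commit: the connection values mirror the demo in test.java, the policy name is illustrative, and it assumes the server permits policy creation:

InfluxDbUtils influx = new InfluxDbUtils("root", "123456789", "http://192.168.1.16:8086", "pqsbase", "");
if (influx.ping()) {
    // The empty policy name passed above falls back to "autogen" in the constructor
    influx.createRetentionPolicy("pqsbase", "one_month", 30, 1, true);
}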

test.java

@@ -0,0 +1,204 @@
import com.njcn.influxdb.utils.InfluxDbUtils;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Demo of InfluxDbUtils: single insert, batched inserts, and queries.
 *
 * @author xuyang
 * @version 1.0.0
 * @createTime 2021/11/16 11:07
 */
public class test {
// Query
public static QueryResult select(InfluxDbUtils influxDBUtil) {
long startTime = System.currentTimeMillis();
QueryResult result = influxDBUtil.query("select * from data_V where phasic_type='A'");
long endTime = System.currentTimeMillis();
System.out.println(endTime - startTime);
return result;
}
// Process the result set
public static void handleResult(QueryResult result) {
QueryResult.Result result1 = result.getResults().get(0);
if (result1.getSeries() != null) {
List<List<Object>> valueList = result1.getSeries().stream().map(QueryResult.Series::getValues).collect(Collectors.toList()).get(0);
if (valueList != null && !valueList.isEmpty()) {
for (List<Object> value : valueList) {
// Value of the first column (for InfluxDB this is usually "time")
String field1 = value.get(0) == null ? null : value.get(0).toString();
System.out.println(field1);
// Value of the second column
String field2 = value.get(1) == null ? null : value.get(1).toString();
System.out.println(field2);
// TODO: apply your own business logic to the extracted fields…
}
}
}
}
public static void main(String[] args) {
InfluxDbUtils influxDBUtil = new InfluxDbUtils("root", "123456789", "http://192.168.1.16:8086", "pqsbase", "");
insert(influxDBUtil);
}
// Insert a single data point
public static void insert(InfluxDbUtils influxDBUtil) {
Map<String, String> tags = new HashMap<>();
tags.put("lineid", "1e3b8531483b2a8cbee6747f1f641cf9");
Map<String, Object> fields = new HashMap<>();
fields.put("phasic_type","T");
fields.put("value_type","MAX");
fields.put("Freq_Dev", 48.6 );
fields.put("Voltage_Dev", 8.3 );
fields.put("UBalance", 7.6 );
fields.put("Flicker", 4.6 );
fields.put("UAberrance", 6.5 );
fields.put("I_Neg", 5.6 );
fields.put("UHarm_2", 6.8 );
fields.put("UHarm_3", 5.5 );
fields.put("UHarm_4", 5.4 );
fields.put("UHarm_5", 7.8 );
fields.put("UHarm_6", 6.2 );
fields.put("UHarm_7", 2.5 );
fields.put("UHarm_8", 8.3 );
fields.put("UHarm_9", 6.2 );
fields.put("UHarm_10", 12.8 );
fields.put("UHarm_11", 2.8 );
fields.put("UHarm_12", 8.4 );
fields.put("UHarm_13", 5.6 );
fields.put("UHarm_14", 5.2 );
fields.put("UHarm_15", 9.5 );
fields.put("UHarm_16", 8.3 );
fields.put("UHarm_17", 7.8 );
fields.put("UHarm_18", 6.2 );
fields.put("UHarm_19", 2.5);
fields.put("UHarm_20", 4.5 );
fields.put("UHarm_21", 4.5 );
fields.put("UHarm_22", 6.5 );
fields.put("UHarm_23", 5.9 );
fields.put("UHarm_24", 9.2 );
fields.put("UHarm_25", 5.8 );
fields.put("IHarm_2", 12.8 );
fields.put("IHarm_3", 5.4 );
fields.put("IHarm_4", 6.2 );
fields.put("IHarm_5", 3.2 );
fields.put("IHarm_6", 5.2 );
fields.put("IHarm_7", 5.2 );
fields.put("IHarm_8", 5.5 );
fields.put("IHarm_9", 4.8 );
fields.put("IHarm_10", 8.2 );
fields.put("IHarm_11", 2.5 );
fields.put("IHarm_12", 8.6 );
fields.put("IHarm_13", 5.8 );
fields.put("IHarm_14", 3.5 );
fields.put("IHarm_15", 2.4 );
fields.put("IHarm_16", 5.2 );
fields.put("IHarm_17", 2.5 );
fields.put("IHarm_18", 9.2 );
fields.put("IHarm_19", 8.5);
fields.put("IHarm_20", 8.5 );
fields.put("IHarm_21", 6.2 );
fields.put("IHarm_22", 5.2 );
fields.put("IHarm_23", 8.5 );
fields.put("IHarm_24", 5.2 );
fields.put("IHarm_25", 8.4 );
fields.put("InUHARM_1", 8.2 );
fields.put("InUHARM_2", 5.2 );
fields.put("InUHARM_3", 6.2 );
fields.put("InUHARM_4", 4.2 );
fields.put("InUHARM_5", 2.3 );
fields.put("InUHARM_6", 6.2 );
fields.put("InUHARM_7", 5.2 );
fields.put("InUHARM_8", 10.2 );
fields.put("InUHARM_9", 2.3 );
fields.put("InUHARM_10", 4.2 );
fields.put("InUHARM_11", 3.5 );
fields.put("InUHARM_12", 3.6 );
fields.put("InUHARM_13", 2.3 );
fields.put("InUHARM_14", 7.2 );
fields.put("InUHARM_15", 5.6 );
fields.put("InUHARM_16", 5.6 );
influxDBUtil.insert("PQS_AbnormalData", tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// Write BatchPoints to the database one at a time
public static void batchInsertOne(InfluxDbUtils influxDBUtil) {
Map<String, String> tags1 = new HashMap<>();
tags1.put("LineID", "8");
tags1.put("Phasic_Type", "A");
Map<String, Object> fields1 = new HashMap<>();
fields1.put("RMS", 2);
fields1.put("RMS_AB", 2);
fields1.put("RMS_BC", 2);
fields1.put("RMS_CA", 2);
Map<String, String> tags2 = new HashMap<>();
tags2.put("LineID", "9");
tags2.put("Phasic_Type", "A");
Map<String, Object> fields2 = new HashMap<>();
fields2.put("RMS", 4);
fields2.put("RMS_AB", 4);
fields2.put("RMS_BC", 4);
fields2.put("RMS_CA", 4);
// One Point per record
Point point1 = influxDBUtil.pointBuilder("test", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags1, fields1);
Point point2 = influxDBUtil.pointBuilder("test", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags2, fields2);
// Build one BatchPoints per record
BatchPoints batchPoints1 = BatchPoints.database("test").tag("LineID", "8").tag("Phasic_Type", "A").retentionPolicy("")
.consistency(ConsistencyLevel.ALL).build();
BatchPoints batchPoints2 = BatchPoints.database("test").tag("LineID", "9").tag("Phasic_Type", "A").retentionPolicy("")
.consistency(ConsistencyLevel.ALL).build();
// Add each record to its BatchPoints
batchPoints1.point(point1);
batchPoints2.point(point2);
// Write both batches to the database
influxDBUtil.batchInsert(batchPoints1, TimeUnit.MILLISECONDS);
influxDBUtil.batchInsert(batchPoints2, TimeUnit.MILLISECONDS);
}
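// Note: influxDB.write(batchPoints) targets the database set on the BatchPoints ("test" here),
// not the dbName the utility instance was constructed with.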
// Batch-insert via line protocol
public static void batchInsert(InfluxDbUtils influxDBUtil) {
Map<String, String> tags1 = new HashMap<>();
tags1.put("LineID", "4");
tags1.put("Phasic_Type", "A");
Map<String, Object> fields1 = new HashMap<>();
fields1.put("RMS", 4.1111);
fields1.put("RMS_AB", 4.1111);
fields1.put("RMS_BC", 4.1111);
fields1.put("RMS_CA", 4.1111);
Map<String, String> tags2 = new HashMap<>();
tags2.put("LineID", "5");
tags2.put("Phasic_Type", "A");
Map<String, Object> fields2 = new HashMap<>();
fields2.put("RMS", 5.1111);
fields2.put("RMS_AB", 5.1111);
fields2.put("RMS_BC", 5.1111);
fields2.put("RMS_CA", 5.1111);
// One Point per record. Note: do not use System.currentTimeMillis() in production; at high volume it produces duplicate timestamps, and duplicates overwrite each other (data loss). Use each datum's own timestamp; this is demo-only.
Point point1 = influxDBUtil.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags1, fields1);
Point point2 = influxDBUtil.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags2, fields2);
// BatchPoints batchPoints1 = BatchPoints.database("Data_v").tag("LineID", "4").tag("Phasic_Type","A").retentionPolicy("").consistency(ConsistencyLevel.ALL).precision(TimeUnit.MILLISECONDS).build();
BatchPoints batchPoints1 = BatchPoints.database("test").tag("LineID", "4").tag("Phasic_Type", "A").retentionPolicy("").consistency(ConsistencyLevel.ALL).build();
batchPoints1.point(point1);
BatchPoints batchPoints2 = BatchPoints.database("test").tag("LineID", "5").tag("Phasic_Type", "A").retentionPolicy("").consistency(ConsistencyLevel.ALL).build();
// Add the second record to its BatchPoints
batchPoints2.point(point2);
// Serialize the separate BatchPoints to line protocol and write them in one call to improve write throughput
List<String> records = new ArrayList<String>();
records.add(batchPoints1.lineProtocol());
records.add(batchPoints2.lineProtocol());
// Batch-insert both records into the database
influxDBUtil.batchInsert("test", "", ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
}
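
Reading the demo data back goes through the getResult overloads of InfluxDbUtils. A sketch, not part of the commit — given an influxDBUtil like the one built in main(), with the measurement and field names from the insert() demo above; the second argument is only an overload selector:

List<Map<String, Object>> rows = influxDBUtil.getResult(
        "select * from PQS_AbnormalData where phasic_type = 'T'", "formatted");
for (Map<String, Object> row : rows) {
    // In this overload, "time" arrives as a yyyy-MM-dd HH:mm:ss string
    System.out.println(row.get("time") + " Freq_Dev=" + row.get("Freq_Dev"));
}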