influxORM代码调整

This commit is contained in:
2023-04-26 11:32:11 +08:00
parent 3b6579dda5
commit 29858bbc59
6 changed files with 229 additions and 56 deletions

View File

@@ -1,6 +1,8 @@
package com.njcn.influx.core;
import cn.hutool.core.collection.CollectionUtil;
import com.njcn.influx.constant.InfluxDbSqlConstant;
import com.njcn.influx.utils.InstantUtil;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.influxdb.InfluxDB;
@@ -8,8 +10,13 @@ import org.influxdb.annotation.Measurement;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBMapper;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
@@ -31,7 +38,45 @@ public class InfluxExecutor {
* @param domainClass 映射实体
*/
public <E> List<E> select(String sql, Class domainClass) {
List<E> results = influxDbMapper.query(new Query(dealTimeZone(sql), database), domainClass);
System.out.println(sql);
List<E> results;
//如果domainClass没有注解Measurement则使用influxdb去查询
if (!domainClass.isAnnotationPresent(Measurement.class)) {
QueryResult queryResult = influxDb.query(new Query(sql, database));
results = new ArrayList<>();
try {
List<QueryResult.Result> queryResults = queryResult.getResults();
if (CollectionUtil.isNotEmpty(queryResults)) {
for (QueryResult.Result result : queryResults) {
if (CollectionUtil.isNotEmpty(result.getSeries())) {
List<QueryResult.Series> seriesList = result.getSeries();
if (CollectionUtil.isNotEmpty(seriesList)) {
QueryResult.Series series = seriesList.get(0);
List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues();
for (List<Object> columnValue : values) {
Object obj = domainClass.newInstance();
for (int i = 0; i < columnValue.size(); i++) {
Method value = domainClass.getMethod("setValue", BigDecimal.class);
Method time = domainClass.getMethod("setTime", Instant.class);
if (columns.get(i).equals("value")) {
value.invoke(obj, BigDecimal.valueOf(Double.parseDouble(columnValue.get(i).toString())));
} else if (columns.get(i).equals("time")) {
time.invoke(obj, InstantUtil.stringToInstant(columnValue.get(i).toString().replace("+08:00", "Z")));
}
}
results.add((E) obj);
}
}
}
}
}
} catch (Exception e) {
throw new RuntimeException("influx反射实例化实体异常" + e);
}
} else {
results = influxDbMapper.query(new Query(dealTimeZone(sql), database), domainClass);
}
return results;
}

View File

@@ -0,0 +1,28 @@
package com.njcn.influx.pojo;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.njcn.common.utils.serializer.InstantDateSerializer;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
/**
* 用来收集统计结果比如max min mean cp95等函数
* @author hongawen
* @version 1.0.0
*/
@Data
public class StatisticsResult implements Serializable {
/***
* 统计值
*/
private BigDecimal value;
@JsonSerialize(using = InstantDateSerializer.class)
private Instant time;
}

View File

@@ -5,14 +5,13 @@ import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ReflectUtil;
import com.njcn.influx.constant.InfluxDbSqlConstant;
import com.njcn.influx.support.ICFunction;
import com.njcn.influx.utils.LambdaUtil;
import lombok.Data;
import org.influxdb.annotation.Column;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.*;
/**
* @author hongawen
@@ -79,11 +78,13 @@ public class InfluxQueryWrapper {
}
/***
* 初始化查询语句:'select '
* 初始化查询语句
*/
private void initSql() {
this.selectColumns.clear();
this.conditions.clear();
this.groupColumn.clear();
this.orderColumn.clear();
}
@@ -93,14 +94,13 @@ public class InfluxQueryWrapper {
* @param fieldsStr 属性值
* 输出为 select ["influxColumn" as fieldStr]的形式
*/
public InfluxQueryWrapper select(String... fieldsStr) {
public <T, R> InfluxQueryWrapper select(ICFunction<T, R>... fieldsStr) {
if (ArrayUtil.isNotEmpty(fieldsStr)) {
StringBuilder selectFragment = new StringBuilder();
Arrays.stream(fieldsStr).forEach(fieldStr -> {
Field field = this.getTargetClassField(resultEntity, fieldStr);
selectFragment.setLength(0);
selectFragment.append(StrPool.C_SPACE)
.append(this.getColumnName(field, fieldStr))
.append(this.getColumnName(resultEntity, LambdaUtil.columnToString(fieldStr)))
.append(StrPool.C_SPACE)
.append(InfluxDbSqlConstant.AS)
.append(StrPool.C_SPACE)
@@ -135,11 +135,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出COUNT("columnName")
*/
public InfluxQueryWrapper count(String columnName) {
public <T, R> InfluxQueryWrapper count(ICFunction<T, R> columnName) {
String selectFragment = InfluxDbSqlConstant.COUNT +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName))+
InfluxDbSqlConstant.DQM +
InfluxDbSqlConstant.RBK +
InfluxDbSqlConstant.AS_VALUE;
@@ -154,11 +154,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出MEAN("columnName")
*/
public InfluxQueryWrapper mean(String columnName) {
public <T, R> InfluxQueryWrapper mean(ICFunction<T, R> columnName) {
String selectFragment = InfluxDbSqlConstant.AVG +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName))+
InfluxDbSqlConstant.DQM +
InfluxDbSqlConstant.RBK +
InfluxDbSqlConstant.AS_VALUE;
@@ -174,11 +174,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出MEDIAN("columnName")
*/
public InfluxQueryWrapper median(String columnName) {
public <T, R> InfluxQueryWrapper median(ICFunction<T, R> columnName) {
String selectFragment = InfluxDbSqlConstant.MEDIAN +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
InfluxDbSqlConstant.RBK +
InfluxDbSqlConstant.AS_VALUE;
@@ -193,11 +193,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出MODE("columnName")
*/
public InfluxQueryWrapper mode(String columnName) {
public <T, R> InfluxQueryWrapper mode(ICFunction<T, R> columnName) {
String selectFragment = InfluxDbSqlConstant.MODE +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
InfluxDbSqlConstant.RBK +
InfluxDbSqlConstant.AS_VALUE;
@@ -212,11 +212,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出SPREAD("columnName")
*/
public InfluxQueryWrapper spread(String columnName) {
public <T, R> InfluxQueryWrapper spread(ICFunction<T, R> columnName) {
String selectFragment = InfluxDbSqlConstant.SPREAD +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
InfluxDbSqlConstant.RBK +
InfluxDbSqlConstant.AS_VALUE;
@@ -231,11 +231,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出SUM("columnName")
*/
public InfluxQueryWrapper sum(String columnName) {
public <T, R> InfluxQueryWrapper sum(ICFunction<T, R> columnName) {
String selectFragment = InfluxDbSqlConstant.SUM +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
InfluxDbSqlConstant.RBK +
InfluxDbSqlConstant.AS_VALUE;
@@ -250,11 +250,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出TOP("columnName",number)
*/
public InfluxQueryWrapper top(String columnName, int num) {
public <T, R> InfluxQueryWrapper top(ICFunction<T, R> columnName, int num) {
String selectFragment = InfluxDbSqlConstant.TOP +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
StrPool.COMMA +
num +
@@ -271,11 +271,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出BOTTOM("columnName",number)
*/
public InfluxQueryWrapper bottom(String columnName, int num) {
public <T, R> InfluxQueryWrapper bottom(ICFunction<T, R> columnName, int num) {
String selectFragment = InfluxDbSqlConstant.BOTTOM +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
StrPool.COMMA +
num +
@@ -294,11 +294,11 @@ public class InfluxQueryWrapper {
* 输出DERIVATIVE("columnName")
*/
@Deprecated
public InfluxQueryWrapper derivative(String columnName) {
public <T, R> InfluxQueryWrapper derivative(ICFunction<T, R> columnName) {
String selectFragment = InfluxDbSqlConstant.DERIVATIVE +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
InfluxDbSqlConstant.RBK +
InfluxDbSqlConstant.AS_VALUE;
@@ -313,11 +313,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出LAST("columnName")
*/
public InfluxQueryWrapper last(String columnName) {
public <T, R> InfluxQueryWrapper last(ICFunction<T, R> columnName) {
String selectFragment = InfluxDbSqlConstant.LAST +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
InfluxDbSqlConstant.RBK +
InfluxDbSqlConstant.AS_VALUE;
@@ -333,7 +333,7 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出 MAX("columnName")
*/
public InfluxQueryWrapper max(String columnName) {
public <T, R> InfluxQueryWrapper max(ICFunction<T, R> columnName) {
return max(columnName, "value");
}
@@ -345,11 +345,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出 MAX("columnName") as resultName
*/
public InfluxQueryWrapper max(String columnName, String resultName) {
public <T, R> InfluxQueryWrapper max(ICFunction<T, R> columnName, String resultName) {
String selectFragment = InfluxDbSqlConstant.MAX +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
InfluxDbSqlConstant.RBK +
InfluxDbSqlConstant.AS +
@@ -366,7 +366,7 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出 MIN("columnName")
*/
public InfluxQueryWrapper min(String columnName) {
public <T, R> InfluxQueryWrapper min(ICFunction<T, R> columnName) {
return min(columnName, "value");
}
@@ -378,11 +378,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出 MIN("columnName") as resultName
*/
public InfluxQueryWrapper min(String columnName, String resultName) {
public <T, R> InfluxQueryWrapper min(ICFunction<T, R> columnName, String resultName) {
String selectFragment = InfluxDbSqlConstant.MIN +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
InfluxDbSqlConstant.RBK +
InfluxDbSqlConstant.AS +
@@ -400,7 +400,7 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出 PERCENTILE("columnName",95)
*/
public InfluxQueryWrapper percentile(String columnName, int percent) {
public <T, R> InfluxQueryWrapper percentile(ICFunction<T, R> columnName, int percent) {
return percentile(columnName, "value", percent);
}
@@ -413,11 +413,11 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出 PERCENTILE("columnName",95) as resultName
*/
public InfluxQueryWrapper percentile(String columnName, String resultName, int percent) {
public <T, R> InfluxQueryWrapper percentile(ICFunction<T, R> columnName, String resultName, int percent) {
String selectFragment = InfluxDbSqlConstant.PERCENTILE +
InfluxDbSqlConstant.LBK +
InfluxDbSqlConstant.DQM +
columnName +
this.getColumnName(measurement, LambdaUtil.columnToString(columnName)) +
InfluxDbSqlConstant.DQM +
StrPool.COMMA +
percent +
@@ -442,10 +442,9 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出为time >='2022-04-30 16:00:00' AND time <='2022-05-30 16:00:00'
*/
public InfluxQueryWrapper between(String fieldName, Object val1, Object val2) {
public <T, R> InfluxQueryWrapper between(ICFunction<T, R> fieldName, Object val1, Object val2) {
StringBuilder selectFragment = new StringBuilder();
Field field = this.getTargetClassField(measurement, fieldName);
String columnName = this.getColumnName(field, fieldName);
String columnName = this.getColumnName(measurement, LambdaUtil.columnToString(fieldName));
selectFragment.append(StrPool.C_SPACE)
.append(columnName)
.append(InfluxDbSqlConstant.GE);
@@ -481,9 +480,9 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出 columnName = columnValue
*/
public InfluxQueryWrapper eq(String columnName, Object columnValue) {
public <T, R> InfluxQueryWrapper eq(ICFunction<T, R> columnName, Object columnValue) {
StringBuilder selectFragment = new StringBuilder();
selectFragment.append(columnName)
selectFragment.append(this.getColumnName(measurement, LambdaUtil.columnToString(columnName)))
.append(InfluxDbSqlConstant.EQ);
if (columnValue instanceof String) {
//需要用单引号包装下
@@ -497,16 +496,18 @@ public class InfluxQueryWrapper {
return this;
}
/***
* 指定字段大于某个值
* @author hongawen
* @param columnName 表字段名
* @param fieldName 表字段名
* @param columnValue 数值
* @return InfluxQueryWrapper
* 输出 columnName > columnValue
*/
public InfluxQueryWrapper gt(String columnName, Object columnValue) {
public <T, R> InfluxQueryWrapper gt(ICFunction<T, R> fieldName, Object columnValue) {
StringBuilder selectFragment = new StringBuilder();
String columnName = this.getColumnName(measurement, LambdaUtil.columnToString(fieldName));
selectFragment.append(columnName)
.append(InfluxDbSqlConstant.GT);
if (columnValue instanceof String) {
@@ -524,13 +525,14 @@ public class InfluxQueryWrapper {
/***
* 指定字段大于等于某个值
* @author hongawen
* @param columnName 表字段名
* @param fieldName 表字段名
* @param columnValue 数值
* @return InfluxQueryWrapper
* 输出 columnName >= columnValue
*/
public InfluxQueryWrapper ge(String columnName, Object columnValue) {
public <T, R> InfluxQueryWrapper ge(ICFunction<T, R> fieldName, Object columnValue) {
StringBuilder selectFragment = new StringBuilder();
String columnName = this.getColumnName(measurement, LambdaUtil.columnToString(fieldName));
selectFragment.append(columnName)
.append(InfluxDbSqlConstant.GE);
if (columnValue instanceof String) {
@@ -548,13 +550,14 @@ public class InfluxQueryWrapper {
/***
* 指定字段小于某个值
* @author hongawen
* @param columnName 表字段名
* @param fieldName 表字段名
* @param columnValue 数值
* @return InfluxQueryWrapper
* 输出 columnName < columnValue
*/
public InfluxQueryWrapper lt(String columnName, Object columnValue) {
public <T, R> InfluxQueryWrapper lt(ICFunction<T, R> fieldName, Object columnValue) {
StringBuilder selectFragment = new StringBuilder();
String columnName = this.getColumnName(measurement, LambdaUtil.columnToString(fieldName));
selectFragment.append(columnName)
.append(InfluxDbSqlConstant.LT);
if (columnValue instanceof String) {
@@ -572,13 +575,14 @@ public class InfluxQueryWrapper {
/***
* 指定字段小于等于某个值
* @author hongawen
* @param columnName 表字段名
* @param fieldName 表字段名
* @param columnValue 数值
* @return InfluxQueryWrapper
* 输出 columnName <= columnValue
*/
public InfluxQueryWrapper le(String columnName, Object columnValue) {
public <T, R> InfluxQueryWrapper le(ICFunction<T, R> fieldName, Object columnValue) {
StringBuilder selectFragment = new StringBuilder();
String columnName = this.getColumnName(measurement, LambdaUtil.columnToString(fieldName));
selectFragment.append(columnName)
.append(InfluxDbSqlConstant.LE);
if (columnValue instanceof String) {
@@ -603,8 +607,8 @@ public class InfluxQueryWrapper {
* @return InfluxQueryWrapper
* 输出 columnName
*/
public InfluxQueryWrapper groupBy(String columnName) {
groupColumn.add(columnName);
public <T, R> InfluxQueryWrapper groupBy(ICFunction<T, R> columnName) {
groupColumn.add(this.getColumnName(measurement, LambdaUtil.columnToString(columnName)));
return this;
}
@@ -703,11 +707,12 @@ public class InfluxQueryWrapper {
/***
* 获取sql拼接的名称存在注解名就用注解名否则就用属性名
* @author hongawen
* @param field 属性
* @param clazz 类型
* @param fieldName 属性名
* @return String
*/
private String getColumnName(Field field, String fieldName) {
private String getColumnName(Class<?> clazz, String fieldName) {
Field field = this.getTargetClassField(clazz, fieldName);
Column column = field.getAnnotation(Column.class);
//数据库字段
String influxColumn;

View File

@@ -0,0 +1,13 @@
package com.njcn.influx.support;
import java.io.Serializable;
import java.util.function.Function;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年04月25日 10:36
*/
@FunctionalInterface
public interface ICFunction<T, R> extends Function<T, R>, Serializable {
}

View File

@@ -0,0 +1,35 @@
package com.njcn.influx.utils;
import cn.hutool.core.date.DatePattern;
import com.njcn.common.pojo.exception.BusinessException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年04月26日 10:54
*/
public class InstantUtil {
public static Instant stringToInstant(String time) {
if (time.length() > 22) {
return LocalDateTime.parse(time, DateTimeFormatter.ofPattern(DatePattern.UTC_MS_PATTERN)
.withZone(ZoneId.systemDefault())
.withLocale(Locale.CHINA))
.toInstant(ZoneOffset.UTC);
} else {
return LocalDateTime.parse(time, DateTimeFormatter.ofPattern(DatePattern.UTC_PATTERN)
.withZone(ZoneId.systemDefault())
.withLocale(Locale.CHINA))
.toInstant(ZoneOffset.UTC);
}
}
}

View File

@@ -0,0 +1,47 @@
package com.njcn.influx.utils;
import com.njcn.influx.support.ICFunction;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年04月25日 14:14
*/
public class LambdaUtil {
/***
* 将函数接口信息转为属性名
* @author hongawen
* @return String 属性名
* 将getXxxx 转义为 xxxx
*/
public static <T, R> String columnToString(ICFunction<T, R> columnName) {
SerializedLambda serializedLambda = null;
try {
serializedLambda = doSFunction(columnName);
} catch (Exception e) {
throw new RuntimeException("函数式接口获取类信息异常");
}
String methodName = serializedLambda.getImplMethodName();
String properties = methodName.substring(3);
String firstLetter = String.valueOf(properties.charAt(0)).toLowerCase();
properties=properties.substring(1);
return firstLetter.concat(properties);
}
/***
* 反射获取接口信息
*/
public static <T, R> SerializedLambda doSFunction(ICFunction<T, R> func) throws Exception {
// 直接调用writeReplace
Method writeReplace = func.getClass().getDeclaredMethod("writeReplace");
writeReplace.setAccessible(true);
//反射调用
Object sl = writeReplace.invoke(func);
SerializedLambda serializedLambda = (SerializedLambda) sl;
return serializedLambda;
}
}