冲突
This commit is contained in:
265
pqs-common/common-influxdb/src/test/java/DataTest.java
Normal file
265
pqs-common/common-influxdb/src/test/java/DataTest.java
Normal file
@@ -0,0 +1,265 @@
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import com.njcn.event.pojo.po.EventDetail;
|
||||
import com.njcn.influxdb.utils.InfluxDbUtils;
|
||||
import org.influxdb.InfluxDB.ConsistencyLevel;
|
||||
import org.influxdb.dto.BatchPoints;
|
||||
import org.influxdb.dto.Point;
|
||||
import org.influxdb.dto.QueryResult;
|
||||
import org.influxdb.impl.InfluxDBResultMapper;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 类的介绍:
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2021/11/16 11:07
|
||||
*/
|
||||
|
||||
public class DataTest {
|
||||
|
||||
//查询
|
||||
public static QueryResult select(InfluxDbUtils influxDBUtil) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
//组装sql语句
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
stringBuilder.append("time >= '").append(DateUtil.beginOfDay(DateUtil.parse("2022-05-06"))).append("' and ").append("time <= '").append(DateUtil.endOfDay(DateUtil.parse("2022-05-06"))).append("' and (");
|
||||
//sql语句
|
||||
stringBuilder.append("line_id ='").append("1e3b8531483b2a8cbee6747f1f641cf9").append("')");
|
||||
//获取暂降事件
|
||||
QueryResult result = influxDBUtil.query("select * from pqs_eventdetail where " + stringBuilder.toString());
|
||||
InfluxDBResultMapper influxDBResultMapper = new InfluxDBResultMapper();
|
||||
List<EventDetail> eventDetailList = influxDBResultMapper.toPOJO(result, EventDetail.class);
|
||||
long endTime = System.currentTimeMillis();
|
||||
System.out.println(eventDetailList);
|
||||
return result;
|
||||
}
|
||||
|
||||
//处理结果集
|
||||
public static void chanelResult(QueryResult result) {
|
||||
QueryResult.Result result1 = result.getResults().get(0);
|
||||
if (result1.getSeries() != null) {
|
||||
List<List<Object>> valueList = result1.getSeries().stream().map(QueryResult.Series::getValues).collect(Collectors.toList()).get(0);
|
||||
if (valueList != null && valueList.size() > 0) {
|
||||
for (List<Object> value : valueList) {
|
||||
Map<String, String> map = new HashMap<String, String>();
|
||||
// 数据库中字段1取值
|
||||
String field1 = value.get(0) == null ? null : value.get(0).toString();
|
||||
System.out.println(field1);
|
||||
// 数据库中字段2取值
|
||||
String field2 = value.get(1) == null ? null : value.get(1).toString();
|
||||
System.out.println(field2);
|
||||
// TODO 用取出的字段做你自己的业务逻辑……
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
InfluxDbUtils influxDBUtil = new InfluxDbUtils("root", "123456789", "http://192.168.1.16:8086", "pqsbase", "");
|
||||
insert(influxDBUtil);
|
||||
//select(influxDBUtil);
|
||||
}
|
||||
public static void deleteDB(InfluxDbUtils influxDBUtil) {
|
||||
influxDBUtil.deleteDB("LIMIT_RATE");
|
||||
}
|
||||
//单条数据插入
|
||||
public static void insert(InfluxDbUtils influxDBUtil) {
|
||||
// long time = Long.parseLong("1655135328135");
|
||||
// Map<String, String> tags = new HashMap<>();
|
||||
// // tags.put("line_id", "127fad1dcb0077ac2979141b8473a5e4");
|
||||
// tags.put("line_id", "1883a1fe6d9c3c5a03a4371bda024452");
|
||||
// Map<String, Object> fields = new HashMap<>();
|
||||
// fields.put("create_time", "2022-06-13 23:57:31");
|
||||
// fields.put("update_time", "");
|
||||
// fields.put("push_failed",0);
|
||||
// fields.put("result",1);
|
||||
// influxDBUtil.insert("pqs_event_push_logs", tags, fields, time, TimeUnit.MILLISECONDS);
|
||||
|
||||
long time = Long.parseLong("1646180719000");
|
||||
Map<String, String> tags = new HashMap<>();
|
||||
tags.put("LineID", "fd4ffb0dd33eafaaf403b07a3fc1afe5");
|
||||
Map<String, Object> fields = new HashMap<>();
|
||||
fields.put("VU_Dev1",5.706);
|
||||
fields.put("VU_Dev2",5.706);
|
||||
fields.put("VU_Dev3",5.706);
|
||||
fields.put("VU_Dev4",5.706);
|
||||
fields.put("VU_Dev5",5.706);
|
||||
fields.put("Freq_Dev1",0.534);
|
||||
fields.put("Freq_Dev2",0.534);
|
||||
fields.put("Freq_Dev3",2.534);
|
||||
fields.put("Freq_Dev4",0.534);
|
||||
fields.put("Freq_Dev5",0.534);
|
||||
fields.put("Data_PST1",0.604);
|
||||
fields.put("Data_PST2",0.0);
|
||||
fields.put("Data_PST3",0.691);
|
||||
fields.put("Data_PST4",0.910);
|
||||
fields.put("Data_PST5",0.691);
|
||||
fields.put("V_Unbalance1",2.713);
|
||||
fields.put("V_Unbalance2",2.713);
|
||||
fields.put("V_Unbalance3",2.713);
|
||||
fields.put("V_Unbalance4",2.713);
|
||||
fields.put("V_Unbalance5",2.713);
|
||||
fields.put("V_THD1",20.001);
|
||||
fields.put("V_THD2",20.003);
|
||||
fields.put("V_THD3",20.00);
|
||||
fields.put("V_THD4",20.008);
|
||||
fields.put("V_THD5",20.00);
|
||||
fields.put("Event1",1.619);
|
||||
fields.put("Event2",1.619);
|
||||
fields.put("Event3",1.619);
|
||||
fields.put("Event4",1.619);
|
||||
fields.put("Event5",1.619);
|
||||
influxDBUtil.insert("PQS_COMASSES", tags, fields, time, TimeUnit.MILLISECONDS);
|
||||
// long time = Long.parseLong("1647473742000");
|
||||
// tags.put("MYINDEX", "df1ff413949f6d1fc07ffdb5440b4907");
|
||||
// Map<String, Object> fields = new HashMap<>();
|
||||
// fields.put("Phasic_Type","T");
|
||||
// fields.put("AllTime",1155);
|
||||
// fields.put("Flicker_AllTime",550);
|
||||
// fields.put("Flicker_OverTime",0);
|
||||
// fields.put("Freq_Dev_OverTime",0);
|
||||
// fields.put("Voltage_Dev_OverTime",0);
|
||||
// fields.put("UBalance_OverTime",0);
|
||||
// fields.put("UAberrance_OverTime",0);
|
||||
// fields.put("I_Neg_OverTime",0);
|
||||
// fields.put("UHarm_2_OverTime",0);
|
||||
// fields.put("UHarm_3_OverTime",0);
|
||||
// fields.put("UHarm_4_OverTime",0);
|
||||
// fields.put("UHarm_5_OverTime",0);
|
||||
// fields.put("UHarm_6_OverTime",0);
|
||||
// fields.put("UHarm_7_OverTime",0);
|
||||
// fields.put("UHarm_8_OverTime",0);
|
||||
// fields.put("UHarm_9_OverTime",0);
|
||||
// fields.put("UHarm_10_OverTime",0);
|
||||
// fields.put("UHarm_11_OverTime",0);
|
||||
// fields.put("UHarm_12_OverTime",0);
|
||||
// fields.put("UHarm_13_OverTime",0);
|
||||
// fields.put("UHarm_14_OverTime",0);
|
||||
// fields.put("UHarm_15_OverTime",0);
|
||||
// fields.put("UHarm_16_OverTime",0);
|
||||
// fields.put("UHarm_17_OverTime",0);
|
||||
// fields.put("UHarm_18_OverTime",0);
|
||||
// fields.put("UHarm_19_OverTime",0);
|
||||
// fields.put("UHarm_20_OverTime",0);
|
||||
// fields.put("UHarm_21_OverTime",0);
|
||||
// fields.put("UHarm_22_OverTime",0);
|
||||
// fields.put("UHarm_23_OverTime",0);
|
||||
// fields.put("UHarm_24_OverTime",0);
|
||||
// fields.put("UHarm_25_OverTime",0);
|
||||
// fields.put("IHarm_2_OverTime",0);
|
||||
// fields.put("IHarm_3_OverTime",0);
|
||||
// fields.put("IHarm_4_OverTime",0);
|
||||
// fields.put("IHarm_5_OverTime",0);
|
||||
// fields.put("IHarm_6_OverTime",0);
|
||||
// fields.put("IHarm_7_OverTime",0);
|
||||
// fields.put("IHarm_8_OverTime",0);
|
||||
// fields.put("IHarm_9_OverTime",0);
|
||||
// fields.put("IHarm_10_OverTime",0);
|
||||
// fields.put("IHarm_11_OverTime",0);
|
||||
// fields.put("IHarm_12_OverTime",0);
|
||||
// fields.put("IHarm_13_OverTime",0);
|
||||
// fields.put("IHarm_14_OverTime",0);
|
||||
// fields.put("IHarm_15_OverTime",0);
|
||||
// fields.put("IHarm_16_OverTime",0);
|
||||
// fields.put("IHarm_17_OverTime",0);
|
||||
// fields.put("IHarm_18_OverTime",0);
|
||||
// fields.put("IHarm_19_OverTime",0);
|
||||
// fields.put("IHarm_20_OverTime",0);
|
||||
// fields.put("IHarm_21_OverTime",0);
|
||||
// fields.put("IHarm_22_OverTime",0);
|
||||
// fields.put("IHarm_23_OverTime",0);
|
||||
// fields.put("IHarm_24_OverTime",0);
|
||||
// fields.put("IHarm_25_OverTime",0);
|
||||
// fields.put("InUHARM_1_OverTime",0);
|
||||
// fields.put("InUHARM_2_OverTime",0);
|
||||
// fields.put("InUHARM_3_OverTime",0);
|
||||
// fields.put("InUHARM_4_OverTime",0);
|
||||
// fields.put("InUHARM_5_OverTime",0);
|
||||
// fields.put("InUHARM_6_OverTime",0);
|
||||
// fields.put("InUHARM_7_OverTime",0);
|
||||
// fields.put("InUHARM_8_OverTime",0);
|
||||
// fields.put("InUHARM_9_OverTime",0);
|
||||
// fields.put("InUHARM_10_OverTime",0);
|
||||
// fields.put("InUHARM_11_OverTime",0);
|
||||
// fields.put("InUHARM_12_OverTime",0);
|
||||
// fields.put("InUHARM_13_OverTime",0);
|
||||
// fields.put("InUHARM_14_OverTime",0);
|
||||
// fields.put("InUHARM_15_OverTime",0);
|
||||
// fields.put("InUHARM_16_OverTime",0);
|
||||
// influxDBUtil.insert("LIMIT_RATE", tags, fields, time, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
|
||||
//循环写入数据库
|
||||
public static void batchInsertOne(InfluxDbUtils influxDBUtil) {
|
||||
Map<String, String> tags1 = new HashMap<>();
|
||||
tags1.put("line_id", "127fad1dcb0077ac2979141b8473a5e4");
|
||||
Map<String, Object> fields1 = new HashMap<>();
|
||||
fields1.put("describe", "暂降事件1");
|
||||
fields1.put("wave_type", 1);
|
||||
fields1.put("persist_time", 1620);
|
||||
fields1.put("event_value", 0.956);
|
||||
Map<String, String> tags2 = new HashMap<>();
|
||||
tags2.put("LineID", "9");
|
||||
tags2.put("Phasic_Type", "A");
|
||||
Map<String, Object> fields2 = new HashMap<>();
|
||||
fields2.put("RMS", 4);
|
||||
fields2.put("RMS_AB", 4);
|
||||
fields2.put("RMS_BC", 4);
|
||||
fields2.put("RMS_CA", 4);
|
||||
// 一条记录值
|
||||
Point point1 = influxDBUtil.pointBuilder("test", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags1, fields1);
|
||||
Point point2 = influxDBUtil.pointBuilder("test", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags2, fields2);
|
||||
// 将两条记录添加到batchPoints中
|
||||
BatchPoints batchPoints1 = BatchPoints.database("test").tag("LineID", "8").tag("Phasic_Type", "A").retentionPolicy("")
|
||||
.consistency(ConsistencyLevel.ALL).build();
|
||||
BatchPoints batchPoints2 = BatchPoints.database("test").tag("LineID", "9").tag("Phasic_Type", "A").retentionPolicy("")
|
||||
.consistency(ConsistencyLevel.ALL).build();
|
||||
batchPoints1.point(point1);
|
||||
batchPoints2.point(point2);
|
||||
// 将两条数据批量插入到数据库中
|
||||
influxDBUtil.batchInsert(batchPoints1, TimeUnit.MILLISECONDS);
|
||||
influxDBUtil.batchInsert(batchPoints2, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
|
||||
//批量插入数据
|
||||
public static void batchInsert(InfluxDbUtils influxDBUtil) {
|
||||
Map<String, String> tags1 = new HashMap<>();
|
||||
tags1.put("LineID", "4");
|
||||
tags1.put("Phasic_Type", "A");
|
||||
Map<String, Object> fields1 = new HashMap<>();
|
||||
fields1.put("RMS", 4.1111);
|
||||
fields1.put("RMS_AB", 4.1111);
|
||||
fields1.put("RMS_BC", 4.1111);
|
||||
fields1.put("RMS_CA", 4.1111);
|
||||
Map<String, String> tags2 = new HashMap<>();
|
||||
tags2.put("LineID", "5");
|
||||
tags2.put("Phasic_Type", "A");
|
||||
Map<String, Object> fields2 = new HashMap<>();
|
||||
fields2.put("RMS", 5.1111);
|
||||
fields2.put("RMS_AB", 5.1111);
|
||||
fields2.put("RMS_BC", 5.1111);
|
||||
fields2.put("RMS_CA", 5.1111);
|
||||
// 一条记录值。(注意:生产环境不要用System.currentTimeMillis(),因为数据量大会产生重复时间戳,导致数据丢失,要用数据自己的时间戳,这里只做演示)
|
||||
Point point1 = influxDBUtil.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags1, fields1);
|
||||
Point point2 = influxDBUtil.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags2, fields2);
|
||||
// BatchPoints batchPoints1 = BatchPoints.database("Data_v").tag("LineID", "4").tag("Phasic_Type","A").retentionPolicy("").consistency(ConsistencyLevel.ALL).precision(TimeUnit.MILLISECONDS).build();
|
||||
BatchPoints batchPoints1 = BatchPoints.database("test").tag("LineID", "4").tag("Phasic_Type", "A").retentionPolicy("").consistency(ConsistencyLevel.ALL).build();
|
||||
batchPoints1.point(point1);
|
||||
BatchPoints batchPoints2 = BatchPoints.database("test").tag("LineID", "5").tag("Phasic_Type", "A").retentionPolicy("").consistency(ConsistencyLevel.ALL).build();
|
||||
// 将两条记录添加到batchPoints中
|
||||
batchPoints2.point(point2);
|
||||
// 将不同的batchPoints序列化后,一次性写入数据库,提高写入速度
|
||||
List<String> records = new ArrayList<String>();
|
||||
records.add(batchPoints1.lineProtocol());
|
||||
records.add(batchPoints2.lineProtocol());
|
||||
// 将两条数据批量插入到数据库中
|
||||
influxDBUtil.batchInsert("test", "", ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user