379 lines
17 KiB
Java
379 lines
17 KiB
Java
import cn.hutool.core.date.DateUtil;
|
||
import com.njcn.common.utils.PubUtils;
|
||
import com.njcn.event.pojo.po.EventDetail;
|
||
import com.njcn.influxdb.utils.InfluxDbUtils;
|
||
import org.influxdb.InfluxDB.ConsistencyLevel;
|
||
import org.influxdb.dto.BatchPoints;
|
||
import org.influxdb.dto.Point;
|
||
import org.influxdb.dto.QueryResult;
|
||
import org.influxdb.impl.InfluxDBResultMapper;
|
||
|
||
import java.time.Instant;
|
||
import java.util.*;
|
||
import java.util.concurrent.TimeUnit;
|
||
import java.util.stream.Collectors;
|
||
|
||
/**
|
||
* 类的介绍:
|
||
*
|
||
* @author xuyang
|
||
* @version 1.0.0
|
||
* @createTime 2021/11/16 11:07
|
||
*/
|
||
|
||
public class DataTest {
|
||
|
||
//查询
|
||
public static QueryResult select(InfluxDbUtils influxDBUtil) {
|
||
long startTime = System.currentTimeMillis();
|
||
//组装sql语句
|
||
StringBuilder stringBuilder = new StringBuilder();
|
||
stringBuilder.append("time >= '").append(DateUtil.beginOfDay(DateUtil.parse("2022-05-06"))).append("' and ").append("time <= '").append(DateUtil.endOfDay(DateUtil.parse("2022-05-06"))).append("' and (");
|
||
//sql语句
|
||
stringBuilder.append("line_id ='").append("1e3b8531483b2a8cbee6747f1f641cf9").append("')");
|
||
//获取暂降事件
|
||
QueryResult result = influxDBUtil.query("select * from pqs_eventdetail where " + stringBuilder.toString());
|
||
InfluxDBResultMapper influxDBResultMapper = new InfluxDBResultMapper();
|
||
List<EventDetail> eventDetailList = influxDBResultMapper.toPOJO(result, EventDetail.class);
|
||
long endTime = System.currentTimeMillis();
|
||
System.out.println(eventDetailList);
|
||
return result;
|
||
}
|
||
|
||
//处理结果集
|
||
public static void chanelResult(QueryResult result) {
|
||
QueryResult.Result result1 = result.getResults().get(0);
|
||
if (result1.getSeries() != null) {
|
||
List<List<Object>> valueList = result1.getSeries().stream().map(QueryResult.Series::getValues).collect(Collectors.toList()).get(0);
|
||
if (valueList != null && valueList.size() > 0) {
|
||
for (List<Object> value : valueList) {
|
||
Map<String, String> map = new HashMap<String, String>();
|
||
// 数据库中字段1取值
|
||
String field1 = value.get(0) == null ? null : value.get(0).toString();
|
||
System.out.println(field1);
|
||
// 数据库中字段2取值
|
||
String field2 = value.get(1) == null ? null : value.get(1).toString();
|
||
System.out.println(field2);
|
||
// TODO 用取出的字段做你自己的业务逻辑……
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
public static void main(String[] args) {
|
||
InfluxDbUtils influxDBUtil = new InfluxDbUtils("admin", "123456", "http://192.168.1.18:8086", "pqsbase_sjzx", "");
|
||
|
||
insert(influxDBUtil);
|
||
}
|
||
|
||
public static void deleteDB(InfluxDbUtils influxDBUtil) {
|
||
influxDBUtil.deleteDB("LIMIT_RATE");
|
||
}
|
||
|
||
//单条数据插入
|
||
public static void insert(InfluxDbUtils influxDBUtil) {
|
||
Map<String, String> tags = new HashMap<>();
|
||
long time = Long.parseLong("1675958400000");
|
||
tags.put("dev_id", "57d121d45a26f3cc1d7b6ba541f895c0");
|
||
Map<String, Object> fields = new HashMap<>();
|
||
// fields.put("due",1440);
|
||
// fields.put("real",1200);
|
||
fields.put("online_min", 0);
|
||
fields.put("offline_min", 1440);
|
||
fields.put("online_rate", 0.0000);
|
||
influxDBUtil.insert("pqs_onlinerate", tags, fields, time, TimeUnit.MILLISECONDS);
|
||
// long time = Long.parseLong("1655135328135");
|
||
// Map<String, String> tags = new HashMap<>();
|
||
// // tags.put("line_id", "127fad1dcb0077ac2979141b8473a5e4");
|
||
// tags.put("line_id", "1883a1fe6d9c3c5a03a4371bda024452");
|
||
// Map<String, Object> fields = new HashMap<>();
|
||
// fields.put("create_time", "2022-06-13 23:57:31");
|
||
// fields.put("update_time", "");
|
||
// fields.put("push_failed",0);
|
||
// fields.put("result",1);
|
||
// influxDBUtil.insert("pqs_event_push_logs", tags, fields, time, TimeUnit.MILLISECONDS);
|
||
|
||
// long time = Long.parseLong("1654141002000");
|
||
// tags.put("line_id", "5e467a40023b299070682eb21f2ec9a1");
|
||
// Map<String, Object> fields = new HashMap<>();
|
||
// fields.put("vu_dev1",5.706);
|
||
// fields.put("vu_dev2",5.706);
|
||
// fields.put("vu_dev3",5.706);
|
||
// fields.put("vu_dev4",5.706);
|
||
// fields.put("vu_dev5",5.706);
|
||
// fields.put("freq_dev1",0.534);
|
||
// fields.put("freq_dev2",0.534);
|
||
// fields.put("freq_dev3",2.534);
|
||
// fields.put("freq_dev4",0.534);
|
||
// fields.put("freq_dev5",0.534);
|
||
// fields.put("data_plt1",0.604);
|
||
// fields.put("data_plt2",0.0);
|
||
// fields.put("data_plt3",0.691);
|
||
// fields.put("data_plt4",0.910);
|
||
// fields.put("data_plt5",0.691);
|
||
// fields.put("v_unbalance1",2.713);
|
||
// fields.put("v_unbalance2",2.713);
|
||
// fields.put("v_unbalance3",2.713);
|
||
// fields.put("v_unbalance4",2.713);
|
||
// fields.put("v_unbalance5",2.713);
|
||
// fields.put("v_thd1",20.001);
|
||
// fields.put("v_thd2",20.003);
|
||
// fields.put("v_thd3",20.00);
|
||
// fields.put("v_thd4",20.008);
|
||
// fields.put("v_thd5",20.00);
|
||
// fields.put("event1",1.619);
|
||
// fields.put("event2",1.619);
|
||
// fields.put("event3",1.619);
|
||
// fields.put("event4",1.619);
|
||
// fields.put("event5",1.619);
|
||
// influxDBUtil.insert("pqs_comasses", tags, fields, time, TimeUnit.MILLISECONDS);
|
||
// long time = Long.parseLong("1654141002000");
|
||
// tags.put("line_id", "5e467a40023b299070682eb21f2ec9a1");
|
||
// tags.put("phasic_type","C");
|
||
// Map<String, Object> fields = new HashMap<>();
|
||
// fields.put("alltime",1155);
|
||
// fields.put("flicker_alltime",550);
|
||
// fields.put("flicker_overtime",0);
|
||
// fields.put("freq_dev_overtime",0);
|
||
// fields.put("voltage_dev_overtime",0);
|
||
// fields.put("ubalance_overtime",0);
|
||
// fields.put("uaberrance_overtime",0);
|
||
// fields.put("i_neg_overtime",0);
|
||
// fields.put("uharm_2_overtime",0);
|
||
// fields.put("uharm_3_overtime",0);
|
||
// fields.put("uharm_4_overtime",0);
|
||
// fields.put("uharm_5_overtime",0);
|
||
// fields.put("uharm_6_overtime",0);
|
||
// fields.put("uharm_7_overtime",0);
|
||
// fields.put("uharm_8_overtime",0);
|
||
// fields.put("uharm_9_overtime",0);
|
||
// fields.put("uharm_10_overtime",0);
|
||
// fields.put("uharm_11_overtime",0);
|
||
// fields.put("uharm_12_overtime",0);
|
||
// fields.put("uharm_13_overtime",0);
|
||
// fields.put("uharm_14_overtime",0);
|
||
// fields.put("uharm_15_overtime",0);
|
||
// fields.put("uharm_16_overtime",0);
|
||
// fields.put("uharm_17_overtime",0);
|
||
// fields.put("uharm_18_overtime",0);
|
||
// fields.put("uharm_19_overtime",0);
|
||
// fields.put("uharm_20_overtime",0);
|
||
// fields.put("uharm_21_overtime",0);
|
||
// fields.put("uharm_22_overtime",0);
|
||
// fields.put("uharm_23_overtime",0);
|
||
// fields.put("uharm_24_overtime",0);
|
||
// fields.put("uharm_25_overtime",0);
|
||
// fields.put("iharm_2_overtime",0);
|
||
// fields.put("iharm_3_overtime",0);
|
||
// fields.put("iharm_4_overtime",0);
|
||
// fields.put("iharm_5_overtime",0);
|
||
// fields.put("iharm_6_overtime",0);
|
||
// fields.put("iharm_7_overtime",0);
|
||
// fields.put("iharm_8_overtime",0);
|
||
// fields.put("iharm_9_overtime",0);
|
||
// fields.put("iharm_10_overtime",0);
|
||
// fields.put("iharm_11_overtime",0);
|
||
// fields.put("iharm_12_overtime",0);
|
||
// fields.put("iharm_13_overtime",0);
|
||
// fields.put("iharm_14_overtime",0);
|
||
// fields.put("iharm_15_overtime",0);
|
||
// fields.put("iharm_16_overtime",0);
|
||
// fields.put("iharm_17_overtime",0);
|
||
// fields.put("iharm_18_overtime",0);
|
||
// fields.put("iharm_19_overtime",0);
|
||
// fields.put("iharm_20_overtime",0);
|
||
// fields.put("iharm_21_overtime",0);
|
||
// fields.put("iharm_22_overtime",0);
|
||
// fields.put("iharm_23_overtime",0);
|
||
// fields.put("iharm_24_overtime",0);
|
||
// fields.put("iharm_25_overtime",0);
|
||
// fields.put("inuharm_1_overtime",0);
|
||
// fields.put("inuharm_2_overtime",0);
|
||
// fields.put("inuharm_3_overtime",0);
|
||
// fields.put("inuharm_4_overtime",0);
|
||
// fields.put("inuharm_5_overtime",0);
|
||
// fields.put("inuharm_6_overtime",0);
|
||
// fields.put("inuharm_7_overtime",0);
|
||
// fields.put("inuharm_8_overtime",0);
|
||
// fields.put("inuharm_9_overtime",0);
|
||
// fields.put("inuharm_10_overtime",0);
|
||
// fields.put("inuharm_11_overtime",0);
|
||
// fields.put("inuharm_12_overtime",0);
|
||
// fields.put("inuharm_13_overtime",0);
|
||
// fields.put("inuharm_14_overtime",0);
|
||
// fields.put("inuharm_15_overtime",0);
|
||
// fields.put("inuharm_16_overtime",0);
|
||
// influxDBUtil.insert("limit_rate", tags, fields, time, TimeUnit.MILLISECONDS);
|
||
|
||
// long time = Long.parseLong("1654141002000");
|
||
// tags.put("line_id", "5e467a40023b299070682eb21f2ec9a1");
|
||
// tags.put("phasic_type","A");
|
||
// tags.put("value_type","CP95");
|
||
// Map<String, Object> fields = new HashMap<>();
|
||
// fields.put("voltage_dev",3.6);
|
||
// fields.put("uvoltage_dev",-2.6);
|
||
// fields.put("ubalance",6);
|
||
// fields.put("flicker",0.6);
|
||
// fields.put("uaberrance",2);
|
||
// fields.put("i_neg",20);
|
||
// fields.put("uharm_2",0);
|
||
// fields.put("uharm_3",0);
|
||
// fields.put("uharm_4",0);
|
||
// fields.put("uharm_5",0);
|
||
// fields.put("uharm_6",0);
|
||
// fields.put("uharm_7",0);
|
||
// fields.put("uharm_8",0);
|
||
// fields.put("uharm_9",0);
|
||
// fields.put("uharm_10",0);
|
||
// fields.put("uharm_11",10);
|
||
// fields.put("uharm_12",0);
|
||
// fields.put("uharm_13",0);
|
||
// fields.put("uharm_14",0);
|
||
// fields.put("uharm_15",0);
|
||
// fields.put("uharm_16",15.3);
|
||
// fields.put("uharm_17",0);
|
||
// fields.put("uharm_18",0);
|
||
// fields.put("uharm_19",0);
|
||
// fields.put("uharm_20",0);
|
||
// fields.put("uharm_21",0);
|
||
// fields.put("uharm_22",0);
|
||
// fields.put("uharm_23",0);
|
||
// fields.put("uharm_24",0);
|
||
// fields.put("uharm_25",0);
|
||
// fields.put("iharm_2",0);
|
||
// fields.put("iharm_3",0);
|
||
// fields.put("iharm_4",0);
|
||
// fields.put("iharm_5",6.02);
|
||
// fields.put("iharm_6",0);
|
||
// fields.put("iharm_7",0);
|
||
// fields.put("iharm_8",0);
|
||
// fields.put("iharm_9",0);
|
||
// fields.put("iharm_10",0);
|
||
// fields.put("iharm_11",0);
|
||
// fields.put("iharm_12",0);
|
||
// fields.put("iharm_13",0);
|
||
// fields.put("iharm_14",0);
|
||
// fields.put("iharm_15",3.25);
|
||
// fields.put("iharm_16",0);
|
||
// fields.put("iharm_17",0);
|
||
// fields.put("iharm_18",0);
|
||
// fields.put("iharm_19",0);
|
||
// fields.put("iharm_20",0);
|
||
// fields.put("iharm_21",0);
|
||
// fields.put("iharm_22",0);
|
||
// fields.put("iharm_23",0);
|
||
// fields.put("iharm_24",3.52);
|
||
// fields.put("iharm_25",0);
|
||
// fields.put("inuharm_1",0);
|
||
// fields.put("inuharm_2",0);
|
||
// fields.put("inuharm_3",3.25);
|
||
// fields.put("inuharm_4",0);
|
||
// fields.put("inuharm_5",3.26);
|
||
// fields.put("inuharm_6",0);
|
||
// fields.put("inuharm_7",0);
|
||
// fields.put("inuharm_8",0);
|
||
// fields.put("inuharm_9",0);
|
||
// fields.put("inuharm_10",0);
|
||
// fields.put("inuharm_11",0);
|
||
// fields.put("inuharm_12",6.25);
|
||
// fields.put("inuharm_13",0);
|
||
// fields.put("inuharm_14",0);
|
||
// fields.put("inuharm_15",0);
|
||
// fields.put("inuharm_16",0);
|
||
// influxDBUtil.insert("pqs_abnormaldata", tags, fields, time, TimeUnit.MILLISECONDS);
|
||
}
|
||
|
||
|
||
//循环写入数据库
|
||
public static void batchInsertOne(InfluxDbUtils influxDBUtil) {
|
||
Map<String, String> tags1 = new HashMap<>();
|
||
tags1.put("line_id", "127fad1dcb0077ac2979141b8473a5e4");
|
||
Map<String, Object> fields1 = new HashMap<>();
|
||
fields1.put("describe", "暂降事件1");
|
||
fields1.put("wave_type", 1);
|
||
fields1.put("persist_time", 1620);
|
||
fields1.put("event_value", 0.956);
|
||
Map<String, String> tags2 = new HashMap<>();
|
||
tags2.put("LineID", "9");
|
||
tags2.put("Phasic_Type", "A");
|
||
Map<String, Object> fields2 = new HashMap<>();
|
||
fields2.put("RMS", 4);
|
||
fields2.put("RMS_AB", 4);
|
||
fields2.put("RMS_BC", 4);
|
||
fields2.put("RMS_CA", 4);
|
||
// 一条记录值
|
||
Point point1 = influxDBUtil.pointBuilder("test", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags1, fields1);
|
||
Point point2 = influxDBUtil.pointBuilder("test", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags2, fields2);
|
||
// 将两条记录添加到batchPoints中
|
||
BatchPoints batchPoints1 = BatchPoints.database("test").tag("LineID", "8").tag("Phasic_Type", "A").retentionPolicy("")
|
||
.consistency(ConsistencyLevel.ALL).build();
|
||
BatchPoints batchPoints2 = BatchPoints.database("test").tag("LineID", "9").tag("Phasic_Type", "A").retentionPolicy("")
|
||
.consistency(ConsistencyLevel.ALL).build();
|
||
batchPoints1.point(point1);
|
||
batchPoints2.point(point2);
|
||
// 将两条数据批量插入到数据库中
|
||
influxDBUtil.batchInsert(batchPoints1, TimeUnit.MILLISECONDS);
|
||
influxDBUtil.batchInsert(batchPoints2, TimeUnit.MILLISECONDS);
|
||
}
|
||
|
||
|
||
//批量插入数据
|
||
public static void batchInsert(InfluxDbUtils influxDBUtil) {
|
||
Map<String, String> tags1 = new HashMap<>();
|
||
tags1.put("LineID", "4");
|
||
tags1.put("Phasic_Type", "A");
|
||
Map<String, Object> fields1 = new HashMap<>();
|
||
fields1.put("RMS", 4.1111);
|
||
fields1.put("RMS_AB", 4.1111);
|
||
fields1.put("RMS_BC", 4.1111);
|
||
fields1.put("RMS_CA", 4.1111);
|
||
Map<String, String> tags2 = new HashMap<>();
|
||
tags2.put("LineID", "5");
|
||
tags2.put("Phasic_Type", "A");
|
||
Map<String, Object> fields2 = new HashMap<>();
|
||
fields2.put("RMS", 5.1111);
|
||
fields2.put("RMS_AB", 5.1111);
|
||
fields2.put("RMS_BC", 5.1111);
|
||
fields2.put("RMS_CA", 5.1111);
|
||
// 一条记录值。(注意:生产环境不要用System.currentTimeMillis(),因为数据量大会产生重复时间戳,导致数据丢失,要用数据自己的时间戳,这里只做演示)
|
||
Point point1 = influxDBUtil.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags1, fields1);
|
||
Point point2 = influxDBUtil.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags2, fields2);
|
||
// BatchPoints batchPoints1 = BatchPoints.database("Data_v").tag("LineID", "4").tag("Phasic_Type","A").retentionPolicy("").consistency(ConsistencyLevel.ALL).precision(TimeUnit.MILLISECONDS).build();
|
||
BatchPoints batchPoints1 = BatchPoints.database("test").tag("LineID", "4").tag("Phasic_Type", "A").retentionPolicy("").consistency(ConsistencyLevel.ALL).build();
|
||
batchPoints1.point(point1);
|
||
BatchPoints batchPoints2 = BatchPoints.database("test").tag("LineID", "5").tag("Phasic_Type", "A").retentionPolicy("").consistency(ConsistencyLevel.ALL).build();
|
||
// 将两条记录添加到batchPoints中
|
||
batchPoints2.point(point2);
|
||
// 将不同的batchPoints序列化后,一次性写入数据库,提高写入速度
|
||
List<String> records = new ArrayList<String>();
|
||
records.add(batchPoints1.lineProtocol());
|
||
records.add(batchPoints2.lineProtocol());
|
||
// 将两条数据批量插入到数据库中
|
||
influxDBUtil.batchInsert("test", "", ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
|
||
}
|
||
|
||
public static void batchInsertPqsCommunicate(InfluxDbUtils influxDBUtil) {
|
||
Map<String, String> tags = new HashMap<>();
|
||
tags.put("line_id", "025fa0e4c91f72ad7f1c1bd29026f20a");
|
||
|
||
|
||
Map<String, Object> fields = new HashMap<>();
|
||
fields.put("description", "在线");
|
||
fields.put("type", 1);
|
||
|
||
influxDBUtil.insert("pqs_communicate", tags, fields, System.currentTimeMillis()-170000000, TimeUnit.MILLISECONDS);
|
||
|
||
Map<String, String> tags1 = new HashMap<>();
|
||
tags1.put("line_id", "025fa0e4c91f72ad7f1c1bd29026f20a");
|
||
|
||
|
||
Map<String, Object> fields1 = new HashMap<>();
|
||
fields1.put("description", "掉线");
|
||
fields1.put("type", 0);
|
||
|
||
influxDBUtil.insert("pqs_communicate", tags1, fields1, System.currentTimeMillis()-70000000, TimeUnit.MILLISECONDS);
|
||
}
|
||
|
||
|
||
}
|