2022-06-22 09:14:52 +08:00
import cn.hutool.core.date.DateUtil ;
2022-09-29 19:43:14 +08:00
import com.njcn.common.utils.PubUtils ;
2022-06-22 09:14:52 +08:00
import com.njcn.event.pojo.po.EventDetail ;
2022-06-21 20:47:46 +08:00
import com.njcn.influxdb.utils.InfluxDbUtils ;
import org.influxdb.InfluxDB.ConsistencyLevel ;
import org.influxdb.dto.BatchPoints ;
import org.influxdb.dto.Point ;
import org.influxdb.dto.QueryResult ;
2022-06-22 09:14:52 +08:00
import org.influxdb.impl.InfluxDBResultMapper ;
2022-06-21 20:47:46 +08:00
2022-09-29 19:43:14 +08:00
import java.time.Instant ;
2022-06-21 20:47:46 +08:00
import java.util.* ;
import java.util.concurrent.TimeUnit ;
import java.util.stream.Collectors ;
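/**
 * Class description: ad-hoc driver that exercises InfluxDbUtils (query, single insert,
 * batch insert, database delete) with hard-coded sample data.
 *
 * @author xuyang
 * @version 1.0.0
 * @createTime 2021/11/16 11:07
 */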
2022-06-22 09:55:31 +08:00
public class DataTest {
2022-06-21 20:47:46 +08:00
//查询
public static QueryResult select ( InfluxDbUtils influxDBUtil ) {
long startTime = System . currentTimeMillis ( ) ;
2022-06-22 09:14:52 +08:00
//组装sql语句
StringBuilder stringBuilder = new StringBuilder ( ) ;
stringBuilder . append ( " time >= ' " ) . append ( DateUtil . beginOfDay ( DateUtil . parse ( " 2022-05-06 " ) ) ) . append ( " ' and " ) . append ( " time <= ' " ) . append ( DateUtil . endOfDay ( DateUtil . parse ( " 2022-05-06 " ) ) ) . append ( " ' and ( " ) ;
//sql语句
stringBuilder . append ( " line_id =' " ) . append ( " 1e3b8531483b2a8cbee6747f1f641cf9 " ) . append ( " ') " ) ;
//获取暂降事件
QueryResult result = influxDBUtil . query ( " select * from pqs_eventdetail where " + stringBuilder . toString ( ) ) ;
InfluxDBResultMapper influxDBResultMapper = new InfluxDBResultMapper ( ) ;
List < EventDetail > eventDetailList = influxDBResultMapper . toPOJO ( result , EventDetail . class ) ;
2022-06-21 20:47:46 +08:00
long endTime = System . currentTimeMillis ( ) ;
2022-06-22 09:14:52 +08:00
System . out . println ( eventDetailList ) ;
2022-06-21 20:47:46 +08:00
return result ;
}
//处理结果集
public static void chanelResult ( QueryResult result ) {
QueryResult . Result result1 = result . getResults ( ) . get ( 0 ) ;
if ( result1 . getSeries ( ) ! = null ) {
List < List < Object > > valueList = result1 . getSeries ( ) . stream ( ) . map ( QueryResult . Series : : getValues ) . collect ( Collectors . toList ( ) ) . get ( 0 ) ;
if ( valueList ! = null & & valueList . size ( ) > 0 ) {
for ( List < Object > value : valueList ) {
Map < String , String > map = new HashMap < String , String > ( ) ;
// 数据库中字段1取值
String field1 = value . get ( 0 ) = = null ? null : value . get ( 0 ) . toString ( ) ;
System . out . println ( field1 ) ;
// 数据库中字段2取值
String field2 = value . get ( 1 ) = = null ? null : value . get ( 1 ) . toString ( ) ;
System . out . println ( field2 ) ;
// TODO 用取出的字段做你自己的业务逻辑……
}
}
}
}
2022-06-22 09:55:31 +08:00
public static void main ( String [ ] args ) {
2023-02-10 16:40:43 +08:00
InfluxDbUtils influxDBUtil = new InfluxDbUtils ( " admin " , " 123456 " , " http://192.168.1.18:8086 " , " pqsbase_sjzx " , " " ) ;
2022-09-29 19:43:14 +08:00
2023-02-10 16:40:43 +08:00
insert ( influxDBUtil ) ;
2022-06-22 09:14:52 +08:00
}
2022-09-29 19:43:14 +08:00
2022-06-22 09:14:52 +08:00
public static void deleteDB ( InfluxDbUtils influxDBUtil ) {
influxDBUtil . deleteDB ( " LIMIT_RATE " ) ;
2022-06-21 20:47:46 +08:00
}
2022-09-29 19:43:14 +08:00
2022-06-21 20:47:46 +08:00
//单条数据插入
public static void insert ( InfluxDbUtils influxDBUtil ) {
2022-06-30 09:52:58 +08:00
Map < String , String > tags = new HashMap < > ( ) ;
2023-02-10 16:40:43 +08:00
long time = Long . parseLong ( " 1675958400000 " ) ;
tags . put ( " dev_id " , " 57d121d45a26f3cc1d7b6ba541f895c0 " ) ;
2022-06-30 09:52:58 +08:00
Map < String , Object > fields = new HashMap < > ( ) ;
2022-07-07 16:18:23 +08:00
// fields.put("due",1440);
// fields.put("real",1200);
2023-02-10 16:40:43 +08:00
fields . put ( " online_min " , 0 ) ;
fields . put ( " offline_min " , 1440 ) ;
fields . put ( " online_rate " , 0 . 0000 ) ;
2022-07-07 16:18:23 +08:00
influxDBUtil . insert ( " pqs_onlinerate " , tags , fields , time , TimeUnit . MILLISECONDS ) ;
2022-06-22 09:14:52 +08:00
// long time = Long.parseLong("1655135328135");
// Map<String, String> tags = new HashMap<>();
// // tags.put("line_id", "127fad1dcb0077ac2979141b8473a5e4");
// tags.put("line_id", "1883a1fe6d9c3c5a03a4371bda024452");
// Map<String, Object> fields = new HashMap<>();
// fields.put("create_time", "2022-06-13 23:57:31");
// fields.put("update_time", "");
// fields.put("push_failed",0);
// fields.put("result",1);
// influxDBUtil.insert("pqs_event_push_logs", tags, fields, time, TimeUnit.MILLISECONDS);
2022-06-28 21:08:17 +08:00
// long time = Long.parseLong("1654141002000");
// tags.put("line_id", "5e467a40023b299070682eb21f2ec9a1");
// Map<String, Object> fields = new HashMap<>();
// fields.put("vu_dev1",5.706);
// fields.put("vu_dev2",5.706);
// fields.put("vu_dev3",5.706);
// fields.put("vu_dev4",5.706);
// fields.put("vu_dev5",5.706);
// fields.put("freq_dev1",0.534);
// fields.put("freq_dev2",0.534);
// fields.put("freq_dev3",2.534);
// fields.put("freq_dev4",0.534);
// fields.put("freq_dev5",0.534);
// fields.put("data_plt1",0.604);
// fields.put("data_plt2",0.0);
// fields.put("data_plt3",0.691);
// fields.put("data_plt4",0.910);
// fields.put("data_plt5",0.691);
// fields.put("v_unbalance1",2.713);
// fields.put("v_unbalance2",2.713);
// fields.put("v_unbalance3",2.713);
// fields.put("v_unbalance4",2.713);
// fields.put("v_unbalance5",2.713);
// fields.put("v_thd1",20.001);
// fields.put("v_thd2",20.003);
// fields.put("v_thd3",20.00);
// fields.put("v_thd4",20.008);
// fields.put("v_thd5",20.00);
// fields.put("event1",1.619);
// fields.put("event2",1.619);
// fields.put("event3",1.619);
// fields.put("event4",1.619);
// fields.put("event5",1.619);
// influxDBUtil.insert("pqs_comasses", tags, fields, time, TimeUnit.MILLISECONDS);
// long time = Long.parseLong("1654141002000");
// tags.put("line_id", "5e467a40023b299070682eb21f2ec9a1");
// tags.put("phasic_type","C");
// Map<String, Object> fields = new HashMap<>();
// fields.put("alltime",1155);
// fields.put("flicker_alltime",550);
// fields.put("flicker_overtime",0);
// fields.put("freq_dev_overtime",0);
// fields.put("voltage_dev_overtime",0);
// fields.put("ubalance_overtime",0);
// fields.put("uaberrance_overtime",0);
// fields.put("i_neg_overtime",0);
// fields.put("uharm_2_overtime",0);
// fields.put("uharm_3_overtime",0);
// fields.put("uharm_4_overtime",0);
// fields.put("uharm_5_overtime",0);
// fields.put("uharm_6_overtime",0);
// fields.put("uharm_7_overtime",0);
// fields.put("uharm_8_overtime",0);
// fields.put("uharm_9_overtime",0);
// fields.put("uharm_10_overtime",0);
// fields.put("uharm_11_overtime",0);
// fields.put("uharm_12_overtime",0);
// fields.put("uharm_13_overtime",0);
// fields.put("uharm_14_overtime",0);
// fields.put("uharm_15_overtime",0);
// fields.put("uharm_16_overtime",0);
// fields.put("uharm_17_overtime",0);
// fields.put("uharm_18_overtime",0);
// fields.put("uharm_19_overtime",0);
// fields.put("uharm_20_overtime",0);
// fields.put("uharm_21_overtime",0);
// fields.put("uharm_22_overtime",0);
// fields.put("uharm_23_overtime",0);
// fields.put("uharm_24_overtime",0);
// fields.put("uharm_25_overtime",0);
// fields.put("iharm_2_overtime",0);
// fields.put("iharm_3_overtime",0);
// fields.put("iharm_4_overtime",0);
// fields.put("iharm_5_overtime",0);
// fields.put("iharm_6_overtime",0);
// fields.put("iharm_7_overtime",0);
// fields.put("iharm_8_overtime",0);
// fields.put("iharm_9_overtime",0);
// fields.put("iharm_10_overtime",0);
// fields.put("iharm_11_overtime",0);
// fields.put("iharm_12_overtime",0);
// fields.put("iharm_13_overtime",0);
// fields.put("iharm_14_overtime",0);
// fields.put("iharm_15_overtime",0);
// fields.put("iharm_16_overtime",0);
// fields.put("iharm_17_overtime",0);
// fields.put("iharm_18_overtime",0);
// fields.put("iharm_19_overtime",0);
// fields.put("iharm_20_overtime",0);
// fields.put("iharm_21_overtime",0);
// fields.put("iharm_22_overtime",0);
// fields.put("iharm_23_overtime",0);
// fields.put("iharm_24_overtime",0);
// fields.put("iharm_25_overtime",0);
// fields.put("inuharm_1_overtime",0);
// fields.put("inuharm_2_overtime",0);
// fields.put("inuharm_3_overtime",0);
// fields.put("inuharm_4_overtime",0);
// fields.put("inuharm_5_overtime",0);
// fields.put("inuharm_6_overtime",0);
// fields.put("inuharm_7_overtime",0);
// fields.put("inuharm_8_overtime",0);
// fields.put("inuharm_9_overtime",0);
// fields.put("inuharm_10_overtime",0);
// fields.put("inuharm_11_overtime",0);
// fields.put("inuharm_12_overtime",0);
// fields.put("inuharm_13_overtime",0);
// fields.put("inuharm_14_overtime",0);
// fields.put("inuharm_15_overtime",0);
// fields.put("inuharm_16_overtime",0);
// influxDBUtil.insert("limit_rate", tags, fields, time, TimeUnit.MILLISECONDS);
2022-06-30 09:52:58 +08:00
// long time = Long.parseLong("1654141002000");
// tags.put("line_id", "5e467a40023b299070682eb21f2ec9a1");
// tags.put("phasic_type","A");
// tags.put("value_type","CP95");
// Map<String, Object> fields = new HashMap<>();
// fields.put("voltage_dev",3.6);
// fields.put("uvoltage_dev",-2.6);
// fields.put("ubalance",6);
// fields.put("flicker",0.6);
// fields.put("uaberrance",2);
// fields.put("i_neg",20);
// fields.put("uharm_2",0);
// fields.put("uharm_3",0);
// fields.put("uharm_4",0);
// fields.put("uharm_5",0);
// fields.put("uharm_6",0);
// fields.put("uharm_7",0);
// fields.put("uharm_8",0);
// fields.put("uharm_9",0);
// fields.put("uharm_10",0);
// fields.put("uharm_11",10);
// fields.put("uharm_12",0);
// fields.put("uharm_13",0);
// fields.put("uharm_14",0);
// fields.put("uharm_15",0);
// fields.put("uharm_16",15.3);
// fields.put("uharm_17",0);
// fields.put("uharm_18",0);
// fields.put("uharm_19",0);
// fields.put("uharm_20",0);
// fields.put("uharm_21",0);
// fields.put("uharm_22",0);
// fields.put("uharm_23",0);
// fields.put("uharm_24",0);
// fields.put("uharm_25",0);
// fields.put("iharm_2",0);
// fields.put("iharm_3",0);
// fields.put("iharm_4",0);
// fields.put("iharm_5",6.02);
// fields.put("iharm_6",0);
// fields.put("iharm_7",0);
// fields.put("iharm_8",0);
// fields.put("iharm_9",0);
// fields.put("iharm_10",0);
// fields.put("iharm_11",0);
// fields.put("iharm_12",0);
// fields.put("iharm_13",0);
// fields.put("iharm_14",0);
// fields.put("iharm_15",3.25);
// fields.put("iharm_16",0);
// fields.put("iharm_17",0);
// fields.put("iharm_18",0);
// fields.put("iharm_19",0);
// fields.put("iharm_20",0);
// fields.put("iharm_21",0);
// fields.put("iharm_22",0);
// fields.put("iharm_23",0);
// fields.put("iharm_24",3.52);
// fields.put("iharm_25",0);
// fields.put("inuharm_1",0);
// fields.put("inuharm_2",0);
// fields.put("inuharm_3",3.25);
// fields.put("inuharm_4",0);
// fields.put("inuharm_5",3.26);
// fields.put("inuharm_6",0);
// fields.put("inuharm_7",0);
// fields.put("inuharm_8",0);
// fields.put("inuharm_9",0);
// fields.put("inuharm_10",0);
// fields.put("inuharm_11",0);
// fields.put("inuharm_12",6.25);
// fields.put("inuharm_13",0);
// fields.put("inuharm_14",0);
// fields.put("inuharm_15",0);
// fields.put("inuharm_16",0);
// influxDBUtil.insert("pqs_abnormaldata", tags, fields, time, TimeUnit.MILLISECONDS);
2022-06-21 20:47:46 +08:00
}
//循环写入数据库
public static void batchInsertOne ( InfluxDbUtils influxDBUtil ) {
Map < String , String > tags1 = new HashMap < > ( ) ;
2022-06-22 09:14:52 +08:00
tags1 . put ( " line_id " , " 127fad1dcb0077ac2979141b8473a5e4 " ) ;
2022-06-21 20:47:46 +08:00
Map < String , Object > fields1 = new HashMap < > ( ) ;
2022-06-22 09:14:52 +08:00
fields1 . put ( " describe " , " 暂降事件1 " ) ;
fields1 . put ( " wave_type " , 1 ) ;
fields1 . put ( " persist_time " , 1620 ) ;
fields1 . put ( " event_value " , 0 . 956 ) ;
2022-06-21 20:47:46 +08:00
Map < String , String > tags2 = new HashMap < > ( ) ;
tags2 . put ( " LineID " , " 9 " ) ;
tags2 . put ( " Phasic_Type " , " A " ) ;
Map < String , Object > fields2 = new HashMap < > ( ) ;
fields2 . put ( " RMS " , 4 ) ;
fields2 . put ( " RMS_AB " , 4 ) ;
fields2 . put ( " RMS_BC " , 4 ) ;
fields2 . put ( " RMS_CA " , 4 ) ;
// 一条记录值
Point point1 = influxDBUtil . pointBuilder ( " test " , System . currentTimeMillis ( ) , TimeUnit . MILLISECONDS , tags1 , fields1 ) ;
Point point2 = influxDBUtil . pointBuilder ( " test " , System . currentTimeMillis ( ) , TimeUnit . MILLISECONDS , tags2 , fields2 ) ;
// 将两条记录添加到batchPoints中
BatchPoints batchPoints1 = BatchPoints . database ( " test " ) . tag ( " LineID " , " 8 " ) . tag ( " Phasic_Type " , " A " ) . retentionPolicy ( " " )
. consistency ( ConsistencyLevel . ALL ) . build ( ) ;
BatchPoints batchPoints2 = BatchPoints . database ( " test " ) . tag ( " LineID " , " 9 " ) . tag ( " Phasic_Type " , " A " ) . retentionPolicy ( " " )
. consistency ( ConsistencyLevel . ALL ) . build ( ) ;
batchPoints1 . point ( point1 ) ;
batchPoints2 . point ( point2 ) ;
// 将两条数据批量插入到数据库中
influxDBUtil . batchInsert ( batchPoints1 , TimeUnit . MILLISECONDS ) ;
influxDBUtil . batchInsert ( batchPoints2 , TimeUnit . MILLISECONDS ) ;
}
//批量插入数据
public static void batchInsert ( InfluxDbUtils influxDBUtil ) {
Map < String , String > tags1 = new HashMap < > ( ) ;
tags1 . put ( " LineID " , " 4 " ) ;
tags1 . put ( " Phasic_Type " , " A " ) ;
Map < String , Object > fields1 = new HashMap < > ( ) ;
fields1 . put ( " RMS " , 4 . 1111 ) ;
fields1 . put ( " RMS_AB " , 4 . 1111 ) ;
fields1 . put ( " RMS_BC " , 4 . 1111 ) ;
fields1 . put ( " RMS_CA " , 4 . 1111 ) ;
Map < String , String > tags2 = new HashMap < > ( ) ;
tags2 . put ( " LineID " , " 5 " ) ;
tags2 . put ( " Phasic_Type " , " A " ) ;
Map < String , Object > fields2 = new HashMap < > ( ) ;
fields2 . put ( " RMS " , 5 . 1111 ) ;
fields2 . put ( " RMS_AB " , 5 . 1111 ) ;
fields2 . put ( " RMS_BC " , 5 . 1111 ) ;
fields2 . put ( " RMS_CA " , 5 . 1111 ) ;
// 一条记录值。( 注意: 生产环境不要用System.currentTimeMillis(),因为数据量大会产生重复时间戳,导致数据丢失,要用数据自己的时间戳,这里只做演示)
Point point1 = influxDBUtil . pointBuilder ( " Data_v " , System . currentTimeMillis ( ) , TimeUnit . MILLISECONDS , tags1 , fields1 ) ;
Point point2 = influxDBUtil . pointBuilder ( " Data_v " , System . currentTimeMillis ( ) , TimeUnit . MILLISECONDS , tags2 , fields2 ) ;
// BatchPoints batchPoints1 = BatchPoints.database("Data_v").tag("LineID", "4").tag("Phasic_Type","A").retentionPolicy("").consistency(ConsistencyLevel.ALL).precision(TimeUnit.MILLISECONDS).build();
BatchPoints batchPoints1 = BatchPoints . database ( " test " ) . tag ( " LineID " , " 4 " ) . tag ( " Phasic_Type " , " A " ) . retentionPolicy ( " " ) . consistency ( ConsistencyLevel . ALL ) . build ( ) ;
batchPoints1 . point ( point1 ) ;
BatchPoints batchPoints2 = BatchPoints . database ( " test " ) . tag ( " LineID " , " 5 " ) . tag ( " Phasic_Type " , " A " ) . retentionPolicy ( " " ) . consistency ( ConsistencyLevel . ALL ) . build ( ) ;
// 将两条记录添加到batchPoints中
batchPoints2 . point ( point2 ) ;
// 将不同的batchPoints序列化后, 一次性写入数据库, 提高写入速度
List < String > records = new ArrayList < String > ( ) ;
records . add ( batchPoints1 . lineProtocol ( ) ) ;
records . add ( batchPoints2 . lineProtocol ( ) ) ;
// 将两条数据批量插入到数据库中
influxDBUtil . batchInsert ( " test " , " " , ConsistencyLevel . ALL , TimeUnit . MILLISECONDS , records ) ;
}
2022-09-27 21:10:51 +08:00
/**
 * Inserts two {@code " pqs_communicate "} points for the same hard-coded line id: one with
 * {@code type} 1 stamped 170,000,000 ms before now, then one with {@code type} 0 stamped
 * 70,000,000 ms before now.
 *
 * <p>NOTE(review): all literals are kept exactly as found, including the surrounding spaces.
 *
 * @param influxDBUtil utility used to perform both inserts
 */
public static void batchInsertPqsCommunicate(InfluxDbUtils influxDBUtil) {
    // Older point: description "在线" (online), type 1.
    Map<String, String> onlineTags = new HashMap<>();
    onlineTags.put(" line_id ", " 025fa0e4c91f72ad7f1c1bd29026f20a ");
    Map<String, Object> onlineFields = new HashMap<>();
    onlineFields.put(" description ", " 在线 ");
    onlineFields.put(" type ", 1);
    long onlineTimestamp = System.currentTimeMillis() - 170000000;
    influxDBUtil.insert(" pqs_communicate ", onlineTags, onlineFields, onlineTimestamp, TimeUnit.MILLISECONDS);

    // Newer point: description "掉线" (offline), type 0.
    Map<String, String> offlineTags = new HashMap<>();
    offlineTags.put(" line_id ", " 025fa0e4c91f72ad7f1c1bd29026f20a ");
    Map<String, Object> offlineFields = new HashMap<>();
    offlineFields.put(" description ", " 掉线 ");
    offlineFields.put(" type ", 0);
    long offlineTimestamp = System.currentTimeMillis() - 70000000;
    influxDBUtil.insert(" pqs_communicate ", offlineTags, offlineFields, offlineTimestamp, TimeUnit.MILLISECONDS);
}
2022-06-21 20:47:46 +08:00
}