import cn.hutool.core.date.DateUtil;
import com.njcn.event.pojo.po.EventDetail;
import com.njcn.influxdb.utils.InfluxDbUtils;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Manual InfluxDB read/write examples (query, single insert, batch insert) built on {@link InfluxDbUtils}.
 *
 * @author xuyang
 * @version 1.0.0
 * @createTime 2021/11/16 11:07
 */
public class DataTest {
    // Query sag events for a single line on a given day
    public static QueryResult select(InfluxDbUtils influxDBUtil) {
        long startTime = System.currentTimeMillis();
        // Assemble the InfluxQL where clause: whole-day time range plus the line id
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("time >= '").append(DateUtil.beginOfDay(DateUtil.parse("2022-05-06"))).append("' and ")
                .append("time <= '").append(DateUtil.endOfDay(DateUtil.parse("2022-05-06"))).append("' and (");
        stringBuilder.append("line_id = '").append("1e3b8531483b2a8cbee6747f1f641cf9").append("')");
        // Fetch the sag events
        QueryResult result = influxDBUtil.query("select * from pqs_eventdetail where " + stringBuilder.toString());
        // Map the raw result onto the EventDetail POJO
        InfluxDBResultMapper influxDBResultMapper = new InfluxDBResultMapper();
        List<EventDetail> eventDetailList = influxDBResultMapper.toPOJO(result, EventDetail.class);
        long endTime = System.currentTimeMillis();
        System.out.println("query took " + (endTime - startTime) + " ms");
        System.out.println(eventDetailList);
        return result;
}
    // Process the result set by walking the raw series values
    public static void chanelResult(QueryResult result) {
        QueryResult.Result result1 = result.getResults().get(0);
        if (result1.getSeries() != null) {
            List<List<Object>> valueList = result1.getSeries().stream()
                    .map(QueryResult.Series::getValues)
                    .collect(Collectors.toList())
                    .get(0);
            if (valueList != null && valueList.size() > 0) {
                for (List<Object> value : valueList) {
                    // Placeholder map for whatever business mapping you need
                    Map<String, String> map = new HashMap<>();
                    // Value of the first column of the row
                    String field1 = value.get(0) == null ? null : value.get(0).toString();
                    System.out.println(field1);
                    // Value of the second column of the row
                    String field2 = value.get(1) == null ? null : value.get(1).toString();
                    System.out.println(field2);
                    // TODO apply your own business logic to the extracted fields...
}
}
}
}
    public static void main(String[] args) {
        InfluxDbUtils influxDBUtil = new InfluxDbUtils("root", "123456789", "http://192.168.1.16:8086", "pqsbase", "");
        insert(influxDBUtil);
        //select(influxDBUtil);
}
    // Delete the LIMIT_RATE database
    public static void deleteDB(InfluxDbUtils influxDBUtil) {
        influxDBUtil.deleteDB("LIMIT_RATE");
}
    // Insert a single point
    public static void insert(InfluxDbUtils influxDBUtil) {
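        // Alternative example (kept commented out): a pqs_event_push_logs point recording the result of an event push.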
// long time = Long.parseLong("1655135328135");
// Map<String, String> tags = new HashMap<>();
// // tags.put("line_id", "127fad1dcb0077ac2979141b8473a5e4");
// tags.put("line_id", "1883a1fe6d9c3c5a03a4371bda024452");
// Map<String, Object> fields = new HashMap<>();
// fields.put("create_time", "2022-06-13 23:57:31");
// fields.put("update_time", "");
// fields.put("push_failed",0);
// fields.put("result",1);
// influxDBUtil.insert("pqs_event_push_logs", tags, fields, time, TimeUnit.MILLISECONDS);
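        // Live example: one PQS_COMASSES point carrying per-device VU_Dev / Freq_Dev / Data_PST /
        // V_Unbalance / V_THD / Event fields for a single line at a fixed millisecond timestamp.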
        long time = Long.parseLong("1646180719000");
        Map<String, String> tags = new HashMap<>();
        tags.put("LineID", "fd4ffb0dd33eafaaf403b07a3fc1afe5");
        Map<String, Object> fields = new HashMap<>();
        fields.put("VU_Dev1", 5.706);
        fields.put("VU_Dev2", 5.706);
        fields.put("VU_Dev3", 5.706);
        fields.put("VU_Dev4", 5.706);
        fields.put("VU_Dev5", 5.706);
        fields.put("Freq_Dev1", 0.534);
        fields.put("Freq_Dev2", 0.534);
        fields.put("Freq_Dev3", 2.534);
        fields.put("Freq_Dev4", 0.534);
        fields.put("Freq_Dev5", 0.534);
        fields.put("Data_PST1", 0.604);
        fields.put("Data_PST2", 0.0);
        fields.put("Data_PST3", 0.691);
        fields.put("Data_PST4", 0.910);
        fields.put("Data_PST5", 0.691);
        fields.put("V_Unbalance1", 2.713);
        fields.put("V_Unbalance2", 2.713);
        fields.put("V_Unbalance3", 2.713);
        fields.put("V_Unbalance4", 2.713);
        fields.put("V_Unbalance5", 2.713);
        fields.put("V_THD1", 20.001);
        fields.put("V_THD2", 20.003);
        fields.put("V_THD3", 20.00);
        fields.put("V_THD4", 20.008);
        fields.put("V_THD5", 20.00);
        fields.put("Event1", 1.619);
        fields.put("Event2", 1.619);
        fields.put("Event3", 1.619);
        fields.put("Event4", 1.619);
        fields.put("Event5", 1.619);
        influxDBUtil.insert("PQS_COMASSES", tags, fields, time, TimeUnit.MILLISECONDS);
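        // Alternative example (kept commented out): a LIMIT_RATE point carrying per-harmonic and other
        // over-limit durations for one measuring point.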
// long time = Long.parseLong("1647473742000");
// tags.put("MYINDEX", "df1ff413949f6d1fc07ffdb5440b4907");
// Map<String, Object> fields = new HashMap<>();
// fields.put("Phasic_Type","T");
// fields.put("AllTime",1155);
// fields.put("Flicker_AllTime",550);
// fields.put("Flicker_OverTime",0);
// fields.put("Freq_Dev_OverTime",0);
// fields.put("Voltage_Dev_OverTime",0);
// fields.put("UBalance_OverTime",0);
// fields.put("UAberrance_OverTime",0);
// fields.put("I_Neg_OverTime",0);
// fields.put("UHarm_2_OverTime",0);
// fields.put("UHarm_3_OverTime",0);
// fields.put("UHarm_4_OverTime",0);
// fields.put("UHarm_5_OverTime",0);
// fields.put("UHarm_6_OverTime",0);
// fields.put("UHarm_7_OverTime",0);
// fields.put("UHarm_8_OverTime",0);
// fields.put("UHarm_9_OverTime",0);
// fields.put("UHarm_10_OverTime",0);
// fields.put("UHarm_11_OverTime",0);
// fields.put("UHarm_12_OverTime",0);
// fields.put("UHarm_13_OverTime",0);
// fields.put("UHarm_14_OverTime",0);
// fields.put("UHarm_15_OverTime",0);
// fields.put("UHarm_16_OverTime",0);
// fields.put("UHarm_17_OverTime",0);
// fields.put("UHarm_18_OverTime",0);
// fields.put("UHarm_19_OverTime",0);
// fields.put("UHarm_20_OverTime",0);
// fields.put("UHarm_21_OverTime",0);
// fields.put("UHarm_22_OverTime",0);
// fields.put("UHarm_23_OverTime",0);
// fields.put("UHarm_24_OverTime",0);
// fields.put("UHarm_25_OverTime",0);
// fields.put("IHarm_2_OverTime",0);
// fields.put("IHarm_3_OverTime",0);
// fields.put("IHarm_4_OverTime",0);
// fields.put("IHarm_5_OverTime",0);
// fields.put("IHarm_6_OverTime",0);
// fields.put("IHarm_7_OverTime",0);
// fields.put("IHarm_8_OverTime",0);
// fields.put("IHarm_9_OverTime",0);
// fields.put("IHarm_10_OverTime",0);
// fields.put("IHarm_11_OverTime",0);
// fields.put("IHarm_12_OverTime",0);
// fields.put("IHarm_13_OverTime",0);
// fields.put("IHarm_14_OverTime",0);
// fields.put("IHarm_15_OverTime",0);
// fields.put("IHarm_16_OverTime",0);
// fields.put("IHarm_17_OverTime",0);
// fields.put("IHarm_18_OverTime",0);
// fields.put("IHarm_19_OverTime",0);
// fields.put("IHarm_20_OverTime",0);
// fields.put("IHarm_21_OverTime",0);
// fields.put("IHarm_22_OverTime",0);
// fields.put("IHarm_23_OverTime",0);
// fields.put("IHarm_24_OverTime",0);
// fields.put("IHarm_25_OverTime",0);
// fields.put("InUHARM_1_OverTime",0);
// fields.put("InUHARM_2_OverTime",0);
// fields.put("InUHARM_3_OverTime",0);
// fields.put("InUHARM_4_OverTime",0);
// fields.put("InUHARM_5_OverTime",0);
// fields.put("InUHARM_6_OverTime",0);
// fields.put("InUHARM_7_OverTime",0);
// fields.put("InUHARM_8_OverTime",0);
// fields.put("InUHARM_9_OverTime",0);
// fields.put("InUHARM_10_OverTime",0);
// fields.put("InUHARM_11_OverTime",0);
// fields.put("InUHARM_12_OverTime",0);
// fields.put("InUHARM_13_OverTime",0);
// fields.put("InUHARM_14_OverTime",0);
// fields.put("InUHARM_15_OverTime",0);
// fields.put("InUHARM_16_OverTime",0);
// influxDBUtil.insert("LIMIT_RATE", tags, fields, time, TimeUnit.MILLISECONDS);
}
    // Write points to the database one at a time (one BatchPoints container per point)
    public static void batchInsertOne(InfluxDbUtils influxDBUtil) {
        Map<String, String> tags1 = new HashMap<>();
        tags1.put("line_id", "127fad1dcb0077ac2979141b8473a5e4");
        Map<String, Object> fields1 = new HashMap<>();
        fields1.put("describe", "sag event 1");
        fields1.put("wave_type", 1);
        fields1.put("persist_time", 1620);
        fields1.put("event_value", 0.956);
        Map<String, String> tags2 = new HashMap<>();
        tags2.put("LineID", "9");
        tags2.put("Phasic_Type", "A");
        Map<String, Object> fields2 = new HashMap<>();
        fields2.put("RMS", 4);
        fields2.put("RMS_AB", 4);
        fields2.put("RMS_BC", 4);
        fields2.put("RMS_CA", 4);
        // Build one point per record
        Point point1 = influxDBUtil.pointBuilder("test", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags1, fields1);
        Point point2 = influxDBUtil.pointBuilder("test", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags2, fields2);
        // Add each record to its own BatchPoints container
        BatchPoints batchPoints1 = BatchPoints.database("test").tag("LineID", "8").tag("Phasic_Type", "A").retentionPolicy("")
                .consistency(ConsistencyLevel.ALL).build();
        BatchPoints batchPoints2 = BatchPoints.database("test").tag("LineID", "9").tag("Phasic_Type", "A").retentionPolicy("")
                .consistency(ConsistencyLevel.ALL).build();
        batchPoints1.point(point1);
        batchPoints2.point(point2);
        // Insert the two batches into the database
        influxDBUtil.batchInsert(batchPoints1, TimeUnit.MILLISECONDS);
        influxDBUtil.batchInsert(batchPoints2, TimeUnit.MILLISECONDS);
}
    // Batch insert: serialize several BatchPoints and write them in a single request
    public static void batchInsert(InfluxDbUtils influxDBUtil) {
        Map<String, String> tags1 = new HashMap<>();
        tags1.put("LineID", "4");
        tags1.put("Phasic_Type", "A");
        Map<String, Object> fields1 = new HashMap<>();
        fields1.put("RMS", 4.1111);
        fields1.put("RMS_AB", 4.1111);
        fields1.put("RMS_BC", 4.1111);
        fields1.put("RMS_CA", 4.1111);
        Map<String, String> tags2 = new HashMap<>();
        tags2.put("LineID", "5");
        tags2.put("Phasic_Type", "A");
        Map<String, Object> fields2 = new HashMap<>();
        fields2.put("RMS", 5.1111);
        fields2.put("RMS_AB", 5.1111);
        fields2.put("RMS_BC", 5.1111);
        fields2.put("RMS_CA", 5.1111);
        // One point per record. (Note: do not use System.currentTimeMillis() in production; with large data volumes
        // it produces duplicate timestamps and therefore lost data. Use the timestamp carried by the data itself;
        // this is for demonstration only.)
        Point point1 = influxDBUtil.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags1, fields1);
        Point point2 = influxDBUtil.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags2, fields2);
        // BatchPoints batchPoints1 = BatchPoints.database("Data_v").tag("LineID", "4").tag("Phasic_Type","A").retentionPolicy("").consistency(ConsistencyLevel.ALL).precision(TimeUnit.MILLISECONDS).build();
        BatchPoints batchPoints1 = BatchPoints.database("test").tag("LineID", "4").tag("Phasic_Type", "A").retentionPolicy("")
                .consistency(ConsistencyLevel.ALL).build();
        batchPoints1.point(point1);
        BatchPoints batchPoints2 = BatchPoints.database("test").tag("LineID", "5").tag("Phasic_Type", "A").retentionPolicy("")
                .consistency(ConsistencyLevel.ALL).build();
        // Add each record to its BatchPoints container
        batchPoints2.point(point2);
        // Serialize the BatchPoints to line protocol so everything is written in one request, improving write throughput
        List<String> records = new ArrayList<>();
        records.add(batchPoints1.lineProtocol());
        records.add(batchPoints2.lineProtocol());
        // Write both records to the database in a single batch
        influxDBUtil.batchInsert("test", "", ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
}