package com.njcn.event;

import com.njcn.event.enums.EventResponseEnum;
import com.njcn.event.pojo.PqsEventDetail;
import com.njcn.event.pojo.PqsOnlinerateAggregate;
import com.njcn.event.pojo.PqsEventDetailCount;
import com.njcn.event.pojo.dto.wave.EigenvalueDTO;
import com.njcn.event.pojo.dto.wave.WaveDataDTO;
import com.njcn.event.utils.WaveUtil;
import com.njcn.huawei.obs.util.OBSUtil;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.njcn.oss.constant.OssPath;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.SelectionQueryImpl;
import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import static com.njcn.influxdb.param.InfluxDBPublicParam.PQS_EVENT_DETAIL;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*;
import static org.influxdb.querybuilder.FunctionFactory.sum;

/**
 * Spring Boot tests that build InfluxDB queries with the influxdb-java query builder,
 * run them through {@link InfluxDbUtils}, and parse waveform files downloaded through
 * {@link OBSUtil} with {@link WaveUtil}.
 *
 * <p>Query builder reference:
 * https://github.com/influxdata/influxdb-java/blob/master/QUERY_BUILDER.md
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class EventBootApplicationTest {

    /** Inclusive lower bound of the time window used by every query in this class. */
    private static final String RANGE_START = "2022-05-01T00:00:00Z";

    /** Inclusive upper bound of the time window used by every query in this class. */
    private static final String RANGE_END = "2022-09-01T00:00:00Z";

    /** Time zone passed to the {@code tz(...)} clause of every query. */
    private static final String TIME_ZONE = "Asia/Shanghai";

    @Resource
    private InfluxDbConfig influxDbConfig;

    @Autowired
    private InfluxDbUtils influxDbUtils;

    @Autowired
    private WaveUtil waveUtil;

    @Autowired
    private OBSUtil obsUtil;

    /**
     * Selects {@code line_id} and {@code eventass_index} for the sample lines inside the
     * time window and expects at least one mapped {@link PqsEventDetail} row.
     */
    @Test
    public void queryList() {
        // OR-condition data
        List<Clause> clauses = getClauses();
        SelectQueryImpl selectQuery = select()
                .column("line_id")
                .column("eventass_index")
                .from(influxDbConfig.getDatabase(), PQS_EVENT_DETAIL);
        WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
        // WHERE (line_id = '1' OR line_id = '2' OR line_id = '3') -- wrapped in parentheses
        whereAndNested(clauses, where);
        applyTimeRange(where);

        List<PqsEventDetail> rows = queryAndMap(selectQuery, PqsEventDetail.class);
        Assert.assertFalse(rows.isEmpty());
    }

    /**
     * Counts {@code eventass_index} for the sample lines inside the time window and expects
     * at least one mapped {@link PqsEventDetailCount} row.
     */
    @Test
    public void queryCount() {
        // OR-condition data
        List<Clause> clauses = getClauses();
        SelectQueryImpl selectQuery = select()
                .count("eventass_index")
                .from(influxDbConfig.getDatabase(), PQS_EVENT_DETAIL);
        WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
        // WHERE (line_id = '1' OR line_id = '2' OR line_id = '3') -- wrapped in parentheses
        whereAndNested(clauses, where);
        applyTimeRange(where);

        List<PqsEventDetailCount> rows = queryAndMap(selectQuery, PqsEventDetailCount.class);
        Assert.assertFalse(rows.isEmpty());
    }

    /**
     * Builds an arithmetic aggregate over {@code pqs_onlinerate} inside the time window and
     * expects at least one mapped {@link PqsOnlinerateAggregate} row.
     */
    @Test
    public void queryAggregate() {
        // SELECT (SUM(onlinemin) / (SUM(onlinemin) + SUM(offlinemin))) * 100 FROM pqs_onlinerate
        SelectionQueryImpl select = select();
        SelectionQueryImpl ratioSelection = select
                .op(op(sum("onlinemin"), "/", op(sum("onlinemin"), "+", sum("offlinemin"))), "*", 100)
                .as("value");
        SelectQueryImpl selectQuery = ratioSelection.from(influxDbConfig.getDatabase(), "pqs_onlinerate");
        WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
        applyTimeRange(where);

        List<PqsOnlinerateAggregate> rows = queryAndMap(selectQuery, PqsOnlinerateAggregate.class);
        Assert.assertFalse(rows.isEmpty());
    }

    /**
     * Returns the {@code line_id} equality clauses that the tests OR together.
     *
     * @return a fixed-size list of three {@code line_id = '...'} clauses
     */
    private List<Clause> getClauses() {
        Clause c1 = eq("line_id", "5e467a40023b299070682eb21f2ec9a1");
        Clause c2 = eq("line_id", "183245996f303ebfd80eeb3377cecdc2");
        Clause c3 = eq("line_id", "0d46f54420246e999d5c68b3133f668c");
        return Arrays.asList(c1, c2, c3);
    }

    /**
     * Appends the given clauses to the WHERE part as one nested group joined with OR.
     *
     * @param clauses    clauses to OR together inside the nested group
     * @param whereQuery WHERE builder that receives the nested group
     */
    private void whereAndNested(List<Clause> clauses, WhereQueryImpl<SelectQueryImpl> whereQuery) {
        // NOTE(review): type arguments restored from the influxdb-java API - confirm against the library version in use.
        WhereNested<WhereQueryImpl<SelectQueryImpl>> andNested = whereQuery.andNested();
        for (Clause clause : clauses) {
            andNested.or(clause);
        }
        andNested.close();
    }

    /**
     * Adds the shared time window and time zone to a WHERE builder:
     * {@code AND time >= '2022-05-01T00:00:00Z' AND time <= '2022-09-01T00:00:00Z' tz('Asia/Shanghai')}.
     *
     * @param where WHERE builder to extend
     */
    private void applyTimeRange(WhereQueryImpl<SelectQueryImpl> where) {
        where.and(gte("time", RANGE_START)).and(lte("time", RANGE_END));
        where.tz(TIME_ZONE);
    }

    /**
     * Executes the built query and maps the result onto the given POJO type.
     *
     * @param selectQuery fully built query whose command string is executed
     * @param type        POJO class handed to {@link InfluxDBResultMapper}
     * @param <T>         POJO type
     * @return the mapped rows
     */
    private <T> List<T> queryAndMap(SelectQueryImpl selectQuery, Class<T> type) {
        QueryResult result = influxDbUtils.query(selectQuery.getCommand());
        return new InfluxDBResultMapper().toPOJO(result, type);
    }

    /**
     * Downloads a CFG/DAT waveform file pair through {@link OBSUtil} and runs it through
     * the {@link WaveUtil} parsing steps, expecting every step to return a result.
     *
     * @throws FileNotFoundException if either download returns {@code null}
     * @throws IOException           if closing a downloaded stream fails
     */
    @Test
    public void testHuaweiOBS() throws IOException {
        String cfgPath = OssPath.WAVE_DIR + "192.168.1.190/PQMonitor_PQM1_002438_20210508_092859_938.CFG";
        String datPath = OssPath.WAVE_DIR + "192.168.1.190/PQMonitor_PQM1_002438_20210508_092859_938.DAT";

        // try-with-resources closes both streams on every path; null resources are skipped.
        try (InputStream cfgStream = obsUtil.downloadStream(cfgPath);
             InputStream datStream = obsUtil.downloadStream(datPath)) {
            if (Objects.isNull(cfgStream) || Objects.isNull(datStream)) {
                throw new FileNotFoundException(EventResponseEnum.ANALYSEWAVE_NOT_FOUND.getMessage());
            }

            // Instantaneous waveform: read the raw waveform values.
            // NOTE(review): the meaning of the third argument (1) is defined by WaveUtil - not visible here.
            WaveDataDTO waveDataDTO = waveUtil.getComtrade(cfgStream, datStream, 1);
            Assert.assertNotNull(waveDataDTO);

            // RMS waveform.
            WaveDataDTO rmsWaveData = waveUtil.getValidData(waveDataDTO);
            Assert.assertNotNull(rmsWaveData);

            // Characteristic values (eigenvalues).
            List<EigenvalueDTO> eigenvalues = waveUtil.getEigenvalue(waveDataDTO, true);
            Assert.assertNotNull(eigenvalues);
        }
    }
}