将influxDB的数据库名配置调整为配置文件

This commit is contained in:
2022-10-08 12:50:20 +08:00
parent ab8e86f257
commit 5ba00aca0a
18 changed files with 135 additions and 79 deletions

View File

@@ -1,6 +1,7 @@
package com.njcn.influxdb.config;
import com.njcn.influxdb.utils.InfluxDbUtils;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -12,6 +13,7 @@ import org.springframework.context.annotation.Configuration;
* @version 1.0.0
* @createTime 2021/12/10 10:48
*/
@Data
@Configuration
public class InfluxDbConfig {

View File

@@ -9,11 +9,6 @@ package com.njcn.influxdb.param;
*/
public interface InfluxDBPublicParam {
/**
* influxDB数据库名称
*/
String DATABASE = "PQSBASE";
/**
* 暂态事件汇总表
*/

View File

@@ -4,7 +4,9 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateTime;
import com.njcn.event.pojo.po.EventDetail;
import com.njcn.event.pojo.vo.EventDetailCount;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
@@ -12,6 +14,7 @@ import org.influxdb.querybuilder.SelectionQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
@@ -19,7 +22,7 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.njcn.influxdb.param.InfluxDBPublicParam.DATABASE;
import static com.njcn.influxdb.param.InfluxDBPublicParam.PQS_EVENT_DETAIL;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
@@ -32,6 +35,10 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
*/
@Component
public class PqsEventDetailQuery extends QueryBuilder {
@Resource
private InfluxDbConfig influxDbConfig;
protected PqsEventDetailQuery(InfluxDbUtils influxDbUtils) {
super(influxDbUtils);
}
@@ -58,7 +65,7 @@ public class PqsEventDetailQuery extends QueryBuilder {
* @see SelectQueryImpl
*/
private SelectQueryImpl fromTable(SelectionQueryImpl column) {
return column.from(DATABASE, PQS_EVENT_DETAIL);
return column.from(influxDbConfig.getDatabase(), PQS_EVENT_DETAIL);
}
/**

View File

@@ -1,6 +1,7 @@
package com.njcn.event.influxdb;
import com.njcn.event.pojo.po.PqsOnlinerate;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
@@ -8,15 +9,19 @@ import org.influxdb.querybuilder.SelectionQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import static com.njcn.influxdb.param.InfluxDBPublicParam.DATABASE;
import static com.njcn.influxdb.param.InfluxDBPublicParam.PQS_ONLINERATE;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
@Component
public class PqsOnlinerateQuery extends QueryBuilder {
@Resource
private InfluxDbConfig influxDbConfig;
protected PqsOnlinerateQuery(InfluxDbUtils influxDbUtils) {
super(influxDbUtils);
@@ -29,7 +34,7 @@ public class PqsOnlinerateQuery extends QueryBuilder {
* @see SelectQueryImpl
*/
private SelectQueryImpl fromTable(SelectionQueryImpl column) {
return column.from(DATABASE, PQS_ONLINERATE);
return column.from(influxDbConfig.getDatabase(), PQS_ONLINERATE);
}
/**

View File

@@ -3,6 +3,7 @@ package com.njcn.event;
import com.njcn.event.pojo.PqsEventDetail;
import com.njcn.event.pojo.PqsOnlinerateAggregate;
import com.njcn.event.pojo.PqsEventDetailCount;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
@@ -18,10 +19,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
import static com.njcn.influxdb.param.InfluxDBPublicParam.DATABASE;
import static com.njcn.influxdb.param.InfluxDBPublicParam.PQS_EVENT_DETAIL;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*;
import static org.influxdb.querybuilder.FunctionFactory.sum;
@@ -30,8 +32,11 @@ import static org.influxdb.querybuilder.FunctionFactory.sum;
@RunWith(SpringRunner.class)
@SpringBootTest
public class EventBootApplicationTest {
@Resource
private InfluxDbConfig influxDbConfig;
@Autowired
private InfluxDbUtils influxDbUtils;
@@ -42,7 +47,7 @@ public class EventBootApplicationTest {
// or 条件数据
List<Clause> clauses = getClauses();
SelectQueryImpl selectQuery = select().column("line_id").column("eventass_index").from(DATABASE, PQS_EVENT_DETAIL);
SelectQueryImpl selectQuery = select().column("line_id").column("eventass_index").from(influxDbConfig.getDatabase(), PQS_EVENT_DETAIL);
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
// WHERE (line_id = '1' OR line_id = '2' OR line_id = '3') 加上前后()
@@ -63,7 +68,7 @@ public class EventBootApplicationTest {
// or 条件数据
List<Clause> clauses = getClauses();
SelectQueryImpl selectQuery = select().count("eventass_index").from(DATABASE, PQS_EVENT_DETAIL);
SelectQueryImpl selectQuery = select().count("eventass_index").from(influxDbConfig.getDatabase(), PQS_EVENT_DETAIL);
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
// WHERE (line_id = '1' OR line_id = '2' OR line_id = '3') 加上前后()
@@ -86,7 +91,7 @@ public class EventBootApplicationTest {
SelectionQueryImpl select = select();
SelectionQueryImpl sum = select.op(op(sum("onlinemin"), "/", op(sum("onlinemin"), "+", sum("offlinemin"))), "*", 100)
.as("value");
SelectQueryImpl selectQuery = sum.from(DATABASE, "pqs_onlinerate");
SelectQueryImpl selectQuery = sum.from(influxDbConfig.getDatabase(), "pqs_onlinerate");
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
// AND time >= '2022-05-01T00:00:00Z' AND time <= '2022-09-01T00:00:00Z' tz('Asia/Shanghai');

View File

@@ -6,6 +6,7 @@ import com.njcn.device.pq.pojo.dto.OnlineLineDTO;
import com.njcn.energy.pojo.constant.ModelState;
import com.njcn.executor.pojo.vo.PqsCommunicateClone;
import com.njcn.executor.pojo.vo.PqsOnlineRate;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
@@ -53,6 +54,8 @@ public class ClonePqsOnlineRateJob {
private final LineFeignClient lineFeignClient;
private final InfluxDbConfig influxDbConfig;
@XxlJob("clonePqsOnlineRateJobHandler")
public void clonePqsOnlineRateJobHandler() throws ParseException {
List<PqsOnlineRate> result = new ArrayList<>();
@@ -235,7 +238,7 @@ public class ClonePqsOnlineRateJob {
* @return pqs_communicate数据
*/
private List<PqsCommunicateClone> getPqsCommunicateClone(List<String> list, String startTime, String endTime){
SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE);
SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(), PQS_COMMUNICATE);
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
whereAndNested(list, where);
@@ -270,7 +273,7 @@ public class ClonePqsOnlineRateJob {
* @return pqs_communicate数据
*/
private List<PqsCommunicateClone> getData(List<String> list,String endTime){
SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE);
SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(), PQS_COMMUNICATE);
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
whereAndNested(list, where);
where.and(lte(TIME,endTime));
@@ -298,10 +301,10 @@ public class ClonePqsOnlineRateJob {
fields.put(OFFLINE_MIN,item.getOfflineMin());
fields.put(ONLINE_RATE,item.getOnlineRate());
Point point = influxDbUtils.pointBuilder(PQS_ONLINERATE, item.getTime().toEpochMilli(), TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(DATABASE).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
}

View File

@@ -3,6 +3,7 @@ package com.njcn.executor.handler;
import com.njcn.common.pojo.constant.PatternRegex;
import com.njcn.device.pq.api.LineFeignClient;
import com.njcn.executor.pojo.vo.*;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.param.InfluxDBPublicParam;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.context.XxlJobHelper;
@@ -40,6 +41,8 @@ public class DayJob {
private final LineFeignClient lineFeignClient;
private final InfluxDbConfig influxDbConfig;
@XxlJob("dayJobHandler")
public void getDayData() throws ParseException {
List<String> paramList = new ArrayList<>();
@@ -333,11 +336,11 @@ public class DayJob {
fields.put("v_49",item.getV49());
fields.put("v_50",item.getV50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_V, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
@@ -500,11 +503,11 @@ public class DayJob {
fields.put("i_49",item.getI49());
fields.put("i_50",item.getI50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_I, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
@@ -578,11 +581,11 @@ public class DayJob {
fields.put("plt",item.getPlt());
fields.put("pst",item.getPst());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_FLICKER, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
@@ -655,11 +658,11 @@ public class DayJob {
fields.put("fluc",item.getFluc());
fields.put("fluccf",item.getFluccf());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_FLUC, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
@@ -811,11 +814,11 @@ public class DayJob {
fields.put("i_49",item.getI49());
fields.put("i_50",item.getI50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_PHASIC_I, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
/**
@@ -966,11 +969,11 @@ public class DayJob {
fields.put("v_49",item.getV49());
fields.put("v_50",item.getV50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_PHASIC_V, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
/**
@@ -1128,11 +1131,11 @@ public class DayJob {
fields.put("p_49",item.getP49());
fields.put("p_50",item.getP50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_POWER_P, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
@@ -1289,11 +1292,11 @@ public class DayJob {
fields.put("q_49",item.getQ49());
fields.put("q_50",item.getQ50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_POWER_Q, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
/**
@@ -1449,11 +1452,11 @@ public class DayJob {
fields.put("s_49",item.getS49());
fields.put("s_50",item.getS50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_POWER_S, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
/**
@@ -1604,11 +1607,11 @@ public class DayJob {
fields.put("i_49",item.getI49());
fields.put("i_50",item.getI50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_RATE_I, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
/**
@@ -1759,11 +1762,11 @@ public class DayJob {
fields.put("v_49",item.getV49());
fields.put("v_50",item.getV50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_RATE_V, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
/**
@@ -1914,11 +1917,11 @@ public class DayJob {
fields.put("i_49",item.getI49());
fields.put("i_50",item.getI50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_IN_HARM_I, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
/**
@@ -2069,11 +2072,11 @@ public class DayJob {
fields.put("v_49",item.getV49());
fields.put("v_50",item.getV50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_IN_HARM_V, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
@@ -2225,11 +2228,11 @@ public class DayJob {
fields.put("i_49",item.getI49());
fields.put("i_50",item.getI50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_IN_HARM_I, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
/**
@@ -2380,11 +2383,11 @@ public class DayJob {
fields.put("v_49",item.getV49());
fields.put("v_50",item.getV50());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DATA_IN_HARM_V, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
/**
@@ -2455,11 +2458,11 @@ public class DayJob {
tags.put("value_type",item.getValueType());
fields.put("plt",item.getPlt());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_PLT, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
public StringBuilder lineStringBuilder(List<String> list) {

View File

@@ -2,6 +2,7 @@ package com.njcn.executor.handler;
import com.njcn.energy.pojo.api.EleIntegrityFeignClient;
import com.njcn.executor.pojo.dto.HarmonicDTO;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
@@ -34,10 +35,10 @@ public class EleIntegrityJob {
private final Integer AIR_DATA_DUE = 288;
private final String DATABASE = "PQSBASE";
private final InfluxDbUtils influxDbUtils;
private final InfluxDbConfig influxDbConfig;
private final EleIntegrityFeignClient eleIntegrityFeignClient;
@XxlJob("eleIntegrityJobHandler")
@@ -156,10 +157,10 @@ public class EleIntegrityJob {
fields.put("real",item.getReal());
fields.put("due",item.getDue());
Point point = influxDbUtils.pointBuilder("ele_integrity", time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(DATABASE).tag("line_id", item.getId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag("line_id", item.getId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
};
}

View File

@@ -2,6 +2,7 @@ package com.njcn.executor.handler;
import com.njcn.energy.pojo.api.EleOnlineRateFeignClient;
import com.njcn.energy.pojo.dto.OnlineRateDTO;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
@@ -27,7 +28,7 @@ import java.util.concurrent.TimeUnit;
@AllArgsConstructor
public class EleOnlineRateJob {
private final String DATABASE = "PQSBASE";
private final InfluxDbConfig influxDbConfig;
private final InfluxDbUtils influxDbUtils;
@@ -59,11 +60,11 @@ public class EleOnlineRateJob {
tags.put("device_id",item.getDeviceId());
fields.put("online_rate",item.getOnlineRate());
Point point = influxDbUtils.pointBuilder("ele_online_rate", time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(DATABASE).tag("device_id", item.getDeviceId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag("device_id", item.getDeviceId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
};

View File

@@ -2,6 +2,7 @@ package com.njcn.executor.handler;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
@@ -31,6 +32,8 @@ public class ElectricCalJob {
private final InfluxDbUtils influxDbUtils;
private final InfluxDbConfig influxDbConfig;
@XxlJob("ElectricCalJob")
public void ElectricCalJob() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@@ -92,7 +95,7 @@ public class ElectricCalJob {
});
if (CollectionUtil.isNotEmpty(insertObj)) {
BatchPoints batchPoints = BatchPoints.database("PQSBASE").build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).build();
for (Map<String, Object> it : insertObj) {
Point.Builder point = Point.measurement("power_data_add");
it.forEach((key, val) -> {

View File

@@ -5,6 +5,7 @@ import com.njcn.device.pq.api.LineFeignClient;
import com.njcn.device.pq.pojo.dto.PollutionParamDTO;
import com.njcn.device.pq.pojo.po.Overlimit;
import com.njcn.executor.pojo.vo.*;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.param.InfluxDBPublicParam;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.context.XxlJobHelper;
@@ -42,6 +43,8 @@ public class LimitRateJob {
private final InfluxDbUtils influxDbUtils;
private final InfluxDbConfig influxDbConfig;
private final LineFeignClient lineFeignClient;
@XxlJob("limitRateJobHandler")
@@ -1356,10 +1359,10 @@ public class LimitRateJob {
fields.put("inuharm_15_overtime",item.getInuHarm15OverTime());
fields.put("inuharm_16_overtime",item.getInuHarm16OverTime());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.LIMIT_RATE, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhasicType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhasicType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
}

View File

@@ -6,6 +6,7 @@ import com.njcn.device.pq.pojo.dto.PollutionParamDTO;
import com.njcn.device.pq.pojo.po.Overlimit;
import com.njcn.executor.pojo.vo.*;
import com.njcn.harmonic.pojo.po.LimitTarget;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.param.InfluxDBPublicParam;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.context.XxlJobHelper;
@@ -42,6 +43,8 @@ public class LimitTargetJob {
private final InfluxDbUtils influxDbUtils;
private final InfluxDbConfig influxDbConfig;
private final LineFeignClient lineFeignClient;
@XxlJob("limitTargetJobHandler")
@@ -973,10 +976,10 @@ public class LimitTargetJob {
fields.put("inuharm_15_overtime",item.getInuHarm15OverTime());
fields.put("inuharm_16_overtime",item.getInuHarm16OverTime());
Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.LIMIT_TARGET, time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhasicType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhasicType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
}

View File

@@ -5,6 +5,7 @@ import com.njcn.device.pq.pojo.po.Overlimit;
import com.njcn.executor.pojo.dto.PollutionDTO;
import com.njcn.executor.pojo.vo.*;
import com.njcn.harmonic.pojo.dto.PublicDTO;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
@@ -38,7 +39,7 @@ import java.util.stream.Stream;
@AllArgsConstructor
public class PollutionJob {
private final String DATABASE = "PQSBASE";
private final InfluxDbConfig influxDbConfig;
private final InfluxDbUtils influxDbUtils;
@@ -554,10 +555,10 @@ public class PollutionJob {
fields.put("inuharm",item.getData7());
fields.put("flicker",item.getData8());
Point point = influxDbUtils.pointBuilder("harmonic_pollution", time, TimeUnit.MILLISECONDS,tags, fields);
BatchPoints batchPoints = BatchPoints.database(DATABASE).tag("line_id", item.getId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag("line_id", item.getId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
}

View File

@@ -4,6 +4,7 @@ import com.njcn.common.pojo.constant.PatternRegex;
import com.njcn.device.pq.api.LineFeignClient;
import com.njcn.device.pq.pojo.po.LineDetail;
import com.njcn.executor.pojo.vo.*;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.param.InfluxDBPublicParam;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.context.XxlJobHelper;
@@ -50,6 +51,8 @@ public class PqsIntegrityJob {
private final InfluxDbUtils influxDbUtils;
private final InfluxDbConfig influxDbConfig;
private final LineFeignClient lineFeignClient;
@XxlJob("pqsIntegrityJobHandler")
@@ -125,7 +128,7 @@ public class PqsIntegrityJob {
* @return dataV数据
*/
private List<DataV> getDataV(List<String> list, String startTime, String endTime){
SelectQueryImpl selectQuery = select().from(DATABASE, DATA_V);
SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(), DATA_V);
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
whereAndNested(list, where);
where.and(gte(TIME, startTime)).and(lte(TIME, endTime));
@@ -168,11 +171,11 @@ public class PqsIntegrityJob {
fields.put(DUE,item.getDue());
fields.put(REAL,item.getReal());
Point point = influxDbUtils.pointBuilder(PQS_INTEGRITY, item.getTime().toEpochMilli(), TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(DATABASE).tag(LINE_ID, item.getLineId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(LINE_ID, item.getLineId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
}

View File

@@ -5,6 +5,7 @@ import com.njcn.device.pq.api.LineFeignClient;
import com.njcn.energy.pojo.constant.ModelState;
import com.njcn.executor.pojo.vo.PqsCommunicate;
import com.njcn.executor.pojo.vo.PqsOnlineRate;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
@@ -50,6 +51,8 @@ public class PqsOnlineRateJob {
private final InfluxDbUtils influxDbUtils;
private final InfluxDbConfig influxDbConfig;
private final LineFeignClient lineFeignClient;
@XxlJob("pqsOnlineRateJobHandler")
@@ -210,7 +213,7 @@ public class PqsOnlineRateJob {
* @return pqs_communicate数据
*/
private List<PqsCommunicate> getPqsCommunicate(List<String> list, String startTime, String endTime){
SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE);
SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(), PQS_COMMUNICATE);
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
whereAndNested(list, where);
where.and(gte(TIME, startTime)).and(lte(TIME, endTime));
@@ -244,7 +247,7 @@ public class PqsOnlineRateJob {
* @return pqs_communicate数据
*/
private List<PqsCommunicate> getData(List<String> list){
SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE);
SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(), PQS_COMMUNICATE);
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
whereAndNested(list, where);
where.groupBy(DEV_ID).orderBy(desc()).limit(1);
@@ -270,10 +273,10 @@ public class PqsOnlineRateJob {
fields.put(OFFLINE_MIN,item.getOfflineMin());
fields.put(ONLINE_RATE,item.getOnlineRate());
Point point = influxDbUtils.pointBuilder(PQS_ONLINERATE, item.getTime().toEpochMilli(), TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(DATABASE).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
}
}

View File

@@ -0,0 +1,16 @@
import com.njcn.executor.JobExecutorApplication;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.web.WebAppConfiguration;
/**
* @author hongawen
* @version 1.0.0
* @date 2021年12月10日 15:05
*/
@RunWith(SpringRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = JobExecutorApplication.class)
public class BaseJunitTest {
}

View File

@@ -1,4 +1,5 @@
import com.njcn.executor.pojo.vo.DataFlicker;
import com.njcn.influxdb.config.InfluxDbConfig;
import com.njcn.influxdb.utils.InfluxDbUtils;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
@@ -11,6 +12,7 @@ import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
@@ -24,14 +26,15 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
* @version 1.0.0
* @createTime 2022/7/4 19:04
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Test1.class)
public class Test1 {
public class Test1 extends BaseJunitTest{
@Resource
private InfluxDbConfig influxDbConfig;
@Test
public void testMethod(){
InfluxDbUtils influxDBUtil = new InfluxDbUtils("admin", "njcnpqs", "http://192.168.1.18:8086", "PQSBASE", "");
SelectQueryImpl selectQuery = select().from("PQSBASE","data_flicker").where(eq("fluc",0)).limit(1).tz("Asia/Shanghai");
InfluxDbUtils influxDBUtil = new InfluxDbUtils(influxDbConfig.getUserName(), influxDbConfig.getPassword(), influxDbConfig.getInfluxDBUrl(), influxDbConfig.getDatabase(), "");
SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(),"data_flicker").where(eq("fluc",0)).limit(1).tz("Asia/Shanghai");
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
QueryResult queryResult = influxDBUtil.query(selectQuery.getCommand());
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();

View File

@@ -37,7 +37,6 @@ public enum DicDataTypeEnum {
REPORT_TYPE("自定义报表类型","Report_Type"),
LINE_MARK("监测点评分等级","Line_Grade"),
LINE_TYPE("监测点类型","Line_Type")
;