diff --git a/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/config/InfluxDbConfig.java b/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/config/InfluxDbConfig.java index fa2d14703..9643fdaf3 100644 --- a/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/config/InfluxDbConfig.java +++ b/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/config/InfluxDbConfig.java @@ -1,6 +1,7 @@ package com.njcn.influxdb.config; import com.njcn.influxdb.utils.InfluxDbUtils; +import lombok.Data; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -12,6 +13,7 @@ import org.springframework.context.annotation.Configuration; * @version 1.0.0 * @createTime 2021/12/10 10:48 */ +@Data @Configuration public class InfluxDbConfig { diff --git a/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/param/InfluxDBPublicParam.java b/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/param/InfluxDBPublicParam.java index 353e61ed4..1d1c7db60 100644 --- a/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/param/InfluxDBPublicParam.java +++ b/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/param/InfluxDBPublicParam.java @@ -9,11 +9,6 @@ package com.njcn.influxdb.param; */ public interface InfluxDBPublicParam { - /** - * influxDB数据库名称 - */ - String DATABASE = "PQSBASE"; - /** * 暂态事件汇总表 */ diff --git a/pqs-event/event-boot/src/main/java/com/njcn/event/influxdb/PqsEventDetailQuery.java b/pqs-event/event-boot/src/main/java/com/njcn/event/influxdb/PqsEventDetailQuery.java index 832108304..d3884b9f3 100644 --- a/pqs-event/event-boot/src/main/java/com/njcn/event/influxdb/PqsEventDetailQuery.java +++ b/pqs-event/event-boot/src/main/java/com/njcn/event/influxdb/PqsEventDetailQuery.java @@ -4,7 +4,9 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateTime; import com.njcn.event.pojo.po.EventDetail; import com.njcn.event.pojo.vo.EventDetailCount; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.utils.InfluxDbUtils; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.influxdb.dto.QueryResult; import org.influxdb.querybuilder.SelectQueryImpl; @@ -12,6 +14,7 @@ import org.influxdb.querybuilder.SelectionQueryImpl; import org.influxdb.querybuilder.clauses.Clause; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -19,7 +22,7 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; -import static com.njcn.influxdb.param.InfluxDBPublicParam.DATABASE; + import static com.njcn.influxdb.param.InfluxDBPublicParam.PQS_EVENT_DETAIL; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; @@ -32,6 +35,10 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; */ @Component public class PqsEventDetailQuery extends QueryBuilder { + + @Resource + private InfluxDbConfig influxDbConfig; + protected PqsEventDetailQuery(InfluxDbUtils influxDbUtils) { super(influxDbUtils); } @@ -58,7 +65,7 @@ public class PqsEventDetailQuery extends QueryBuilder { * @see SelectQueryImpl */ private SelectQueryImpl fromTable(SelectionQueryImpl column) { - return column.from(DATABASE, PQS_EVENT_DETAIL); + return column.from(influxDbConfig.getDatabase(), PQS_EVENT_DETAIL); } /** diff --git a/pqs-event/event-boot/src/main/java/com/njcn/event/influxdb/PqsOnlinerateQuery.java b/pqs-event/event-boot/src/main/java/com/njcn/event/influxdb/PqsOnlinerateQuery.java index cc68e6077..f16267b3e 100644 --- a/pqs-event/event-boot/src/main/java/com/njcn/event/influxdb/PqsOnlinerateQuery.java +++ b/pqs-event/event-boot/src/main/java/com/njcn/event/influxdb/PqsOnlinerateQuery.java @@ -1,6 +1,7 @@ package com.njcn.event.influxdb; import com.njcn.event.pojo.po.PqsOnlinerate; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.utils.InfluxDbUtils; import org.influxdb.dto.QueryResult; import org.influxdb.querybuilder.SelectQueryImpl; @@ -8,15 +9,19 @@ import org.influxdb.querybuilder.SelectionQueryImpl; import org.influxdb.querybuilder.clauses.Clause; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; -import static com.njcn.influxdb.param.InfluxDBPublicParam.DATABASE; + import static com.njcn.influxdb.param.InfluxDBPublicParam.PQS_ONLINERATE; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; @Component public class PqsOnlinerateQuery extends QueryBuilder { + + @Resource + private InfluxDbConfig influxDbConfig; protected PqsOnlinerateQuery(InfluxDbUtils influxDbUtils) { super(influxDbUtils); @@ -29,7 +34,7 @@ public class PqsOnlinerateQuery extends QueryBuilder { * @see SelectQueryImpl */ private SelectQueryImpl fromTable(SelectionQueryImpl column) { - return column.from(DATABASE, PQS_ONLINERATE); + return column.from(influxDbConfig.getDatabase(), PQS_ONLINERATE); } /** diff --git a/pqs-event/event-boot/src/test/java/com/njcn/event/EventBootApplicationTest.java b/pqs-event/event-boot/src/test/java/com/njcn/event/EventBootApplicationTest.java index dfc8158dd..44ba22767 100644 --- a/pqs-event/event-boot/src/test/java/com/njcn/event/EventBootApplicationTest.java +++ b/pqs-event/event-boot/src/test/java/com/njcn/event/EventBootApplicationTest.java @@ -3,6 +3,7 @@ package com.njcn.event; import com.njcn.event.pojo.PqsEventDetail; import com.njcn.event.pojo.PqsOnlinerateAggregate; import com.njcn.event.pojo.PqsEventDetailCount; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.utils.InfluxDbUtils; import org.influxdb.dto.QueryResult; import org.influxdb.impl.InfluxDBResultMapper; @@ -18,10 +19,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; +import javax.annotation.Resource; import java.util.Arrays; import java.util.List; -import static com.njcn.influxdb.param.InfluxDBPublicParam.DATABASE; + import static com.njcn.influxdb.param.InfluxDBPublicParam.PQS_EVENT_DETAIL; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*; import static org.influxdb.querybuilder.FunctionFactory.sum; @@ -30,8 +32,11 @@ import static org.influxdb.querybuilder.FunctionFactory.sum; @RunWith(SpringRunner.class) @SpringBootTest public class EventBootApplicationTest { - - + + + @Resource + private InfluxDbConfig influxDbConfig; + @Autowired private InfluxDbUtils influxDbUtils; @@ -42,7 +47,7 @@ public class EventBootApplicationTest { // or 条件数据 List clauses = getClauses(); - SelectQueryImpl selectQuery = select().column("line_id").column("eventass_index").from(DATABASE, PQS_EVENT_DETAIL); + SelectQueryImpl selectQuery = select().column("line_id").column("eventass_index").from(influxDbConfig.getDatabase(), PQS_EVENT_DETAIL); WhereQueryImpl where = selectQuery.where(); // WHERE (line_id = '1' OR line_id = '2' OR line_id = '3') 加上前后() @@ -63,7 +68,7 @@ public class EventBootApplicationTest { // or 条件数据 List clauses = getClauses(); - SelectQueryImpl selectQuery = select().count("eventass_index").from(DATABASE, PQS_EVENT_DETAIL); + SelectQueryImpl selectQuery = select().count("eventass_index").from(influxDbConfig.getDatabase(), PQS_EVENT_DETAIL); WhereQueryImpl where = selectQuery.where(); // WHERE (line_id = '1' OR line_id = '2' OR line_id = '3') 加上前后() @@ -86,7 +91,7 @@ public class EventBootApplicationTest { SelectionQueryImpl select = select(); SelectionQueryImpl sum = select.op(op(sum("onlinemin"), "/", op(sum("onlinemin"), "+", sum("offlinemin"))), "*", 100) .as("value"); - SelectQueryImpl selectQuery = sum.from(DATABASE, "pqs_onlinerate"); + SelectQueryImpl selectQuery = sum.from(influxDbConfig.getDatabase(), "pqs_onlinerate"); WhereQueryImpl where = selectQuery.where(); // AND time >= '2022-05-01T00:00:00Z' AND time <= '2022-09-01T00:00:00Z' tz('Asia/Shanghai'); diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/ClonePqsOnlineRateJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/ClonePqsOnlineRateJob.java index c81a713ee..3d1b18c32 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/ClonePqsOnlineRateJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/ClonePqsOnlineRateJob.java @@ -6,6 +6,7 @@ import com.njcn.device.pq.pojo.dto.OnlineLineDTO; import com.njcn.energy.pojo.constant.ModelState; import com.njcn.executor.pojo.vo.PqsCommunicateClone; import com.njcn.executor.pojo.vo.PqsOnlineRate; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.utils.InfluxDbUtils; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; @@ -53,6 +54,8 @@ public class ClonePqsOnlineRateJob { private final LineFeignClient lineFeignClient; + private final InfluxDbConfig influxDbConfig; + @XxlJob("clonePqsOnlineRateJobHandler") public void clonePqsOnlineRateJobHandler() throws ParseException { List result = new ArrayList<>(); @@ -235,7 +238,7 @@ public class ClonePqsOnlineRateJob { * @return pqs_communicate数据 */ private List getPqsCommunicateClone(List list, String startTime, String endTime){ - SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE); + SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(), PQS_COMMUNICATE); WhereQueryImpl where = selectQuery.where(); whereAndNested(list, where); @@ -270,7 +273,7 @@ public class ClonePqsOnlineRateJob { * @return pqs_communicate数据 */ private List getData(List list,String endTime){ - SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE); + SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(), PQS_COMMUNICATE); WhereQueryImpl where = selectQuery.where(); whereAndNested(list, where); where.and(lte(TIME,endTime)); @@ -298,10 +301,10 @@ public class ClonePqsOnlineRateJob { fields.put(OFFLINE_MIN,item.getOfflineMin()); fields.put(ONLINE_RATE,item.getOnlineRate()); Point point = influxDbUtils.pointBuilder(PQS_ONLINERATE, item.getTime().toEpochMilli(), TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(DATABASE).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } } diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/DayJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/DayJob.java index 4590eac89..06bd6bdfb 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/DayJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/DayJob.java @@ -3,6 +3,7 @@ package com.njcn.executor.handler; import com.njcn.common.pojo.constant.PatternRegex; import com.njcn.device.pq.api.LineFeignClient; import com.njcn.executor.pojo.vo.*; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.param.InfluxDBPublicParam; import com.njcn.influxdb.utils.InfluxDbUtils; import com.xxl.job.core.context.XxlJobHelper; @@ -40,6 +41,8 @@ public class DayJob { private final LineFeignClient lineFeignClient; + private final InfluxDbConfig influxDbConfig; + @XxlJob("dayJobHandler") public void getDayData() throws ParseException { List paramList = new ArrayList<>(); @@ -333,11 +336,11 @@ public class DayJob { fields.put("v_49",item.getV49()); fields.put("v_50",item.getV50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_V, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } @@ -500,11 +503,11 @@ public class DayJob { fields.put("i_49",item.getI49()); fields.put("i_50",item.getI50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_I, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } @@ -578,11 +581,11 @@ public class DayJob { fields.put("plt",item.getPlt()); fields.put("pst",item.getPst()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_FLICKER, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } @@ -655,11 +658,11 @@ public class DayJob { fields.put("fluc",item.getFluc()); fields.put("fluccf",item.getFluccf()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_FLUC, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } @@ -811,11 +814,11 @@ public class DayJob { fields.put("i_49",item.getI49()); fields.put("i_50",item.getI50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_PHASIC_I, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } /** @@ -966,11 +969,11 @@ public class DayJob { fields.put("v_49",item.getV49()); fields.put("v_50",item.getV50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_PHASIC_V, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } /** @@ -1128,11 +1131,11 @@ public class DayJob { fields.put("p_49",item.getP49()); fields.put("p_50",item.getP50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_POWER_P, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } @@ -1289,11 +1292,11 @@ public class DayJob { fields.put("q_49",item.getQ49()); fields.put("q_50",item.getQ50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_POWER_Q, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } /** @@ -1449,11 +1452,11 @@ public class DayJob { fields.put("s_49",item.getS49()); fields.put("s_50",item.getS50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_POWER_S, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } /** @@ -1604,11 +1607,11 @@ public class DayJob { fields.put("i_49",item.getI49()); fields.put("i_50",item.getI50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_RATE_I, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } /** @@ -1759,11 +1762,11 @@ public class DayJob { fields.put("v_49",item.getV49()); fields.put("v_50",item.getV50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_HARM_RATE_V, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } /** @@ -1914,11 +1917,11 @@ public class DayJob { fields.put("i_49",item.getI49()); fields.put("i_50",item.getI50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_IN_HARM_I, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } /** @@ -2069,11 +2072,11 @@ public class DayJob { fields.put("v_49",item.getV49()); fields.put("v_50",item.getV50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_IN_HARM_V, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } @@ -2225,11 +2228,11 @@ public class DayJob { fields.put("i_49",item.getI49()); fields.put("i_50",item.getI50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_IN_HARM_I, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } /** @@ -2380,11 +2383,11 @@ public class DayJob { fields.put("v_49",item.getV49()); fields.put("v_50",item.getV50()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DATA_IN_HARM_V, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } /** @@ -2455,11 +2458,11 @@ public class DayJob { tags.put("value_type",item.getValueType()); fields.put("plt",item.getPlt()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.DAY_PLT, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhaseType()).tag(InfluxDBPublicParam.QUALITY_FLAG,item.getQualityFlag()).tag(InfluxDBPublicParam.VALUE_TYPE,item.getValueType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } public StringBuilder lineStringBuilder(List list) { diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/EleIntegrityJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/EleIntegrityJob.java index 5b79a0e94..e1be8016d 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/EleIntegrityJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/EleIntegrityJob.java @@ -2,6 +2,7 @@ package com.njcn.executor.handler; import com.njcn.energy.pojo.api.EleIntegrityFeignClient; import com.njcn.executor.pojo.dto.HarmonicDTO; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.utils.InfluxDbUtils; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.AllArgsConstructor; @@ -34,10 +35,10 @@ public class EleIntegrityJob { private final Integer AIR_DATA_DUE = 288; - private final String DATABASE = "PQSBASE"; - private final InfluxDbUtils influxDbUtils; + private final InfluxDbConfig influxDbConfig; + private final EleIntegrityFeignClient eleIntegrityFeignClient; @XxlJob("eleIntegrityJobHandler") @@ -156,10 +157,10 @@ public class EleIntegrityJob { fields.put("real",item.getReal()); fields.put("due",item.getDue()); Point point = influxDbUtils.pointBuilder("ele_integrity", time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(DATABASE).tag("line_id", item.getId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag("line_id", item.getId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); }; } diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/EleOnlineRateJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/EleOnlineRateJob.java index 546ed26e6..6d32d8153 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/EleOnlineRateJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/EleOnlineRateJob.java @@ -2,6 +2,7 @@ package com.njcn.executor.handler; import com.njcn.energy.pojo.api.EleOnlineRateFeignClient; import com.njcn.energy.pojo.dto.OnlineRateDTO; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.utils.InfluxDbUtils; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.AllArgsConstructor; @@ -27,7 +28,7 @@ import java.util.concurrent.TimeUnit; @AllArgsConstructor public class EleOnlineRateJob { - private final String DATABASE = "PQSBASE"; + private final InfluxDbConfig influxDbConfig; private final InfluxDbUtils influxDbUtils; @@ -59,11 +60,11 @@ public class EleOnlineRateJob { tags.put("device_id",item.getDeviceId()); fields.put("online_rate",item.getOnlineRate()); Point point = influxDbUtils.pointBuilder("ele_online_rate", time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(DATABASE).tag("device_id", item.getDeviceId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag("device_id", item.getDeviceId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); }; diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/ElectricCalJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/ElectricCalJob.java index 2397db30d..29c92ef15 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/ElectricCalJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/ElectricCalJob.java @@ -2,6 +2,7 @@ package com.njcn.executor.handler; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.IdUtil; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.utils.InfluxDbUtils; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.AllArgsConstructor; @@ -31,6 +32,8 @@ public class ElectricCalJob { private final InfluxDbUtils influxDbUtils; + private final InfluxDbConfig influxDbConfig; + @XxlJob("ElectricCalJob") public void ElectricCalJob() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -92,7 +95,7 @@ public class ElectricCalJob { }); if (CollectionUtil.isNotEmpty(insertObj)) { - BatchPoints batchPoints = BatchPoints.database("PQSBASE").build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).build(); for (Map it : insertObj) { Point.Builder point = Point.measurement("power_data_add"); it.forEach((key, val) -> { diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/LimitRateJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/LimitRateJob.java index b9bd74129..c5d06e180 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/LimitRateJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/LimitRateJob.java @@ -5,6 +5,7 @@ import com.njcn.device.pq.api.LineFeignClient; import com.njcn.device.pq.pojo.dto.PollutionParamDTO; import com.njcn.device.pq.pojo.po.Overlimit; import com.njcn.executor.pojo.vo.*; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.param.InfluxDBPublicParam; import com.njcn.influxdb.utils.InfluxDbUtils; import com.xxl.job.core.context.XxlJobHelper; @@ -42,6 +43,8 @@ public class LimitRateJob { private final InfluxDbUtils influxDbUtils; + private final InfluxDbConfig influxDbConfig; + private final LineFeignClient lineFeignClient; @XxlJob("limitRateJobHandler") @@ -1356,10 +1359,10 @@ public class LimitRateJob { fields.put("inuharm_15_overtime",item.getInuHarm15OverTime()); fields.put("inuharm_16_overtime",item.getInuHarm16OverTime()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.LIMIT_RATE, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhasicType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhasicType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } } diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/LimitTargetJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/LimitTargetJob.java index 3ac4b0317..a100057ae 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/LimitTargetJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/LimitTargetJob.java @@ -6,6 +6,7 @@ import com.njcn.device.pq.pojo.dto.PollutionParamDTO; import com.njcn.device.pq.pojo.po.Overlimit; import com.njcn.executor.pojo.vo.*; import com.njcn.harmonic.pojo.po.LimitTarget; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.param.InfluxDBPublicParam; import com.njcn.influxdb.utils.InfluxDbUtils; import com.xxl.job.core.context.XxlJobHelper; @@ -42,6 +43,8 @@ public class LimitTargetJob { private final InfluxDbUtils influxDbUtils; + private final InfluxDbConfig influxDbConfig; + private final LineFeignClient lineFeignClient; @XxlJob("limitTargetJobHandler") @@ -973,10 +976,10 @@ public class LimitTargetJob { fields.put("inuharm_15_overtime",item.getInuHarm15OverTime()); fields.put("inuharm_16_overtime",item.getInuHarm16OverTime()); Point point = influxDbUtils.pointBuilder(InfluxDBPublicParam.LIMIT_TARGET, time, TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(InfluxDBPublicParam.DATABASE).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhasicType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(InfluxDBPublicParam.LINE_ID, item.getLineId()).tag(InfluxDBPublicParam.PHASIC_TYPE,item.getPhasicType()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(InfluxDBPublicParam.DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } } diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PollutionJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PollutionJob.java index 8f60a420a..52a8b2417 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PollutionJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PollutionJob.java @@ -5,6 +5,7 @@ import com.njcn.device.pq.pojo.po.Overlimit; import com.njcn.executor.pojo.dto.PollutionDTO; import com.njcn.executor.pojo.vo.*; import com.njcn.harmonic.pojo.dto.PublicDTO; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.utils.InfluxDbUtils; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.AllArgsConstructor; @@ -38,7 +39,7 @@ import java.util.stream.Stream; @AllArgsConstructor public class PollutionJob { - private final String DATABASE = "PQSBASE"; + private final InfluxDbConfig influxDbConfig; private final InfluxDbUtils influxDbUtils; @@ -554,10 +555,10 @@ public class PollutionJob { fields.put("inuharm",item.getData7()); fields.put("flicker",item.getData8()); Point point = influxDbUtils.pointBuilder("harmonic_pollution", time, TimeUnit.MILLISECONDS,tags, fields); - BatchPoints batchPoints = BatchPoints.database(DATABASE).tag("line_id", item.getId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag("line_id", item.getId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } } diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsIntegrityJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsIntegrityJob.java index 031bbb4c5..c827f5c51 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsIntegrityJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsIntegrityJob.java @@ -4,6 +4,7 @@ import com.njcn.common.pojo.constant.PatternRegex; import com.njcn.device.pq.api.LineFeignClient; import com.njcn.device.pq.pojo.po.LineDetail; import com.njcn.executor.pojo.vo.*; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.param.InfluxDBPublicParam; import com.njcn.influxdb.utils.InfluxDbUtils; import com.xxl.job.core.context.XxlJobHelper; @@ -50,6 +51,8 @@ public class PqsIntegrityJob { private final InfluxDbUtils influxDbUtils; + private final InfluxDbConfig influxDbConfig; + private final LineFeignClient lineFeignClient; @XxlJob("pqsIntegrityJobHandler") @@ -125,7 +128,7 @@ public class PqsIntegrityJob { * @return dataV数据 */ private List getDataV(List list, String startTime, String endTime){ - SelectQueryImpl selectQuery = select().from(DATABASE, DATA_V); + SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(), DATA_V); WhereQueryImpl where = selectQuery.where(); whereAndNested(list, where); where.and(gte(TIME, startTime)).and(lte(TIME, endTime)); @@ -168,11 +171,11 @@ public class PqsIntegrityJob { fields.put(DUE,item.getDue()); fields.put(REAL,item.getReal()); Point point = influxDbUtils.pointBuilder(PQS_INTEGRITY, item.getTime().toEpochMilli(), TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(DATABASE).tag(LINE_ID, item.getLineId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(LINE_ID, item.getLineId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } } diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsOnlineRateJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsOnlineRateJob.java index 161ff5b99..2cacb3624 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsOnlineRateJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsOnlineRateJob.java @@ -5,6 +5,7 @@ import com.njcn.device.pq.api.LineFeignClient; import com.njcn.energy.pojo.constant.ModelState; import com.njcn.executor.pojo.vo.PqsCommunicate; import com.njcn.executor.pojo.vo.PqsOnlineRate; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.utils.InfluxDbUtils; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; @@ -50,6 +51,8 @@ public class PqsOnlineRateJob { private final InfluxDbUtils influxDbUtils; + private final InfluxDbConfig influxDbConfig; + private final LineFeignClient lineFeignClient; @XxlJob("pqsOnlineRateJobHandler") @@ -210,7 +213,7 @@ public class PqsOnlineRateJob { * @return pqs_communicate数据 */ private List getPqsCommunicate(List list, String startTime, String endTime){ - SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE); + SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(), PQS_COMMUNICATE); WhereQueryImpl where = selectQuery.where(); whereAndNested(list, where); where.and(gte(TIME, startTime)).and(lte(TIME, endTime)); @@ -244,7 +247,7 @@ public class PqsOnlineRateJob { * @return pqs_communicate数据 */ private List getData(List list){ - SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE); + SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(), PQS_COMMUNICATE); WhereQueryImpl where = selectQuery.where(); whereAndNested(list, where); where.groupBy(DEV_ID).orderBy(desc()).limit(1); @@ -270,10 +273,10 @@ public class PqsOnlineRateJob { fields.put(OFFLINE_MIN,item.getOfflineMin()); fields.put(ONLINE_RATE,item.getOnlineRate()); Point point = influxDbUtils.pointBuilder(PQS_ONLINERATE, item.getTime().toEpochMilli(), TimeUnit.MILLISECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(DATABASE).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); }); - influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records); } } diff --git a/pqs-job/job-executor/src/test/java/BaseJunitTest.java b/pqs-job/job-executor/src/test/java/BaseJunitTest.java new file mode 100644 index 000000000..bb37b28aa --- /dev/null +++ b/pqs-job/job-executor/src/test/java/BaseJunitTest.java @@ -0,0 +1,16 @@ +import com.njcn.executor.JobExecutorApplication; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.context.web.WebAppConfiguration; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2021年12月10日 15:05 + */ +@RunWith(SpringRunner.class) +@WebAppConfiguration +@SpringBootTest(classes = JobExecutorApplication.class) +public class BaseJunitTest { +} diff --git a/pqs-job/job-executor/src/test/java/Test1.java b/pqs-job/job-executor/src/test/java/Test1.java index 1476eadcc..4ad8689ad 100644 --- a/pqs-job/job-executor/src/test/java/Test1.java +++ b/pqs-job/job-executor/src/test/java/Test1.java @@ -1,4 +1,5 @@ import com.njcn.executor.pojo.vo.DataFlicker; +import com.njcn.influxdb.config.InfluxDbConfig; import com.njcn.influxdb.utils.InfluxDbUtils; import org.influxdb.dto.QueryResult; import org.influxdb.impl.InfluxDBResultMapper; @@ -11,6 +12,7 @@ import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; +import javax.annotation.Resource; import java.util.Arrays; import java.util.List; @@ -24,14 +26,15 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; * @version 1.0.0 * @createTime 2022/7/4 19:04 */ -@RunWith(SpringRunner.class) -@SpringBootTest(classes = Test1.class) -public class Test1 { +public class Test1 extends BaseJunitTest{ + + @Resource + private InfluxDbConfig influxDbConfig; @Test public void testMethod(){ - InfluxDbUtils influxDBUtil = new InfluxDbUtils("admin", "njcnpqs", "http://192.168.1.18:8086", "PQSBASE", ""); - SelectQueryImpl selectQuery = select().from("PQSBASE","data_flicker").where(eq("fluc",0)).limit(1).tz("Asia/Shanghai"); + InfluxDbUtils influxDBUtil = new InfluxDbUtils(influxDbConfig.getUserName(), influxDbConfig.getPassword(), influxDbConfig.getInfluxDBUrl(), influxDbConfig.getDatabase(), ""); + SelectQueryImpl selectQuery = select().from(influxDbConfig.getDatabase(),"data_flicker").where(eq("fluc",0)).limit(1).tz("Asia/Shanghai"); WhereQueryImpl where = selectQuery.where(); QueryResult queryResult = influxDBUtil.query(selectQuery.getCommand()); InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); diff --git a/pqs-system/system-api/src/main/java/com/njcn/system/enums/DicDataTypeEnum.java b/pqs-system/system-api/src/main/java/com/njcn/system/enums/DicDataTypeEnum.java index 710d2f5ab..27089d7c2 100644 --- a/pqs-system/system-api/src/main/java/com/njcn/system/enums/DicDataTypeEnum.java +++ b/pqs-system/system-api/src/main/java/com/njcn/system/enums/DicDataTypeEnum.java @@ -37,7 +37,6 @@ public enum DicDataTypeEnum { REPORT_TYPE("自定义报表类型","Report_Type"), LINE_MARK("监测点评分等级","Line_Grade"), LINE_TYPE("监测点类型","Line_Type") - ;