1. Power quality code adjustments
2. Microservice: scheduling-center task for monitoring-point data integrity
3. Ported part of the distributed photovoltaic interfaces
2022-07-11 20:36:37 +08:00
parent fc6dedfabd
commit 2662d3a139
28 changed files with 1333 additions and 40 deletions

View File

@@ -14,6 +14,10 @@ import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
@@ -190,13 +194,21 @@ public class PollutionJob {
return lineFeignClient.getAllLineOverLimit("harmonic-boot","").getData();
}
private void whereAndNested(List<Clause> clauses, WhereQueryImpl<SelectQueryImpl> whereQuery) {
WhereNested<WhereQueryImpl<SelectQueryImpl>> andNested = whereQuery.andNested();
for (Clause clause : clauses) {
andNested.or(clause);
}
andNested.close();
}
/**
* Harmonic voltage -> total voltage harmonic distortion rate (THDu)
* Latest phase A, B and C data for each monitoring point
* Grouped by monitoring point, taking the maximum per monitoring point
*/
private Map<String, Optional<PublicDTO>> getDistortionData(){
String sql = "SELECT * FROM day_v where value_type = 'CP95' and (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') group by line_id order by time desc limit 3";
String sql = "SELECT * FROM day_v where value_type = 'CP95' and (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') group by line_id order by time desc limit 3 tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<Double> data;
@@ -226,7 +238,7 @@ public class PollutionJob {
* Latest phase A, B and C data for each monitoring point
*/
private Map<String, Optional<PublicDTO>> getContentData(){
String sql = "SELECT * FROM day_harmrate_v where value_type = 'CP95' and (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') group by line_id order by time desc limit 3";
String sql = "SELECT * FROM day_harmrate_v where value_type = 'CP95' and (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') group by line_id order by time desc limit 3 tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<Double> data;
@@ -279,7 +291,7 @@ public class PollutionJob {
* Latest phase A, B and C data for each monitoring point
*/
private Map<String, Optional<PublicDTO>> getIharm(){
String sql = "SELECT * FROM day_i where value_type = 'CP95' and (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') group by line_id order by time desc limit 3";
String sql = "SELECT * FROM day_i where value_type = 'CP95' and (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') group by line_id order by time desc limit 3 tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<Double> data;
@@ -332,7 +344,7 @@ public class PollutionJob {
* Latest phase T data for each monitoring point
*/
private Map<String, Optional<PublicDTO>> getFreq(){
String sql = "SELECT line_id,abs(freq_dev) AS freq_dev FROM day_v where phasic_type = 'T' and (value_type = 'MIN' or value_type = 'MAX') group by line_id order by time desc limit 2";
String sql = "SELECT line_id,abs(freq_dev) AS freq_dev FROM day_v where phasic_type = 'T' and (value_type = 'MIN' or value_type = 'MAX') group by line_id order by time desc limit 2 tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<Double> data;
@@ -362,7 +374,7 @@ public class PollutionJob {
* Latest phase A, B and C data for each monitoring point
*/
private Map<String, Optional<PublicDTO>> getDev(){
String sql = "SELECT line_id,vu_dev,vl_dev,value_type FROM day_v where (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') and (value_type = 'MIN' or value_type = 'MAX') group by line_id order by time desc limit 6";
String sql = "SELECT line_id,vu_dev,vl_dev,value_type FROM day_v where (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') and (value_type = 'MIN' or value_type = 'MAX') group by line_id order by time desc limit 6 tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<Double> data;
@@ -393,7 +405,7 @@ public class PollutionJob {
* Latest phase T data for each monitoring point
*/
private Map<String, Optional<PublicDTO>> getUbalance(){
String sql = "SELECT line_id,v_unbalance,value_type FROM day_v where phasic_type = 'T' and (value_type = 'CP95' or value_type = 'MAX') group by line_id order by time desc limit 2";
String sql = "SELECT line_id,v_unbalance,value_type FROM day_v where phasic_type = 'T' and (value_type = 'CP95' or value_type = 'MAX') group by line_id order by time desc limit 2 tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<Double> data;
@@ -423,7 +435,7 @@ public class PollutionJob {
* Latest phase T data for each monitoring point
*/
private Map<String, Optional<PublicDTO>> getIneg(){
String sql = "SELECT line_id,i_neg,value_type FROM day_i where phasic_type = 'T' and (value_type = 'CP95' or value_type = 'MAX') group by line_id order by time desc limit 2";
String sql = "SELECT line_id,i_neg,value_type FROM day_i where phasic_type = 'T' and (value_type = 'CP95' or value_type = 'MAX') group by line_id order by time desc limit 2 tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<Double> data;
@@ -453,7 +465,7 @@ public class PollutionJob {
* Latest phase A, B and C data for each monitoring point
*/
private Map<String, Optional<PublicDTO>> getInuharm(){
String sql = "SELECT * FROM day_inharm_v where (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') and value_type = 'CP95' group by line_id order by time desc limit 3";
String sql = "SELECT * FROM day_inharm_v where (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') and value_type = 'CP95' group by line_id order by time desc limit 3 tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<Double> data;
@@ -498,7 +510,7 @@ public class PollutionJob {
* Latest phase A, B and C data for each monitoring point
*/
private Map<String, Optional<PublicDTO>> getFlicker(){
String sql = "SELECT * FROM day_plt where (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') and value_type = 'CP95' group by line_id order by time desc limit 3";
String sql = "SELECT * FROM day_plt where (phasic_type = 'A' or phasic_type = 'B' or phasic_type = 'C') and value_type = 'CP95' group by line_id order by time desc limit 3 tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<Double> data;
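
The only functional change to the raw queries above is the trailing tz('Asia/Shanghai'): without it InfluxDB evaluates ORDER BY time and the day_* boundaries in UTC, so the "latest day" rows can disagree with the local business day. The new whereAndNested helper builds the same (x OR y OR z) group for the influxdb-java query builder. Below is a minimal sketch, not part of the commit, of getDistortionData()'s CP95 query rebuilt with that API; the database name pqsbase is borrowed from the test class at the end of this commit, and the static imports select, eq and desc from BuiltQuery.QueryBuilder are assumed rather than shown in this file's diff.

// Sketch only: the raw day_v distortion query expressed through the query builder.
SelectQueryImpl selectQuery = select().from("pqsbase", "day_v");
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where(eq("value_type", "CP95"));
// (phasic_type = 'A' OR phasic_type = 'B' OR phasic_type = 'C') as one nested block
whereAndNested(Arrays.asList(eq("phasic_type", "A"), eq("phasic_type", "B"), eq("phasic_type", "C")), where);
where.groupBy("line_id");
where.orderBy(desc());            // ORDER BY time DESC
where.limit(3);
where.tz("Asia/Shanghai");        // same timezone clause the commit appends to the raw SQL
QueryResult sqlResult = influxDbUtils.query(selectQuery.getCommand());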

View File

@@ -0,0 +1,178 @@
package com.njcn.executor.handler;
import com.njcn.common.pojo.constant.PatternRegex;
import com.njcn.device.api.LineFeignClient;
import com.njcn.device.pojo.po.LineDetail;
import com.njcn.executor.pojo.vo.*;
import com.njcn.influxdb.param.InfluxDBPublicParam;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static com.njcn.influxdb.param.InfluxDBPublicParam.*;
import static com.njcn.influxdb.param.InfluxDBPublicParam.LINE_ID;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
/**
* Scheduled task: checks monitoring-point data completeness (expected vs. actual daily sample counts) and writes the result to the pqs_integrity measurement.
*
* @author xuyang
* @version 1.0.0
* @createTime 2022/7/8 13:44
*/
@Slf4j
@Component
@AllArgsConstructor
public class PqsIntegrityJob {
private final InfluxDbUtils influxDbUtils;
private final LineFeignClient lineFeignClient;
@XxlJob("pqsIntegrityJobHandler")
public void pqsIntegrityJobHandler() throws ParseException {
List<PqsIntegrity> result = new ArrayList<>();
List<String> paramList = new ArrayList<>(),lineList = new ArrayList<>();
List<DataV> dataList = new ArrayList<>();
String command = XxlJobHelper.getJobParam();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Calendar calendar = Calendar.getInstance();
calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH)-1, 0, 0, 0);
calendar.set(Calendar.MILLISECOND, 0);
Calendar calendar2 = Calendar.getInstance();
calendar2.set(calendar2.get(Calendar.YEAR), calendar2.get(Calendar.MONTH), calendar2.get(Calendar.DAY_OF_MONTH)-1, 23, 59, 59);
calendar2.set(Calendar.MILLISECOND, 0);
String startTime = format.format(calendar.getTime());
String endTime = format.format(calendar2.getTime());
if (!StringUtils.isEmpty(command)){
paramList = Arrays.asList(command.split(","));
startTime = paramList.get(0);
endTime = paramList.get(1);
lineList = paramList.subList(2,paramList.size());
boolean s1 = Pattern.matches(PatternRegex.TIME_FORMAT,startTime);
boolean e1 = Pattern.matches(PatternRegex.TIME_FORMAT,endTime);
if (!s1 || !e1){
log.error("Invalid backfill time format");
return;
} else {
startTime = startTime + " 00:00:00";
endTime = endTime + " 23:59:59";
}
}
List<LineDetail> lineDetail = lineFeignClient.getLineDetail(lineList).getData();
if (!CollectionUtils.isEmpty(lineDetail)){
//fetch the number of data_v records available for each monitoring point
lineList = lineDetail.stream().map(LineDetail::getId).collect(Collectors.toList());
long diff,diffDays,a,b = 0;
Date d1 = format.parse(startTime);
Date d2 = format.parse(endTime);
diff = d2.getTime() - d1.getTime();
diffDays = diff / (24 * 60 * 60 * 1000-1000);
int days = (int) diffDays;
for (int i = 1; i <= days; i++) {
a = d1.getTime() + (long)(i-1)*(24 * 60 * 60) * 1000;
b = d1.getTime() + (long)i*(24 * 60 * 60) * 1000-1000;
startTime = format.format(a);
endTime = format.format(b);
dataList = getDataV(lineList,startTime,endTime);
for (LineDetail detail : lineDetail) {
PqsIntegrity pqsIntegrity = new PqsIntegrity();
pqsIntegrity.setTime(Instant.ofEpochMilli(a));
pqsIntegrity.setLineId(detail.getId());
pqsIntegrity.setDue(DAY_MINUTE/detail.getTimeInterval());
if (!CollectionUtils.isEmpty(dataList)){
Map<String,List<DataV>> lineMap = dataList.stream().collect(Collectors.groupingBy(DataV::getLineId));
List<DataV> l1 = lineMap.get(detail.getId());
if (!CollectionUtils.isEmpty(l1)){
Map<Instant,List<DataV>> timeMap = l1.stream().collect(Collectors.groupingBy(DataV::getTime));
pqsIntegrity.setReal(timeMap.size());
}
}
result.add(pqsIntegrity);
}
}
}
insertData(result);
}
/**
* Fetch data_v records for the given monitoring points within a time window
* @param list monitoring point IDs
* @param startTime window start (yyyy-MM-dd HH:mm:ss)
* @param endTime window end (yyyy-MM-dd HH:mm:ss)
* @return mapped data_v records
*/
private List<DataV> getDataV(List<String> list, String startTime, String endTime){
SelectQueryImpl selectQuery = select().from(DATABASE, DATA_V);
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
whereAndNested(list, where);
where.and(gte(TIME, startTime)).and(lte(TIME, endTime));
where.tz(TZ);
QueryResult queryResult = influxDbUtils.query(selectQuery.getCommand());
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
return resultMapper.toPOJO(queryResult, DataV.class);
}
/**
* Builds the nested monitoring-point filter: (line_id = ... OR line_id = ... OR ...)
* @param list monitoring point IDs
* @param whereQuery WhereQueryImpl to append the nested clause to
*/
private void whereAndNested(List<String> list, WhereQueryImpl<SelectQueryImpl> whereQuery) {
List<Clause> clauses = new ArrayList<>();
list.forEach(item->{
Clause clause = eq(LINE_ID, item);
clauses.add(clause);
});
WhereNested<WhereQueryImpl<SelectQueryImpl>> andNested = whereQuery.andNested();
for (Clause clause : clauses) {
andNested.or(clause);
}
andNested.close();
}
/**
* Inserts the computed completeness records into the pqs_integrity measurement
* @author xy
* @param list records to insert
* @date 2022/5/12 8:55
*/
private void insertData(List<PqsIntegrity> list){
List<String> records = new ArrayList<>();
list.forEach(item->{
Map<String, String> tags = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
tags.put(LINE_ID,item.getLineId());
fields.put(DUE,item.getDue());
fields.put(REAL,item.getReal());
Point point = influxDbUtils.pointBuilder(PQS_INTEGRITY, item.getTime().toEpochMilli(), TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(DATABASE).tag(LINE_ID, item.getLineId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
}
}
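
pqsIntegrityJobHandler runs for the previous calendar day by default, or for a backfill window passed as the XXL-JOB parameter startDate,endDate,lineId1,lineId2,... (the two dates must match PatternRegex.TIME_FORMAT; 00:00:00 and 23:59:59 are appended). For each monitoring point and day it writes one pqs_integrity point with due = DAY_MINUTE / timeInterval expected samples and real = the number of distinct data_v timestamps found. A small worked example of how those two fields become a completeness rate downstream; this is illustrative only, the rate is not computed in this commit, and DAY_MINUTE = 1440 is an assumption about the constant's value.

// Illustrative sketch (requires java.math.BigDecimal and java.math.RoundingMode).
// A 3-minute sampling interval expects 1440 / 3 = 480 rows per day.
int due = 1440 / 3;                         // expected samples: DAY_MINUTE / timeInterval
int real = 468;                             // distinct data_v timestamps actually stored
BigDecimal rate = BigDecimal.valueOf(real)
        .divide(BigDecimal.valueOf(due), 4, RoundingMode.HALF_UP)
        .multiply(BigDecimal.valueOf(100)); // 97.5 percent completeness for that day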

View File

@@ -0,0 +1,29 @@
package com.njcn.executor.handler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* Scheduled task placeholder for monitoring-point online-rate statistics (implementation pending).
*
* @author xuyang
* @version 1.0.0
* @createTime 2022/7/8 13:43
*/
@Slf4j
@Component
@AllArgsConstructor
public class PqsOnlineRateJob {
}

View File

@@ -0,0 +1,31 @@
package com.njcn.executor.pojo.vo;
import lombok.Data;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import java.time.Instant;
/**
* InfluxDB measurement mapping for pqs_integrity: per-day expected (due) and actual (real) sample counts for a monitoring point.
*
* @author xuyang
* @version 1.0.0
* @createTime 2022/5/11 16:08
*/
@Data
@Measurement(name = "pqs_integrity")
public class PqsIntegrity {
@Column(name = "time")
private Instant time;
@Column(name = "line_id")
private String lineId;
@Column(name = "due")
private Integer due;
@Column(name = "real")
private Integer real = 0;
}
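
The @Measurement and @Column annotations let InfluxDBResultMapper hydrate query results straight into this class, the same mapping approach the jobs above use for their DTOs. A quick read-back sketch follows; the query string is illustrative and not part of the commit.

// Sketch: read yesterday's completeness rows back into the annotated POJO.
QueryResult qr = influxDbUtils.query(
        "SELECT * FROM pqs_integrity WHERE time >= now() - 1d GROUP BY line_id tz('Asia/Shanghai')");
List<PqsIntegrity> rows = new InfluxDBResultMapper().toPOJO(qr, PqsIntegrity.class);
rows.forEach(r -> System.out.println(r.getLineId() + ": " + r.getReal() + "/" + r.getDue()));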

View File

@@ -41,12 +41,12 @@ logging:
xxl:
job:
admin:
addresses: http://@server.url@:10217/job-admin
addresses: http://192.168.1.13:10217/job-admin
#Executor communication token [optional]: enabled when non-empty
accessToken:
executor:
#Executor AppName [optional]: grouping key for executor heartbeat registration; leave empty to disable auto-registration
appname: executorCdf
appname: executor
#Executor registration address [optional]: when set, it is used as the registration address; when empty, the embedded service's "IP:PORT" is used, which better supports containerized executors with dynamic IPs and dynamically mapped ports
address:
#Executor IP [optional]: empty by default, meaning the IP is detected automatically; on multi-NIC hosts a specific IP can be set manually. The IP is not bound to a host, it is only used as the communication address for "executor registration" and for "trigger requests from the scheduling center"
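
These xxl.job.* properties only take effect once an executor bean reads them; that wiring is not part of this commit, but the usual xxl-job pattern looks like the sketch below (everything except the property keys shown above is an assumption), so that @XxlJob handlers such as pqsIntegrityJobHandler register with the job-admin at http://192.168.1.13:10217/job-admin under the appname "executor".

// Assumed wiring, not in this commit: standard XxlJobSpringExecutor bean fed by the properties above.
@Bean
public XxlJobSpringExecutor xxlJobExecutor(
        @Value("${xxl.job.admin.addresses}") String adminAddresses,
        @Value("${xxl.job.accessToken:}") String accessToken,
        @Value("${xxl.job.executor.appname}") String appname) {
    XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
    executor.setAdminAddresses(adminAddresses);
    executor.setAccessToken(accessToken);
    executor.setAppname(appname);
    return executor;
}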

View File

@@ -0,0 +1,56 @@
import com.njcn.executor.pojo.vo.DataFlicker;
import com.njcn.influxdb.utils.InfluxDbUtils;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays;
import java.util.List;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
/**
* Ad-hoc test for InfluxDB query building (tz clause and nested where conditions).
*
* @author xuyang
* @version 1.0.0
* @createTime 2022/7/4 19:04
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Test1.class)
public class Test1 {
@Test
public void testMethod(){
InfluxDbUtils influxDBUtil = new InfluxDbUtils("admin", "njcnpqs", "http://192.168.1.18:8086", "pqsbase", "");
SelectQueryImpl selectQuery = select().from("pqsbase","data_flicker").where(eq("fluc",0)).limit(1).tz("Asia/Shanghai");
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
QueryResult queryResult = influxDBUtil.query(selectQuery.getCommand());
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<DataFlicker> list = resultMapper.toPOJO(queryResult, DataFlicker.class);
System.out.println("list===:" + list);
}
private List<Clause> getClauses() {
Clause c1 = eq("line_id", "5e467a40023b299070682eb21f2ec9a1");
Clause c2 = eq("line_id", "183245996f303ebfd80eeb3377cecdc2");
Clause c3 = eq("line_id", "0d46f54420246e999d5c68b3133f668c");
return Arrays.asList(c1, c2, c3);
}
private void whereAndNested(List<Clause> clauses, WhereQueryImpl<SelectQueryImpl> whereQuery) {
WhereNested<WhereQueryImpl<SelectQueryImpl>> andNested = whereQuery.andNested();
for (Clause clause : clauses) {
andNested.or(clause);
}
andNested.close();
}
}
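
getClauses() and whereAndNested() are defined above but not yet exercised by testMethod(); together they express a multi-monitoring-point filter as a single nested OR group. A sketch of a follow-up test body that wires them together, reusing the connection settings from testMethod():

// Sketch: restrict data_flicker to the three line_ids from getClauses() via one nested OR block.
InfluxDbUtils influxDBUtil = new InfluxDbUtils("admin", "njcnpqs", "http://192.168.1.18:8086", "pqsbase", "");
SelectQueryImpl selectQuery = select().from("pqsbase", "data_flicker");
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
whereAndNested(getClauses(), where);   // WHERE (line_id = ... OR line_id = ... OR line_id = ...)
where.limit(10);
where.tz("Asia/Shanghai");
List<DataFlicker> list = new InfluxDBResultMapper()
        .toPOJO(influxDBUtil.query(selectQuery.getCommand()), DataFlicker.class);
System.out.println("list===:" + list);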