1.infludb公共参数名调整

2.微服务调度中心任务-终端在线率
This commit is contained in:
2022-07-13 10:39:13 +08:00
parent 63ce5e649c
commit 366bd2d44e
14 changed files with 394 additions and 33 deletions

View File

@@ -1,8 +1,40 @@
package com.njcn.executor.handler;
import com.njcn.common.pojo.constant.PatternRegex;
import com.njcn.device.api.LineFeignClient;
import com.njcn.energy.pojo.constant.ModelState;
import com.njcn.executor.pojo.vo.PqsCommunicate;
import com.njcn.executor.pojo.vo.PqsOnlineRate;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static com.njcn.influxdb.param.InfluxDBPublicParam.*;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*;
/**
* 类的介绍:
@@ -16,14 +48,234 @@ import org.springframework.stereotype.Component;
@AllArgsConstructor
public class PqsOnlineRateJob {
private final InfluxDbUtils influxDbUtils;
private final LineFeignClient lineFeignClient;
private final Integer DAY_MINUTE= 60*24;
@XxlJob("pqsOnlineRateJobHandler")
public void pqsOnlineRateJobHandler() throws ParseException {
List<PqsOnlineRate> result = new ArrayList<>();
List<String> paramList = new ArrayList<>(),deviceList = new ArrayList<>();
String command = XxlJobHelper.getJobParam();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Calendar calendar = Calendar.getInstance();
calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH)-1, 0, 0, 0);
calendar.set(Calendar.MILLISECOND, 0);
Calendar calendar2 = Calendar.getInstance();
calendar2.set(calendar2.get(Calendar.YEAR), calendar2.get(Calendar.MONTH), calendar2.get(Calendar.DAY_OF_MONTH)-1, 23, 59, 59);
calendar2.set(Calendar.MILLISECOND, 0);
String startTime = format.format(calendar.getTime());
String endTime = format.format(calendar2.getTime());
if (!StringUtils.isEmpty(command)){
paramList = Arrays.asList(command.split(","));
startTime = paramList.get(0);
endTime = paramList.get(1);
deviceList = paramList.subList(2,paramList.size());
boolean s1 = Pattern.matches(PatternRegex.TIME_FORMAT,startTime);
boolean e1 = Pattern.matches(PatternRegex.TIME_FORMAT,endTime);
if (!s1 || !e1){
log.error("补招时间格式错误");
return;
} else {
startTime = startTime + " 00:00:00";
endTime = endTime + " 23:59:59";
}
}
if (CollectionUtils.isEmpty(deviceList)){
deviceList = lineFeignClient.getDeviceList().getData();
}
if (!CollectionUtils.isEmpty(deviceList)){
long diff,diffDays,a,b = 0;
List<PqsCommunicate> l1 = new ArrayList<>(),l2 = new ArrayList<>();
Date d1 = format.parse(startTime);
Date d2 = format.parse(endTime);
diff = d2.getTime() - d1.getTime();
diffDays = diff / (24 * 60 * 60 * 1000-1000);
int days = (int) diffDays;
for (int i = 1; i <= days; i++) {
a = d1.getTime() + (long)(i-1)*(24 * 60 * 60) * 1000;
b = d1.getTime() + (long)i*(24 * 60 * 60) * 1000-1000;
startTime = format.format(a);
endTime = format.format(b);
//获取装置的最新的一条数据
List<PqsCommunicate> latestList = getData(deviceList);
if (!CollectionUtils.isEmpty(latestList)){
for (PqsCommunicate item : latestList) {
if (item.getTime().toEpochMilli() < a){
l1.add(item);
} else if (a <= item.getTime().toEpochMilli() && item.getTime().toEpochMilli() < b){
l2.add(item);
}
}
}
if (!CollectionUtils.isEmpty(l1)){
for (PqsCommunicate item : l1) {
PqsOnlineRate onlineRate = new PqsOnlineRate();
if (Objects.equals(item.getType(), ModelState.offline)){
onlineRate.setOfflineMin(DAY_MINUTE);
onlineRate.setOnlineMin(0);
onlineRate.setOnlineRate(0.0);
} else {
onlineRate.setOfflineMin(0);
onlineRate.setOnlineMin(DAY_MINUTE);
onlineRate.setOnlineRate(100.0);
}
onlineRate.setTime(Instant.ofEpochMilli(a));
onlineRate.setDevId(item.getDevId());
result.add(onlineRate);
}
}
if (!CollectionUtils.isEmpty(l2)){
List<String> devList = l2.stream().map(PqsCommunicate::getDevId).collect(Collectors.toList());
List<PqsCommunicate> list = getPqsCommunicate(devList,startTime,endTime);
//根据装置的id进行分组
Map<String,List<PqsCommunicate>> groupMap = list.stream().collect(Collectors.groupingBy(PqsCommunicate::getDevId));
try {
if (!CollectionUtils.isEmpty(groupMap)){
for (String key : groupMap.keySet()) {
int offTime = 0;
int onTime = 0;
PqsOnlineRate onlineRate = new PqsOnlineRate();
List<PqsCommunicate> infoList = groupMap.get(key);
if (infoList.size() > 1){
//获取最早一条记录
PqsCommunicate first = infoList.stream().min(Comparator.comparing(PqsCommunicate::getTime)).get();
//将上线和下线分组
Map<Integer,List<PqsCommunicate>> typeMap = infoList.stream().collect(Collectors.groupingBy(PqsCommunicate::getType));
List<PqsCommunicate> off = typeMap.get(0);
List<PqsCommunicate> on = typeMap.get(1);
if (first.getType() == 0){
if (off.size() == on.size()){
for (int j = 0; j < off.size(); j++) {
offTime = offTime + (int) (on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
}
} else {
for (int j = 0; j < off.size(); j++) {
if (j == off.size() - 1){
offTime = offTime + (int) (format.parse(endTime).getTime() + 1000L - off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
} else {
offTime = offTime + (int) (on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
}
}
}
onTime = DAY_MINUTE-offTime;
} else {
if (off.size() == on.size()){
for (int j = 0; j < on.size(); j++) {
onTime = onTime + (int) (off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
}
} else {
for (int j = 0; j < on.size(); j++) {
if (j == on.size() - 1){
onTime = onTime + (int) (format.parse(endTime).getTime() + 1000L - on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
} else {
onTime = onTime + (int) (off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
}
}
}
offTime = DAY_MINUTE-onTime;
}
} else {
LocalDateTime updateTime = LocalDateTime.ofInstant(infoList.get(0).getTime(), ZoneId.systemDefault());
if (Objects.equals(infoList.get(0).getType(),0)) {
onTime = 0;
offTime = (int) (format.parse(endTime).getTime() + 1000L - updateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
} else {
offTime = 0;
onTime = (int) (format.parse(endTime).getTime() + 1000L - updateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
}
}
onlineRate.setDevId(infoList.get(0).getDevId());
onlineRate.setOnlineMin(onTime);
onlineRate.setOfflineMin(offTime);
onlineRate.setOnlineRate(Double.parseDouble(String.format("%.2f",onTime*1.0/DAY_MINUTE*100)));
onlineRate.setTime(Instant.ofEpochMilli(a));
result.add(onlineRate);
}
}
} catch (ParseException e) {
e.getMessage();
}
}
}
}
insertData(result);
}
/**
* 获取pqs_communicate数据
* @param list 装置集合
* @param startTime 开始时间
* @param endTime 结束时间
* @return pqs_communicate数据
*/
private List<PqsCommunicate> getPqsCommunicate(List<String> list, String startTime, String endTime){
SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE);
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
whereAndNested(list, where);
where.and(gte(TIME, startTime)).and(lte(TIME, endTime));
where.tz(TZ);
QueryResult queryResult = influxDbUtils.query(selectQuery.getCommand());
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
return resultMapper.toPOJO(queryResult, PqsCommunicate.class);
}
/**
* 拼接装置条件
* @param list 装置集合
* @param whereQuery WhereQueryImpl
*/
private void whereAndNested(List<String> list, WhereQueryImpl<SelectQueryImpl> whereQuery) {
List<Clause> clauses = new ArrayList<>();
list.forEach(item->{
Clause clause = eq(DEV_ID, item);
clauses.add(clause);
});
WhereNested<WhereQueryImpl<SelectQueryImpl>> andNested = whereQuery.andNested();
for (Clause clause : clauses) {
andNested.or(clause);
}
andNested.close();
}
/**
* 获取pqs_communicate数据最新一条数据
* @param list 装置id集合
* @return pqs_communicate数据
*/
private List<PqsCommunicate> getData(List<String> list){
SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE);
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
whereAndNested(list, where);
where.groupBy(DEV_ID).orderBy(desc()).limit(1);
where.tz(TZ);
QueryResult queryResult = influxDbUtils.query(selectQuery.getCommand());
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
return resultMapper.toPOJO(queryResult, PqsCommunicate.class);
}
/**
* 功能描述:插入pqs_integrity表数据
* @author xy
* @param list 数据集合
* @date 2022/5/12 8:55
*/
private void insertData(List<PqsOnlineRate> list){
List<String> records = new ArrayList<>();
list.forEach(item->{
Map<String, String> tags = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
tags.put(DEV_ID,item.getDevId());
fields.put(ONLINE_MIN,item.getOnlineMin());
fields.put(OFFLINE_MIN,item.getOfflineMin());
fields.put(ONLINE_RATE,item.getOnlineRate());
Point point = influxDbUtils.pointBuilder(PQS_ONLINERATE, item.getTime().toEpochMilli(), TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(DATABASE).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
}
}

View File

@@ -0,0 +1,32 @@
package com.njcn.executor.pojo.vo;
import lombok.Data;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import java.time.Instant;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2022/7/12 9:55
*/
@Data
@Measurement(name = "pqs_communicate")
public class PqsCommunicate {
@Column(name = "time")
private Instant time;
@Column(name = "dev_id")
private String devId;
@Column(name = "description")
private String description;
@Column(name = "type")
private Integer type;
}

View File

@@ -0,0 +1,35 @@
package com.njcn.executor.pojo.vo;
import lombok.Data;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import java.time.Instant;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2022/7/12 9:55
*/
@Data
@Measurement(name = "pqs_onlinerate")
public class PqsOnlineRate {
@Column(name = "time")
private Instant time;
@Column(name = "dev_id")
private String devId;
@Column(name = "online_min")
private Integer onlineMin;
@Column(name = "offline_min")
private Integer offlineMin;
@Column(name = "online_rate")
private Double onlineRate;
}