装置在线率调度任务
This commit is contained in:
@@ -219,6 +219,15 @@ public interface LineFeignClient {
|
||||
@PostMapping("getDeviceList")
|
||||
HttpResult<List<String>> getDeviceList();
|
||||
|
||||
/**
|
||||
* 功能描述: 获取指定条件的监测点id(实际装置、投运、稳态或者双系统)
|
||||
* @author xy
|
||||
* @date 2022/7/8 14:24
|
||||
* @return 装置id集合
|
||||
*/
|
||||
@PostMapping("getRunLineIdsList")
|
||||
HttpResult<List<String>> getRunLineIdsList();
|
||||
|
||||
/**
|
||||
* 获取当前状态在线的监测点数量
|
||||
* @param lineIds 监测点集合
|
||||
@@ -239,4 +248,14 @@ public interface LineFeignClient {
|
||||
@PostMapping("getOnOrUnLine")
|
||||
HttpResult<List<String>> getOnOrUnLine(@RequestBody LineBaseQueryParam lineBaseQueryParam);
|
||||
|
||||
/**
|
||||
* 获取监测点设备id
|
||||
* @param lineIds 监测点集合
|
||||
* @return 监测点设备id
|
||||
* @author cdf
|
||||
* @date 2022/8/1
|
||||
*/
|
||||
@PostMapping("getOnLineDevLine")
|
||||
HttpResult<List<OnlineLineDTO>> getOnLineDevLine(@RequestBody List<String> lineIds);
|
||||
|
||||
}
|
||||
|
||||
@@ -168,6 +168,12 @@ public class LineFeignClientFallbackFactory implements FallbackFactory<LineFeign
|
||||
throw new BusinessException(finalExceptionEnum);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResult<List<String>> getRunLineIdsList() {
|
||||
log.error("{}异常,降级处理,异常为:{}", "获取监测点Id集合: ", throwable.toString());
|
||||
throw new BusinessException(finalExceptionEnum);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResult<Integer> getOnLineCount(List<String> lineIds) {
|
||||
log.error("{}异常,降级处理,异常为:{}", "获取在线监测点数量异常: ", throwable.toString());
|
||||
@@ -180,6 +186,12 @@ public class LineFeignClientFallbackFactory implements FallbackFactory<LineFeign
|
||||
throw new BusinessException(finalExceptionEnum);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResult<List<OnlineLineDTO>> getOnLineDevLine(List<String> lineIds) {
|
||||
log.error("{}异常,降级处理,异常为:{}", "获取监测点设备id异常: ", throwable.toString());
|
||||
throw new BusinessException(finalExceptionEnum);
|
||||
}
|
||||
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
package com.njcn.device.pojo.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* pqs
|
||||
*
|
||||
* @author cdf
|
||||
* @date 2022/9/23
|
||||
*/
|
||||
@Data
|
||||
public class OnlineLineDTO {
|
||||
|
||||
private String lineId;
|
||||
|
||||
private String devId;
|
||||
}
|
||||
@@ -306,6 +306,14 @@ public class LineController extends BaseController {
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, lineMapper.getDeviceList(), methodDescribe);
|
||||
}
|
||||
|
||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||
@PostMapping("/getRunLineIdsList")
|
||||
@ApiOperation("获取生成在线率的监测点Id")
|
||||
public HttpResult<List<String>> getRunLineIdsList() {
|
||||
String methodDescribe = getMethodDescribe("getRunLineIdsList");
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, lineMapper.getRunLineIdsList(), methodDescribe);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取当前状态在线的监测点数量
|
||||
@@ -343,6 +351,21 @@ public class LineController extends BaseController {
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS,idsRes, methodDescribe);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取监测点设备id
|
||||
* @param lineIds 监测点集合
|
||||
* @return 监测点设备id
|
||||
* @author cdf
|
||||
* @date 2022/8/1
|
||||
*/
|
||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||
@PostMapping("/getOnLineDevLine")
|
||||
@ApiOperation("获取当前状态在线的监测点数量")
|
||||
@ApiImplicitParam(name = "lineIds", value = "监测点集合", required = true)
|
||||
public HttpResult<List<OnlineLineDTO>> getOnLineDevLine(@RequestBody List<String> lineIds) {
|
||||
String methodDescribe = getMethodDescribe("getOnLineDevLine");
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, lineMapper.getOnLineDevLine(lineIds), methodDescribe);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -7,13 +7,10 @@ import com.njcn.common.pojo.dto.SimpleDTO;
|
||||
import com.njcn.device.pojo.bo.BaseLineInfo;
|
||||
import com.njcn.device.pojo.bo.DeviceType;
|
||||
import com.njcn.device.pojo.bo.excel.TerminalBaseExcel;
|
||||
import com.njcn.device.pojo.dto.OverLimitLineDTO;
|
||||
import com.njcn.device.pojo.dto.PollutionLineDTO;
|
||||
import com.njcn.device.pojo.dto.WarningSubstationDTO;
|
||||
import com.njcn.device.pojo.dto.*;
|
||||
import com.njcn.device.pojo.param.DeviceInfoParam;
|
||||
import com.njcn.device.pojo.po.*;
|
||||
import com.njcn.device.pojo.vo.*;
|
||||
import com.njcn.device.pojo.dto.PollutionSubstationDTO;
|
||||
import com.njcn.web.pojo.vo.LineDataVO;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
@@ -319,15 +316,32 @@ public interface LineMapper extends BaseMapper<Line> {
|
||||
*/
|
||||
List<String> getDeviceList();
|
||||
|
||||
/**
|
||||
* 获取生成在线率的监测点Id
|
||||
* @return 监测点Id
|
||||
*/
|
||||
List<String> getRunLineIdsList();
|
||||
|
||||
|
||||
/**
|
||||
* 获取当前状态在线的监测点数量
|
||||
* @return Integer 在线监测点数量
|
||||
*/
|
||||
Integer getOnLineCount(@Param("lineIds")List<String> lineIds);
|
||||
|
||||
/**
|
||||
* 获取监测点设备id
|
||||
* @return Integer 监测点设备id
|
||||
*/
|
||||
List<OnlineLineDTO> getOnLineDevLine(@Param("lineIds")List<String> lineIds);
|
||||
|
||||
|
||||
/**
|
||||
* 获取当前状态在线和离线的监测点
|
||||
* @return 在线或离线监测点ids
|
||||
*/
|
||||
List<String> getOnOrUnLine(@Param("list")List<String> lineIds,@Param("comFlag")Integer comFlag);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -902,6 +902,19 @@ FROM
|
||||
SELECT Id FROM pq_device WHERE Dev_Model = 1 AND Dev_Data_Type IN (1,2) AND Run_Flag = 0
|
||||
</select>
|
||||
|
||||
<select id="getRunLineIdsList" resultType="string">
|
||||
select a.id from pq_line a
|
||||
inner join pq_line b on a.pid = b.id
|
||||
inner join pq_line c on b.pid = c.id
|
||||
inner join pq_device d on c.id = d.id
|
||||
where d.Dev_Model = 1
|
||||
AND d.Dev_Data_Type IN (1,2)
|
||||
AND d.Run_Flag = 0
|
||||
and a.level = 6
|
||||
and a.state = 1
|
||||
</select>
|
||||
|
||||
|
||||
|
||||
<select id="getOnLineCount" resultType="int">
|
||||
SELECT
|
||||
@@ -921,6 +934,17 @@ FROM
|
||||
</foreach>
|
||||
</select>
|
||||
|
||||
<select id="getOnLineDevLine" resultType="OnlineLineDTO">
|
||||
select a.id lineId,c.id devId from pq_line a
|
||||
inner join pq_line b on a.pid = b.id
|
||||
inner join pq_line c on b.pid = c.id
|
||||
where a.id in
|
||||
<foreach collection="lineIds" item="item" open="(" close=")" separator=",">
|
||||
#{item}
|
||||
</foreach>
|
||||
</select>
|
||||
|
||||
|
||||
<select id="getOnOrUnLine" resultType="String">
|
||||
select a.id from pq_line a
|
||||
inner join pq_line b on a.pid = b.id
|
||||
|
||||
@@ -0,0 +1,308 @@
|
||||
package com.njcn.executor.handler;
|
||||
|
||||
import com.njcn.common.pojo.constant.PatternRegex;
|
||||
import com.njcn.device.api.LineFeignClient;
|
||||
import com.njcn.device.pojo.dto.OnlineLineDTO;
|
||||
import com.njcn.device.pojo.vo.AreaLineInfoVO;
|
||||
import com.njcn.energy.pojo.constant.ModelState;
|
||||
import com.njcn.executor.pojo.vo.PqsCommunicateClone;
|
||||
import com.njcn.executor.pojo.vo.PqsOnlineRate;
|
||||
import com.njcn.influxdb.utils.InfluxDbUtils;
|
||||
import com.xxl.job.core.context.XxlJobHelper;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.influxdb.InfluxDB;
|
||||
import org.influxdb.dto.BatchPoints;
|
||||
import org.influxdb.dto.Point;
|
||||
import org.influxdb.dto.QueryResult;
|
||||
import org.influxdb.impl.InfluxDBResultMapper;
|
||||
import org.influxdb.querybuilder.SelectQueryImpl;
|
||||
import org.influxdb.querybuilder.WhereNested;
|
||||
import org.influxdb.querybuilder.WhereQueryImpl;
|
||||
import org.influxdb.querybuilder.clauses.Clause;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.njcn.influxdb.param.InfluxDBPublicParam.*;
|
||||
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*;
|
||||
|
||||
/**
|
||||
* 类的介绍:复制终端在线率代码 (通讯表录入监测点情况)
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2022/7/8 13:43
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
public class ClonePqsOnlineRateJob {
|
||||
|
||||
private final InfluxDbUtils influxDbUtils;
|
||||
|
||||
private final LineFeignClient lineFeignClient;
|
||||
|
||||
@XxlJob("clonePqsOnlineRateJobHandler")
|
||||
public void clonePqsOnlineRateJobHandler() throws ParseException {
|
||||
List<PqsOnlineRate> result = new ArrayList<>();
|
||||
|
||||
List<String> paramList = new ArrayList<>(),lineList = new ArrayList<>();
|
||||
String command = XxlJobHelper.getJobParam();
|
||||
|
||||
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH)-1, 0, 0, 0);
|
||||
calendar.set(Calendar.MILLISECOND, 0);
|
||||
|
||||
Calendar calendar2 = Calendar.getInstance();
|
||||
calendar2.set(calendar2.get(Calendar.YEAR), calendar2.get(Calendar.MONTH), calendar2.get(Calendar.DAY_OF_MONTH)-1, 23, 59, 59);
|
||||
calendar2.set(Calendar.MILLISECOND, 0);
|
||||
|
||||
String startTime = format.format(calendar.getTime());
|
||||
String endTime = format.format(calendar2.getTime());
|
||||
|
||||
if (!StringUtils.isEmpty(command)){
|
||||
paramList = Arrays.asList(command.split(","));
|
||||
startTime = paramList.get(0);
|
||||
endTime = paramList.get(1);
|
||||
lineList = paramList.subList(2,paramList.size());
|
||||
boolean s1 = Pattern.matches(PatternRegex.TIME_FORMAT,startTime);
|
||||
boolean e1 = Pattern.matches(PatternRegex.TIME_FORMAT,endTime);
|
||||
if (!s1 || !e1){
|
||||
log.error("补招时间格式错误");
|
||||
return;
|
||||
} else {
|
||||
startTime = startTime + START_TIME;
|
||||
endTime = endTime + END_TIME;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (CollectionUtils.isEmpty(lineList)){
|
||||
lineList = lineFeignClient.getRunLineIdsList().getData();
|
||||
}
|
||||
|
||||
if (!CollectionUtils.isEmpty(lineList)){
|
||||
long diff,diffDays,a,b = 0;
|
||||
List<PqsCommunicateClone> l1 = new ArrayList<>(),l2 = new ArrayList<>();
|
||||
Date d1 = format.parse(startTime);
|
||||
Date d2 = format.parse(endTime);
|
||||
diff = d2.getTime() - d1.getTime();
|
||||
diffDays = diff / (24 * 60 * 60 * 1000-1000);
|
||||
int days = (int) diffDays;
|
||||
for (int i = 1; i <= days; i++) {
|
||||
a = d1.getTime() + (long)(i-1)*(24 * 60 * 60) * 1000;
|
||||
b = d1.getTime() + (long)i*(24 * 60 * 60) * 1000-1000;
|
||||
startTime = format.format(a);
|
||||
endTime = format.format(b);
|
||||
//获取装置的最新的一条数据
|
||||
List<PqsCommunicateClone> latestList = getData(lineList,endTime);
|
||||
if (!CollectionUtils.isEmpty(latestList)){
|
||||
for (PqsCommunicateClone item : latestList) {
|
||||
if (item.getTime().toEpochMilli() < a){
|
||||
l1.add(item);
|
||||
} else if (a <= item.getTime().toEpochMilli() && item.getTime().toEpochMilli() < b){
|
||||
l2.add(item);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(l1)){
|
||||
for (PqsCommunicateClone item : l1) {
|
||||
PqsOnlineRate onlineRate = new PqsOnlineRate();
|
||||
if (Objects.equals(item.getType(), ModelState.offline)){
|
||||
onlineRate.setOfflineMin(DAY_MINUTE);
|
||||
onlineRate.setOnlineMin(0);
|
||||
onlineRate.setOnlineRate(0.0);
|
||||
} else {
|
||||
onlineRate.setOfflineMin(0);
|
||||
onlineRate.setOnlineMin(DAY_MINUTE);
|
||||
onlineRate.setOnlineRate(100.0);
|
||||
}
|
||||
onlineRate.setTime(Instant.ofEpochMilli(a));
|
||||
onlineRate.setDevId(item.getLineId());
|
||||
result.add(onlineRate);
|
||||
}
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(l2)){
|
||||
List<String> lineIdsList = l2.stream().map(PqsCommunicateClone::getLineId).collect(Collectors.toList());
|
||||
List<PqsCommunicateClone> list = getPqsCommunicateClone(lineIdsList,startTime,endTime);
|
||||
//根据装置的id进行分组
|
||||
Map<String,List<PqsCommunicateClone>> groupMap = list.stream().collect(Collectors.groupingBy(PqsCommunicateClone::getLineId));
|
||||
try {
|
||||
if (!CollectionUtils.isEmpty(groupMap)){
|
||||
for (String key : groupMap.keySet()) {
|
||||
int offTime = 0;
|
||||
int onTime = 0;
|
||||
PqsOnlineRate onlineRate = new PqsOnlineRate();
|
||||
List<PqsCommunicateClone> infoList = groupMap.get(key);
|
||||
if (infoList.size() > 1){
|
||||
//获取最早一条记录
|
||||
PqsCommunicateClone first = infoList.stream().min(Comparator.comparing(PqsCommunicateClone::getTime)).get();
|
||||
//将上线和下线分组
|
||||
Map<Integer,List<PqsCommunicateClone>> typeMap = infoList.stream().collect(Collectors.groupingBy(PqsCommunicateClone::getType));
|
||||
List<PqsCommunicateClone> off = typeMap.get(0);
|
||||
List<PqsCommunicateClone> on = typeMap.get(1);
|
||||
if (first.getType() == 0){
|
||||
if (off.size() == on.size()){
|
||||
for (int j = 0; j < off.size(); j++) {
|
||||
offTime = offTime + (int) (on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
|
||||
}
|
||||
} else {
|
||||
for (int j = 0; j < off.size(); j++) {
|
||||
if (j == off.size() - 1){
|
||||
offTime = offTime + (int) (format.parse(endTime).getTime() + 1000L - off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
|
||||
} else {
|
||||
offTime = offTime + (int) (on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
|
||||
}
|
||||
}
|
||||
}
|
||||
onTime = DAY_MINUTE-offTime;
|
||||
} else {
|
||||
if (off.size() == on.size()){
|
||||
for (int j = 0; j < on.size(); j++) {
|
||||
onTime = onTime + (int) (off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
|
||||
}
|
||||
} else {
|
||||
for (int j = 0; j < on.size(); j++) {
|
||||
if (j == on.size() - 1){
|
||||
onTime = onTime + (int) (format.parse(endTime).getTime() + 1000L - on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
|
||||
} else {
|
||||
onTime = onTime + (int) (off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
|
||||
}
|
||||
}
|
||||
}
|
||||
offTime = DAY_MINUTE-onTime;
|
||||
}
|
||||
} else {
|
||||
LocalDateTime updateTime = LocalDateTime.ofInstant(infoList.get(0).getTime(), ZoneId.systemDefault());
|
||||
if (Objects.equals(infoList.get(0).getType(),0)) {
|
||||
onTime = (int) (updateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - a) / (1000*60);
|
||||
offTime = (int) (format.parse(endTime).getTime() + 1000L - updateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
|
||||
} else {
|
||||
//如果只有一条且当天在线说明在此时刻钱该装置都掉线
|
||||
offTime =(int) (updateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - a) / (1000*60);
|
||||
onTime = (int) (format.parse(endTime).getTime() + 1000L - updateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60);
|
||||
}
|
||||
}
|
||||
onlineRate.setDevId(infoList.get(0).getLineId());
|
||||
onlineRate.setOnlineMin(onTime);
|
||||
onlineRate.setOfflineMin(offTime);
|
||||
onlineRate.setOnlineRate(Double.parseDouble(String.format("%.2f",onTime*1.0/DAY_MINUTE*100)));
|
||||
onlineRate.setTime(Instant.ofEpochMilli(a));
|
||||
result.add(onlineRate);
|
||||
}
|
||||
}
|
||||
} catch (ParseException e) {
|
||||
e.getMessage();
|
||||
log.error("终端在线率调度任务执行出错,错误信息:"+e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//获取所有监测点装置id
|
||||
List<OnlineLineDTO> lineBaseList = lineFeignClient.getOnLineDevLine(result.stream().map(PqsOnlineRate::getDevId).collect(Collectors.toList())).getData();
|
||||
|
||||
Map<String, List<OnlineLineDTO>> hashMap = lineBaseList.stream().collect(Collectors.groupingBy(OnlineLineDTO::getLineId));
|
||||
|
||||
result = result.stream().peek(item->item.setDevId(hashMap.get(item.getDevId()).get(0).getDevId())).collect(Collectors.toList());
|
||||
|
||||
/*增加相同装置过滤*/
|
||||
List<PqsOnlineRate> list = result.stream().collect(Collectors
|
||||
.collectingAndThen(Collectors.toCollection(() -> new TreeSet<>
|
||||
(Comparator.comparing(PqsOnlineRate::getDevId))), ArrayList::new));
|
||||
|
||||
insertData(list);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取pqs_communicate数据
|
||||
* @param list 装置集合
|
||||
* @param startTime 开始时间
|
||||
* @param endTime 结束时间
|
||||
* @return pqs_communicate数据
|
||||
*/
|
||||
private List<PqsCommunicateClone> getPqsCommunicateClone(List<String> list, String startTime, String endTime){
|
||||
SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE);
|
||||
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
|
||||
whereAndNested(list, where);
|
||||
|
||||
where.and(gte(TIME, startTime)).and(lte(TIME, endTime));
|
||||
where.tz(TZ);
|
||||
QueryResult queryResult = influxDbUtils.query(selectQuery.getCommand());
|
||||
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
|
||||
return resultMapper.toPOJO(queryResult, PqsCommunicateClone.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 拼接装置条件
|
||||
* @param list 装置集合
|
||||
* @param whereQuery WhereQueryImpl
|
||||
*/
|
||||
private void whereAndNested(List<String> list, WhereQueryImpl<SelectQueryImpl> whereQuery) {
|
||||
List<Clause> clauses = new ArrayList<>();
|
||||
list.forEach(item->{
|
||||
Clause clause = eq(LINE_ID, item);
|
||||
clauses.add(clause);
|
||||
});
|
||||
WhereNested<WhereQueryImpl<SelectQueryImpl>> andNested = whereQuery.andNested();
|
||||
for (Clause clause : clauses) {
|
||||
andNested.or(clause);
|
||||
}
|
||||
andNested.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取pqs_communicate数据最新一条数据
|
||||
* @param list 装置id集合
|
||||
* @return pqs_communicate数据
|
||||
*/
|
||||
private List<PqsCommunicateClone> getData(List<String> list,String endTime){
|
||||
SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE);
|
||||
WhereQueryImpl<SelectQueryImpl> where = selectQuery.where();
|
||||
whereAndNested(list, where);
|
||||
where.and(lte(TIME,endTime));
|
||||
where.groupBy(LINE_ID).orderBy(desc()).limit(1);
|
||||
where.tz(TZ);
|
||||
String sql = selectQuery.getCommand();
|
||||
QueryResult queryResult = influxDbUtils.query(sql);
|
||||
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
|
||||
return resultMapper.toPOJO(queryResult, PqsCommunicateClone.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 功能描述:插入pqs_integrity表数据
|
||||
* @author xy
|
||||
* @param list 数据集合
|
||||
* @date 2022/5/12 8:55
|
||||
*/
|
||||
private void insertData(List<PqsOnlineRate> list){
|
||||
List<String> records = new ArrayList<>();
|
||||
list.forEach(item->{
|
||||
Map<String, String> tags = new HashMap<>();
|
||||
Map<String, Object> fields = new HashMap<>();
|
||||
tags.put(DEV_ID,item.getDevId());
|
||||
fields.put(ONLINE_MIN,item.getOnlineMin());
|
||||
fields.put(OFFLINE_MIN,item.getOfflineMin());
|
||||
fields.put(ONLINE_RATE,item.getOnlineRate());
|
||||
Point point = influxDbUtils.pointBuilder(PQS_ONLINERATE, item.getTime().toEpochMilli(), TimeUnit.MILLISECONDS, tags, fields);
|
||||
BatchPoints batchPoints = BatchPoints.database(DATABASE).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
|
||||
batchPoints.point(point);
|
||||
records.add(batchPoints.lineProtocol());
|
||||
});
|
||||
influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.njcn.executor.pojo.vo;
|
||||
|
||||
import lombok.Data;
|
||||
import org.influxdb.annotation.Column;
|
||||
import org.influxdb.annotation.Measurement;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
/**
|
||||
* 类的介绍:
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2022/7/12 9:55
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "pqs_communicate")
|
||||
public class PqsCommunicateClone {
|
||||
|
||||
@Column(name = "time")
|
||||
private Instant time;
|
||||
|
||||
@Column(name = "line_id")
|
||||
private String lineId;
|
||||
|
||||
@Column(name = "description")
|
||||
private String description;
|
||||
|
||||
@Column(name = "type")
|
||||
private Integer type;
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user