|
|
|
|
@@ -1,32 +1,33 @@
|
|
|
|
|
package com.njcn.prepare.harmonic.service.mysql.Impl.line;
|
|
|
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollUtil;
|
|
|
|
|
import cn.hutool.core.date.DatePattern;
|
|
|
|
|
import cn.hutool.core.date.LocalDateTimeUtil;
|
|
|
|
|
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
|
|
|
|
|
import com.njcn.common.pojo.enums.common.ServerEnum;
|
|
|
|
|
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
|
|
|
|
|
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
|
|
|
|
|
import com.njcn.device.biz.pojo.param.DeptGetLineParam;
|
|
|
|
|
import com.njcn.device.pq.api.LineFeignClient;
|
|
|
|
|
import com.njcn.device.pq.pojo.po.RStatIntegrityD;
|
|
|
|
|
import com.njcn.influx.deprecated.InfluxDBPublicParam;
|
|
|
|
|
import com.njcn.influx.constant.InfluxDbSqlConstant;
|
|
|
|
|
import com.njcn.influx.imapper.DataVMapper;
|
|
|
|
|
import com.njcn.influx.pojo.bo.MeasurementCount;
|
|
|
|
|
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
|
|
|
|
|
import com.njcn.influx.pojo.po.DataV;
|
|
|
|
|
import com.njcn.influx.query.InfluxQueryWrapper;
|
|
|
|
|
import com.njcn.influx.utils.InfluxDbUtils;
|
|
|
|
|
import com.njcn.prepare.bo.CalculatedParam;
|
|
|
|
|
import com.njcn.prepare.harmonic.mapper.mysql.day.RStatIntegrityDMapper;
|
|
|
|
|
import com.njcn.prepare.harmonic.pojo.param.LineParam;
|
|
|
|
|
import com.njcn.prepare.harmonic.service.mysql.line.IntegrityService;
|
|
|
|
|
import com.njcn.user.api.DeptFeignClient;
|
|
|
|
|
import com.njcn.user.pojo.po.Dept;
|
|
|
|
|
import lombok.AllArgsConstructor;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.influxdb.dto.QueryResult;
|
|
|
|
|
import org.influxdb.impl.InfluxDBResultMapper;
|
|
|
|
|
import org.apache.commons.collections4.ListUtils;
|
|
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
|
|
import java.util.*;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
@@ -50,80 +51,109 @@ public class IntegrityServiceImpl extends MppServiceImpl<RStatIntegrityDMapper,
|
|
|
|
|
|
|
|
|
|
private final CommTerminalGeneralClient commTerminalGeneralClient;
|
|
|
|
|
|
|
|
|
|
/*@Override
|
|
|
|
|
@Async("asyncExecutor")
|
|
|
|
|
public String computeDataIntegrity(LineParam lineParam) {
|
|
|
|
|
List<LineDetail> lineDetailList;
|
|
|
|
|
if (CollUtil.isEmpty(lineParam.getLineIds())){
|
|
|
|
|
List<Overlimit> overLimitList = getAllLinesLimitData();
|
|
|
|
|
List<String> lineList = overLimitList.stream().map(Overlimit::getId).collect(Collectors.toList());
|
|
|
|
|
lineDetailList = lineFeignClient.getLineDetail(lineList).getData();
|
|
|
|
|
}else {
|
|
|
|
|
lineDetailList = lineFeignClient.getLineDetail(lineParam.getLineIds()).getData();
|
|
|
|
|
}
|
|
|
|
|
if (CollUtil.isEmpty(lineDetailList)){
|
|
|
|
|
return "未查询到监测点详情!";
|
|
|
|
|
}
|
|
|
|
|
Date dateOut = DateUtil.parse(lineParam.getDataDate());
|
|
|
|
|
List<String> records = new ArrayList<>();
|
|
|
|
|
for (LineDetail lineDetail :lineDetailList){
|
|
|
|
|
Map<String, String> tags = new HashMap<>();
|
|
|
|
|
Map<String, Object> fields = new HashMap<>();
|
|
|
|
|
tags.put("line_id",lineDetail.getId());
|
|
|
|
|
fields.put("due",DAY_MINUTE/lineDetail.getTimeInterval());
|
|
|
|
|
int dataCount = getDataCount(lineDetail.getId(),lineParam.getDataDate());
|
|
|
|
|
fields.put("real",dataCount);
|
|
|
|
|
Point point = influxDbUtils.pointBuilder("pqs_integrity", dateOut.getTime(), TimeUnit.MILLISECONDS,tags, fields);
|
|
|
|
|
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).tag("line_id", lineDetail.getId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
|
|
|
|
|
batchPoints.point(point);
|
|
|
|
|
records.add(batchPoints.lineProtocol());
|
|
|
|
|
}
|
|
|
|
|
//InfluxDb入表pqs_integrity
|
|
|
|
|
influxDbUtils.batchInsert(influxDbUtils.getDbName(),"", InfluxDB.ConsistencyLevel.ALL, records);
|
|
|
|
|
return "成功!";
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
private final DataVMapper dataVMapper;
|
|
|
|
|
|
|
|
|
|
// @Override
|
|
|
|
|
// @Async("asyncExecutor")
|
|
|
|
|
// @Deprecated
|
|
|
|
|
// public void dataIntegrity(LineParam lineParam,String startTime,String endTime) {
|
|
|
|
|
// DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
|
|
// LocalDateTime dateTime = LocalDateTime.parse(startTime,df);
|
|
|
|
|
//
|
|
|
|
|
// List<LineDevGetDTO> lineDevGetDTOList = new ArrayList<>();
|
|
|
|
|
// if (CollUtil.isEmpty(lineParam.getLineIds())){
|
|
|
|
|
// Dept dept = deptFeignClient.getRootDept().getData();
|
|
|
|
|
//
|
|
|
|
|
// DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
|
|
|
|
|
// deptGetLineParam.setDeptId(dept.getId());
|
|
|
|
|
// deptGetLineParam.setServerName(ServerEnum.HARMONIC.getName());
|
|
|
|
|
// List<String> monitorIds = commTerminalGeneralClient.getRunMonitorIds().getData();
|
|
|
|
|
// lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(monitorIds).getData();
|
|
|
|
|
// }else {
|
|
|
|
|
// lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(lineParam.getLineIds()).getData();
|
|
|
|
|
// }
|
|
|
|
|
// List<RStatIntegrityD> list = new ArrayList<>();
|
|
|
|
|
// for (LineDevGetDTO lineDetail :lineDevGetDTOList){
|
|
|
|
|
// int dataCount = getDataCount(lineDetail.getPointId(),startTime,endTime);
|
|
|
|
|
// RStatIntegrityD integrityDpo = new RStatIntegrityD();
|
|
|
|
|
// integrityDpo.setTimeId(dateTime);
|
|
|
|
|
// integrityDpo.setLineIndex(lineDetail.getPointId());
|
|
|
|
|
// integrityDpo.setDueTime(InfluxDBPublicParam.DAY_MINUTE/lineDetail.getInterval());
|
|
|
|
|
// integrityDpo.setRealTime(dataCount);
|
|
|
|
|
// list.add(integrityDpo);
|
|
|
|
|
// }
|
|
|
|
|
// this.saveOrUpdateBatchByMultiId(list,500);
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
/********************************新算法************************************************/
|
|
|
|
|
@Override
|
|
|
|
|
@Async("asyncExecutor")
|
|
|
|
|
public void dataIntegrity(LineParam lineParam,String startTime,String endTime) {
|
|
|
|
|
DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
|
|
LocalDateTime dateTime = LocalDateTime.parse(startTime,df);
|
|
|
|
|
|
|
|
|
|
List<LineDevGetDTO> lineDevGetDTOList = new ArrayList<>();
|
|
|
|
|
if (CollUtil.isEmpty(lineParam.getLineIds())){
|
|
|
|
|
Dept dept = deptFeignClient.getRootDept().getData();
|
|
|
|
|
|
|
|
|
|
DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
|
|
|
|
|
deptGetLineParam.setDeptId(dept.getId());
|
|
|
|
|
deptGetLineParam.setServerName(ServerEnum.HARMONIC.getName());
|
|
|
|
|
List<String> monitorIds = commTerminalGeneralClient.getRunMonitorIds().getData();
|
|
|
|
|
lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(monitorIds).getData();
|
|
|
|
|
}else {
|
|
|
|
|
lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(lineParam.getLineIds()).getData();
|
|
|
|
|
}
|
|
|
|
|
public void dataIntegrity(CalculatedParam calculatedParam) {
|
|
|
|
|
List<RStatIntegrityD> list = new ArrayList<>();
|
|
|
|
|
for (LineDevGetDTO lineDetail :lineDevGetDTOList){
|
|
|
|
|
int dataCount = getDataCount(lineDetail.getPointId(),startTime,endTime);
|
|
|
|
|
RStatIntegrityD integrityDpo = new RStatIntegrityD();
|
|
|
|
|
integrityDpo.setTimeId(dateTime);
|
|
|
|
|
integrityDpo.setLineIndex(lineDetail.getPointId());
|
|
|
|
|
integrityDpo.setDueTime(InfluxDBPublicParam.DAY_MINUTE/lineDetail.getInterval());
|
|
|
|
|
integrityDpo.setRealTime(dataCount);
|
|
|
|
|
list.add(integrityDpo);
|
|
|
|
|
List<String> lineIds = calculatedParam.getIdList();
|
|
|
|
|
String beginDay = LocalDateTimeUtil.format(
|
|
|
|
|
LocalDateTimeUtil.beginOfDay(LocalDateTimeUtil.parse(calculatedParam.getDataDate(), DatePattern.NORM_DATE_PATTERN)),
|
|
|
|
|
DatePattern.NORM_DATETIME_PATTERN
|
|
|
|
|
);
|
|
|
|
|
String endDay = LocalDateTimeUtil.format(
|
|
|
|
|
LocalDateTimeUtil.endOfDay(LocalDateTimeUtil.parse(calculatedParam.getDataDate(), DatePattern.NORM_DATE_PATTERN)),
|
|
|
|
|
DatePattern.NORM_DATETIME_PATTERN
|
|
|
|
|
);
|
|
|
|
|
//以尺寸100分片
|
|
|
|
|
List<List<String>> pendingIds = ListUtils.partition(lineIds,100);
|
|
|
|
|
for (List<String> pendingId : pendingIds) {
|
|
|
|
|
List<LineDevGetDTO> lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(pendingId).getData();
|
|
|
|
|
List<MeasurementCount> countList = this.getMeasurementCount(pendingId,beginDay,endDay);
|
|
|
|
|
list.addAll(
|
|
|
|
|
lineDevGetDTOList.stream()
|
|
|
|
|
.map(item -> {
|
|
|
|
|
RStatIntegrityD integrityDpo = new RStatIntegrityD();
|
|
|
|
|
integrityDpo.setTimeId(LocalDateTimeUtil.parseDate(calculatedParam.getDataDate(), DatePattern.NORM_DATE_PATTERN));
|
|
|
|
|
integrityDpo.setLineIndex(item.getPointId());
|
|
|
|
|
integrityDpo.setDueTime(InfluxDBTableConstant.DAY_MINUTE / item.getInterval());
|
|
|
|
|
integrityDpo.setRealTime(countList.stream()
|
|
|
|
|
.filter(item2 -> Objects.equals(item.getPointId(), item2.getLineId()))
|
|
|
|
|
.map(item2 -> (int) Double.parseDouble(item2.getFreq()))
|
|
|
|
|
.findFirst().orElse(0)
|
|
|
|
|
);
|
|
|
|
|
return integrityDpo;
|
|
|
|
|
})
|
|
|
|
|
.collect(Collectors.toList())
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
this.saveOrUpdateBatchByMultiId(list,500);
|
|
|
|
|
this.saveOrUpdateBatchByMultiId(list,1000);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private int getDataCount(String lineId,String startTime,String endTime){
|
|
|
|
|
QueryResult sqlResult = influxDbUtils.query("SELECT * FROM data_v WHERE time >= '" + startTime + "' and time <= '" + endTime + "' and line_id = '" + lineId + "' and phasic_type = 'T' and value_type = 'MAX' tz('Asia/Shanghai')");
|
|
|
|
|
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
|
|
|
|
|
List<DataV> list = resultMapper.toPOJO(sqlResult, DataV.class);
|
|
|
|
|
if (CollectionUtils.isEmpty(list)){
|
|
|
|
|
return 0;
|
|
|
|
|
} else {
|
|
|
|
|
return list.size();
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* 获取data_v中各个监测点的数据总数
|
|
|
|
|
* @param lineIndex
|
|
|
|
|
* @param startTime
|
|
|
|
|
* @param endTime
|
|
|
|
|
* @return
|
|
|
|
|
*/
|
|
|
|
|
public List<MeasurementCount> getMeasurementCount(List<String> lineIndex, String startTime, String endTime) {
|
|
|
|
|
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class,MeasurementCount.class);
|
|
|
|
|
influxQueryWrapper.regular(DataV::getLineId, lineIndex)
|
|
|
|
|
.eq(DataV::getValueType, InfluxDbSqlConstant.MAX)
|
|
|
|
|
.eq(DataV::getPhasicType, InfluxDBTableConstant.PHASE_TYPE_T)
|
|
|
|
|
.count(DataV::getFreq)
|
|
|
|
|
.groupBy(DataV::getLineId)
|
|
|
|
|
.between(DataV::getTime, startTime, endTime);
|
|
|
|
|
return dataVMapper.getMeasurementCount(influxQueryWrapper);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/********************************新算法结束************************************************/
|
|
|
|
|
|
|
|
|
|
// private int getDataCount(String lineId,String startTime,String endTime){
|
|
|
|
|
// QueryResult sqlResult = influxDbUtils.query("SELECT * FROM data_v WHERE time >= '" + startTime + "' and time <= '" + endTime + "' and line_id = '" + lineId + "' and phasic_type = 'T' and value_type = 'MAX' tz('Asia/Shanghai')");
|
|
|
|
|
// InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
|
|
|
|
|
// List<DataV> list = resultMapper.toPOJO(sqlResult, DataV.class);
|
|
|
|
|
// if (CollectionUtils.isEmpty(list)){
|
|
|
|
|
// return 0;
|
|
|
|
|
// } else {
|
|
|
|
|
// return list.size();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|