数据完整性部分代码提交
This commit is contained in:
@@ -0,0 +1,20 @@
|
||||
package com.njcn.algorithm.service.line;
|
||||
|
||||
|
||||
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||
|
||||
/**
|
||||
* @author xiaoyao
|
||||
* @version 1.0.0
|
||||
* @createTime 2022/10/24 20:06
|
||||
*/
|
||||
public interface IDataIntegrityService {
|
||||
|
||||
/***
|
||||
* 监测点数据完整性_日表
|
||||
* @author xuyang
|
||||
* @date 2023/11/09 10:08
|
||||
* @param calculatedParam 查询条件
|
||||
*/
|
||||
void dataIntegrity(CalculatedParam<String> calculatedParam);
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
package com.njcn.algorithm.serviceimpl.line;
|
||||
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||
import com.njcn.algorithm.pojo.bo.CalculatedParam;
|
||||
import com.njcn.algorithm.service.line.IDataIntegrityService;
|
||||
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
|
||||
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
|
||||
import com.njcn.device.pq.pojo.po.RStatIntegrityD;
|
||||
import com.njcn.influx.constant.InfluxDbSqlConstant;
|
||||
import com.njcn.influx.pojo.bo.MeasurementCount;
|
||||
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
|
||||
import com.njcn.influx.pojo.po.DataV;
|
||||
import com.njcn.influx.query.InfluxQueryWrapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.collections4.ListUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @Author: cdf
|
||||
* @CreateTime: 2025-02-28
|
||||
* @Description: 数据完成性
|
||||
*/
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class IDataIntegrityServiceImpl implements IDataIntegrityService {
|
||||
|
||||
private final CommTerminalGeneralClient commTerminalGeneralClient;
|
||||
|
||||
|
||||
@Override
|
||||
public void dataIntegrity(CalculatedParam<String> calculatedParam) {
|
||||
List<RStatIntegrityD> poList = new ArrayList<>();
|
||||
List<String> lineIds = calculatedParam.getIdList();
|
||||
String beginDay = LocalDateTimeUtil.format(
|
||||
LocalDateTimeUtil.beginOfDay(LocalDateTimeUtil.parse(calculatedParam.getDataDate(), DatePattern.NORM_DATE_PATTERN)),
|
||||
DatePattern.NORM_DATETIME_PATTERN
|
||||
);
|
||||
String endDay = LocalDateTimeUtil.format(
|
||||
LocalDateTimeUtil.endOfDay(LocalDateTimeUtil.parse(calculatedParam.getDataDate(), DatePattern.NORM_DATE_PATTERN)),
|
||||
DatePattern.NORM_DATETIME_PATTERN
|
||||
);
|
||||
//以尺寸100分片
|
||||
List<List<String>> pendingIds = ListUtils.partition(lineIds,5);
|
||||
for (List<String> pendingId : pendingIds) {
|
||||
List<LineDevGetDTO> lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(pendingId).getData();
|
||||
List<MeasurementCount> countList = this.getMeasurementCount(pendingId,beginDay,endDay);
|
||||
poList.addAll(
|
||||
lineDevGetDTOList.stream()
|
||||
.map(item -> {
|
||||
RStatIntegrityD integrityDpo = new RStatIntegrityD();
|
||||
integrityDpo.setTimeId(LocalDateTimeUtil.parseDate(calculatedParam.getDataDate(), DatePattern.NORM_DATE_PATTERN));
|
||||
integrityDpo.setLineIndex(item.getPointId());
|
||||
integrityDpo.setDueTime(InfluxDBTableConstant.DAY_MINUTE / item.getInterval());
|
||||
integrityDpo.setRealTime(countList.stream()
|
||||
.filter(item2 -> Objects.equals(item.getPointId(), item2.getLineId()))
|
||||
.map(item2 -> (int) Double.parseDouble(item2.getFreq()))
|
||||
.findFirst().orElse(0)
|
||||
);
|
||||
return integrityDpo;
|
||||
})
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取data_v中各个监测点的数据总数
|
||||
* @param lineIndex 监测点索引
|
||||
* @param startTime 起始时间
|
||||
* @param endTime 结束时间
|
||||
*/
|
||||
public List<MeasurementCount> getMeasurementCount(List<String> lineIndex, String startTime, String endTime) {
|
||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class,MeasurementCount.class);
|
||||
influxQueryWrapper.regular(DataV::getLineId, lineIndex)
|
||||
.eq(DataV::getValueType, InfluxDbSqlConstant.MAX)
|
||||
.eq(DataV::getPhaseType, InfluxDBTableConstant.PHASE_TYPE_A)
|
||||
.count(DataV::getFreq)
|
||||
.groupBy(DataV::getLineId)
|
||||
.between(DataV::getTime, startTime, endTime);
|
||||
//return dataVMapper.getMeasurementCount(influxQueryWrapper);
|
||||
|
||||
//TODO 调用插入数据库
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user