数据完整性算法调整

This commit is contained in:
2025-04-01 09:58:46 +08:00
parent 21df04eb6d
commit 42ce80478c
9 changed files with 47 additions and 16 deletions

View File

@@ -67,9 +67,14 @@ public class IDataIntegrityServiceImpl implements IDataIntegrityService {
); );
//以尺寸100分片 //以尺寸100分片
List<List<String>> pendingIds = ListUtils.partition(lineIds, NUM); List<List<String>> pendingIds = ListUtils.partition(lineIds, NUM);
LineCountEvaluateParam lineCountEvaluateParam = new LineCountEvaluateParam();
lineCountEvaluateParam.setStartTime(beginDay);
lineCountEvaluateParam.setEndTime(endDay);
for (List<String> pendingId : pendingIds) { for (List<String> pendingId : pendingIds) {
lineCountEvaluateParam.setLineId(pendingId);
List<LineDevGetDTO> lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(pendingId).getData(); List<LineDevGetDTO> lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(pendingId).getData();
List<MeasurementCountDTO> countList = dataVFeignClient.getMeasurementCount(pendingId, beginDay, endDay).getData(); List<MeasurementCountDTO.MeasurementCountResultDTO> countList = dataVFeignClient.getMeasurementCount(lineCountEvaluateParam).getData();
poList.addAll( poList.addAll(
lineDevGetDTOList.stream() lineDevGetDTOList.stream()
.map(item -> { .map(item -> {

View File

@@ -58,7 +58,7 @@ public interface DataVFeignClient {
HttpResult<String> batchInsertionCvtDTO(@RequestBody List<DataVCvtDto> cvtDTOList); HttpResult<String> batchInsertionCvtDTO(@RequestBody List<DataVCvtDto> cvtDTOList);
@PostMapping("/getMeasurementCount") @PostMapping("/getMeasurementCount")
HttpResult<List<MeasurementCountDTO>> getMeasurementCount(@RequestParam("lineIndex")List<String> lineIndex, @RequestParam("startTime")String startTime, @RequestParam("endTime")String endTime); HttpResult<List<MeasurementCountDTO.MeasurementCountResultDTO>> getMeasurementCount(@RequestBody LineCountEvaluateParam lineCountEvaluateParam);
//获取原始数据 //获取原始数据
@PostMapping("/getDataV") @PostMapping("/getDataV")

View File

@@ -100,8 +100,8 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory<DataVFei
} }
@Override @Override
public HttpResult<List<MeasurementCountDTO>> getMeasurementCount(List<String> lineIndex,String startTime,String endTime){ public HttpResult<List<MeasurementCountDTO.MeasurementCountResultDTO>> getMeasurementCount(LineCountEvaluateParam lineCountEvaluateParam){
log.error("{}异常,降级处理,异常为:{}","cvt数据插入DataV",cause.toString()); log.error("{}异常,降级处理,异常为:{}","获取完整性数据",cause.toString());
throw new BusinessException(finalExceptionEnum); throw new BusinessException(finalExceptionEnum);
} }

View File

@@ -17,13 +17,24 @@ import java.time.Instant;
@Measurement(name = "data_v") @Measurement(name = "data_v")
public class MeasurementCountDTO { public class MeasurementCountDTO {
@Column(name = "time") @Column(name = "time",tag = true)
@JsonSerialize(using = InstantDateSerializer.class) @JsonSerialize(using = InstantDateSerializer.class)
private Instant time; private Instant time;
@Column(name = "line_id") @Column(name = "line_id",tag = true)
private String lineId; private String lineId;
@Column(name = "freq") @Column(name = "freq")
private String freq; private String freq;
@Data
public static class MeasurementCountResultDTO{
private String time;
private String lineId;
private String freq;
}
} }

View File

@@ -147,9 +147,9 @@ public class DataVController extends BaseController {
@OperateInfo(info = LogEnum.BUSINESS_COMMON) @OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/getMeasurementCount") @PostMapping("/getMeasurementCount")
@ApiOperation("获取算法基础数据") @ApiOperation("获取算法基础数据")
public HttpResult<List<MeasurementCountDTO>> getMeasurementCount(List<String> lineIndex, String startTime, String endTime) { public HttpResult<List<MeasurementCountDTO.MeasurementCountResultDTO>> getMeasurementCount(@RequestBody LineCountEvaluateParam lineCountEvaluateParam) {
String methodDescribe = getMethodDescribe("getMeasurementCount"); String methodDescribe = getMethodDescribe("getMeasurementCount");
List<MeasurementCountDTO> data = dataVQuery.getMeasurementCount(lineIndex,startTime,endTime); List<MeasurementCountDTO.MeasurementCountResultDTO> data = dataVQuery.getMeasurementCount(lineCountEvaluateParam.getLineId(),lineCountEvaluateParam.getStartTime(),lineCountEvaluateParam.getEndTime());
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe);
} }

View File

@@ -69,7 +69,7 @@ public interface IDataV extends IMppService<RStatDataVD> {
void batchInsertionCvtDTO(List<DataVCvtDto> cvtDTOList); void batchInsertionCvtDTO(List<DataVCvtDto> cvtDTOList);
List<MeasurementCountDTO> getMeasurementCount(List<String> lineIndex, String startTime, String endTime); List<MeasurementCountDTO.MeasurementCountResultDTO> getMeasurementCount(List<String> lineIndex, String startTime, String endTime);
/** /**
* 获取原始数据 * 获取原始数据

View File

@@ -353,8 +353,9 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
} }
@Override @Override
public List<MeasurementCountDTO> getMeasurementCount(List<String> lineIndex, String startTime, String public List<MeasurementCountDTO.MeasurementCountResultDTO> getMeasurementCount(List<String> lineIndex, String startTime, String
endTime) { endTime) {
List<MeasurementCountDTO.MeasurementCountResultDTO> result = new ArrayList<>();
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class, MeasurementCountDTO.class); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class, MeasurementCountDTO.class);
influxQueryWrapper.regular(DataV::getLineId, lineIndex) influxQueryWrapper.regular(DataV::getLineId, lineIndex)
.eq(DataV::getValueType, InfluxDbSqlConstant.MAX) .eq(DataV::getValueType, InfluxDbSqlConstant.MAX)
@@ -362,7 +363,16 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
.count(DataV::getFreq) .count(DataV::getFreq)
.groupBy(DataV::getLineId) .groupBy(DataV::getLineId)
.between(DataV::getTime, startTime, endTime); .between(DataV::getTime, startTime, endTime);
return dataVMapper.getMeasurementCount(influxQueryWrapper); List<MeasurementCountDTO> measurementCountDTOList = dataVMapper.getMeasurementCount(influxQueryWrapper);
if(CollUtil.isNotEmpty(measurementCountDTOList)){
for(MeasurementCountDTO m : measurementCountDTOList){
MeasurementCountDTO.MeasurementCountResultDTO p = new MeasurementCountDTO.MeasurementCountResultDTO();
p.setFreq(m.getFreq());
p.setLineId(m.getLineId());
result.add(p);
}
}
return result;
} }
@Override @Override

View File

@@ -1,6 +1,7 @@
package com.njcn.dataProcess.service.impl.relation; package com.njcn.dataProcess.service.impl.relation;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.dataProcess.dao.relation.mapper.PqReasonableRangeMapper; import com.njcn.dataProcess.dao.relation.mapper.PqReasonableRangeMapper;
@@ -29,10 +30,14 @@ public class PqReasonableRangeServiceImpl extends ServiceImpl<PqReasonableRangeM
public List<PqReasonableRangeDto> getReasonableRangeList(DataCleanParam param) { public List<PqReasonableRangeDto> getReasonableRangeList(DataCleanParam param) {
List<PqReasonableRangeDto> result = new ArrayList<>(); List<PqReasonableRangeDto> result = new ArrayList<>();
LambdaQueryWrapper<PqReasonableRange> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<PqReasonableRange> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(PqReasonableRange::getBelongingSystem,param.getSystemType()) queryWrapper.eq(PqReasonableRange::getBelongingSystem,param.getSystemType());
.eq(PqReasonableRange::getDataSource,param.getDataSource()) if(StrUtil.isNotBlank(param.getDataSource())){
.eq(PqReasonableRange::getInfluxdbTableName,param.getTableName()) queryWrapper.eq(PqReasonableRange::getDataSource,param.getDataSource());
.eq(PqReasonableRange::getState,1); }
if(StrUtil.isNotBlank(param.getTableName())){
queryWrapper.eq(PqReasonableRange::getInfluxdbTableName,param.getTableName());
}
queryWrapper.eq(PqReasonableRange::getState,1);
List<PqReasonableRange> list = this.list(queryWrapper); List<PqReasonableRange> list = this.list(queryWrapper);
if (CollUtil.isNotEmpty(list)) { if (CollUtil.isNotEmpty(list)) {
list.forEach(item->{ list.forEach(item->{

View File

@@ -134,7 +134,7 @@ public class RelationDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
} }
@Override @Override
public List<MeasurementCountDTO> getMeasurementCount(List<String> lineIndex, String startTime, String endTime) { public List<MeasurementCountDTO.MeasurementCountResultDTO> getMeasurementCount(List<String> lineIndex, String startTime, String endTime) {
return Collections.emptyList(); return Collections.emptyList();
} }