Merge remote-tracking branch 'origin/master'
This commit is contained in:
@@ -67,9 +67,14 @@ public class IDataIntegrityServiceImpl implements IDataIntegrityService {
|
||||
);
|
||||
//以尺寸100分片
|
||||
List<List<String>> pendingIds = ListUtils.partition(lineIds, NUM);
|
||||
|
||||
LineCountEvaluateParam lineCountEvaluateParam = new LineCountEvaluateParam();
|
||||
lineCountEvaluateParam.setStartTime(beginDay);
|
||||
lineCountEvaluateParam.setEndTime(endDay);
|
||||
for (List<String> pendingId : pendingIds) {
|
||||
lineCountEvaluateParam.setLineId(pendingId);
|
||||
List<LineDevGetDTO> lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(pendingId).getData();
|
||||
List<MeasurementCountDTO> countList = dataVFeignClient.getMeasurementCount(pendingId, beginDay, endDay).getData();
|
||||
List<MeasurementCountDTO.MeasurementCountResultDTO> countList = dataVFeignClient.getMeasurementCount(lineCountEvaluateParam).getData();
|
||||
poList.addAll(
|
||||
lineDevGetDTOList.stream()
|
||||
.map(item -> {
|
||||
|
||||
@@ -58,7 +58,7 @@ public interface DataVFeignClient {
|
||||
HttpResult<String> batchInsertionCvtDTO(@RequestBody List<DataVCvtDto> cvtDTOList);
|
||||
|
||||
@PostMapping("/getMeasurementCount")
|
||||
HttpResult<List<MeasurementCountDTO>> getMeasurementCount(@RequestParam("lineIndex")List<String> lineIndex, @RequestParam("startTime")String startTime, @RequestParam("endTime")String endTime);
|
||||
HttpResult<List<MeasurementCountDTO.MeasurementCountResultDTO>> getMeasurementCount(@RequestBody LineCountEvaluateParam lineCountEvaluateParam);
|
||||
|
||||
//获取原始数据
|
||||
@PostMapping("/getDataV")
|
||||
|
||||
@@ -100,8 +100,8 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory<DataVFei
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResult<List<MeasurementCountDTO>> getMeasurementCount(List<String> lineIndex,String startTime,String endTime){
|
||||
log.error("{}异常,降级处理,异常为:{}","cvt数据插入DataV",cause.toString());
|
||||
public HttpResult<List<MeasurementCountDTO.MeasurementCountResultDTO>> getMeasurementCount(LineCountEvaluateParam lineCountEvaluateParam){
|
||||
log.error("{}异常,降级处理,异常为:{}","获取完整性数据",cause.toString());
|
||||
throw new BusinessException(finalExceptionEnum);
|
||||
}
|
||||
|
||||
|
||||
@@ -17,13 +17,24 @@ import java.time.Instant;
|
||||
@Measurement(name = "data_v")
|
||||
public class MeasurementCountDTO {
|
||||
|
||||
@Column(name = "time")
|
||||
@Column(name = "time",tag = true)
|
||||
@JsonSerialize(using = InstantDateSerializer.class)
|
||||
private Instant time;
|
||||
|
||||
@Column(name = "line_id")
|
||||
@Column(name = "line_id",tag = true)
|
||||
private String lineId;
|
||||
|
||||
@Column(name = "freq")
|
||||
private String freq;
|
||||
|
||||
|
||||
@Data
|
||||
public static class MeasurementCountResultDTO{
|
||||
|
||||
private String time;
|
||||
|
||||
private String lineId;
|
||||
|
||||
private String freq;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,9 +147,9 @@ public class DataVController extends BaseController {
|
||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||
@PostMapping("/getMeasurementCount")
|
||||
@ApiOperation("获取算法基础数据")
|
||||
public HttpResult<List<MeasurementCountDTO>> getMeasurementCount(List<String> lineIndex, String startTime, String endTime) {
|
||||
public HttpResult<List<MeasurementCountDTO.MeasurementCountResultDTO>> getMeasurementCount(@RequestBody LineCountEvaluateParam lineCountEvaluateParam) {
|
||||
String methodDescribe = getMethodDescribe("getMeasurementCount");
|
||||
List<MeasurementCountDTO> data = dataVQuery.getMeasurementCount(lineIndex,startTime,endTime);
|
||||
List<MeasurementCountDTO.MeasurementCountResultDTO> data = dataVQuery.getMeasurementCount(lineCountEvaluateParam.getLineId(),lineCountEvaluateParam.getStartTime(),lineCountEvaluateParam.getEndTime());
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe);
|
||||
}
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ public class PqsCommunicateController extends BaseController {
|
||||
@ApiOperation("插入数据")
|
||||
public HttpResult<String> insertion(@RequestBody PqsCommunicateDto pqsCommunicateDto) {
|
||||
String methodDescribe = getMethodDescribe("insertion");
|
||||
pqsCommunicateInsert.insertion(pqsCommunicateDto);
|
||||
pqsCommunicateCvtQuery.insertion(pqsCommunicateDto);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ public interface IDataV extends IMppService<RStatDataVD> {
|
||||
void batchInsertionCvtDTO(List<DataVCvtDto> cvtDTOList);
|
||||
|
||||
|
||||
List<MeasurementCountDTO> getMeasurementCount(List<String> lineIndex, String startTime, String endTime);
|
||||
List<MeasurementCountDTO.MeasurementCountResultDTO> getMeasurementCount(List<String> lineIndex, String startTime, String endTime);
|
||||
|
||||
/**
|
||||
* 获取原始数据
|
||||
|
||||
@@ -353,8 +353,9 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<MeasurementCountDTO> getMeasurementCount(List<String> lineIndex, String startTime, String
|
||||
public List<MeasurementCountDTO.MeasurementCountResultDTO> getMeasurementCount(List<String> lineIndex, String startTime, String
|
||||
endTime) {
|
||||
List<MeasurementCountDTO.MeasurementCountResultDTO> result = new ArrayList<>();
|
||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class, MeasurementCountDTO.class);
|
||||
influxQueryWrapper.regular(DataV::getLineId, lineIndex)
|
||||
.eq(DataV::getValueType, InfluxDbSqlConstant.MAX)
|
||||
@@ -362,7 +363,16 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
|
||||
.count(DataV::getFreq)
|
||||
.groupBy(DataV::getLineId)
|
||||
.between(DataV::getTime, startTime, endTime);
|
||||
return dataVMapper.getMeasurementCount(influxQueryWrapper);
|
||||
List<MeasurementCountDTO> measurementCountDTOList = dataVMapper.getMeasurementCount(influxQueryWrapper);
|
||||
if(CollUtil.isNotEmpty(measurementCountDTOList)){
|
||||
for(MeasurementCountDTO m : measurementCountDTOList){
|
||||
MeasurementCountDTO.MeasurementCountResultDTO p = new MeasurementCountDTO.MeasurementCountResultDTO();
|
||||
p.setFreq(m.getFreq());
|
||||
p.setLineId(m.getLineId());
|
||||
result.add(p);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -12,6 +12,7 @@ import com.njcn.device.pq.api.DeviceFeignClient;
|
||||
import com.njcn.device.pq.pojo.dto.DevComFlagDTO;
|
||||
import com.njcn.influx.query.InfluxQueryWrapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -35,6 +36,7 @@ import java.util.Objects;
|
||||
*/
|
||||
@Service("InfluxdbPqsCommunicateImpl")
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
|
||||
|
||||
@Resource
|
||||
@@ -108,6 +110,7 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
|
||||
|
||||
@Override
|
||||
public void insertion(PqsCommunicateDto pqsCommunicateDto) {
|
||||
log.info("进出Influxdb实现类");
|
||||
//获取最新一条数据
|
||||
PqsCommunicate dto = new PqsCommunicate();
|
||||
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.njcn.dataProcess.service.impl.relation;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.njcn.dataProcess.dao.relation.mapper.PqReasonableRangeMapper;
|
||||
@@ -29,10 +30,14 @@ public class PqReasonableRangeServiceImpl extends ServiceImpl<PqReasonableRangeM
|
||||
public List<PqReasonableRangeDto> getReasonableRangeList(DataCleanParam param) {
|
||||
List<PqReasonableRangeDto> result = new ArrayList<>();
|
||||
LambdaQueryWrapper<PqReasonableRange> queryWrapper = new LambdaQueryWrapper<>();
|
||||
queryWrapper.eq(PqReasonableRange::getBelongingSystem,param.getSystemType())
|
||||
.eq(PqReasonableRange::getDataSource,param.getDataSource())
|
||||
.eq(PqReasonableRange::getInfluxdbTableName,param.getTableName())
|
||||
.eq(PqReasonableRange::getState,1);
|
||||
queryWrapper.eq(PqReasonableRange::getBelongingSystem,param.getSystemType());
|
||||
if(StrUtil.isNotBlank(param.getDataSource())){
|
||||
queryWrapper.eq(PqReasonableRange::getDataSource,param.getDataSource());
|
||||
}
|
||||
if(StrUtil.isNotBlank(param.getTableName())){
|
||||
queryWrapper.eq(PqReasonableRange::getInfluxdbTableName,param.getTableName());
|
||||
}
|
||||
queryWrapper.eq(PqReasonableRange::getState,1);
|
||||
List<PqReasonableRange> list = this.list(queryWrapper);
|
||||
if (CollUtil.isNotEmpty(list)) {
|
||||
list.forEach(item->{
|
||||
|
||||
@@ -134,7 +134,7 @@ public class RelationDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<MeasurementCountDTO> getMeasurementCount(List<String> lineIndex, String startTime, String endTime) {
|
||||
public List<MeasurementCountDTO.MeasurementCountResultDTO> getMeasurementCount(List<String> lineIndex, String startTime, String endTime) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.njcn.dataProcess.param.LineCountEvaluateParam;
|
||||
import com.njcn.dataProcess.pojo.dto.PqsCommunicateDto;
|
||||
import com.njcn.dataProcess.service.IPqsCommunicate;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Collections;
|
||||
@@ -16,6 +17,7 @@ import java.util.List;
|
||||
*/
|
||||
@Service("RelationPqsCommunicateImpl")
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class RelationPqsCommunicateImpl implements IPqsCommunicate {
|
||||
|
||||
|
||||
@@ -36,6 +38,6 @@ public class RelationPqsCommunicateImpl implements IPqsCommunicate {
|
||||
|
||||
@Override
|
||||
public void insertion(PqsCommunicateDto pqsCommunicateDto) {
|
||||
|
||||
log.info("进出Relation实现类");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user