From 85cfbe0cb5fdc06b7e5cd764351440dcf9fb15cb Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Mon, 31 Mar 2025 15:19:35 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/dataProcess/controller/PqsCommunicateController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/PqsCommunicateController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/PqsCommunicateController.java index 56f9521..46f983a 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/PqsCommunicateController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/PqsCommunicateController.java @@ -77,7 +77,7 @@ public class PqsCommunicateController extends BaseController { @ApiOperation("插入数据") public HttpResult insertion(@RequestBody PqsCommunicateDto pqsCommunicateDto) { String methodDescribe = getMethodDescribe("insertion"); - pqsCommunicateInsert.insertion(pqsCommunicateDto); + pqsCommunicateCvtQuery.insertion(pqsCommunicateDto); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } From 21df04eb6dbb9e1cee875ebfca103c56324d0a78 Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Tue, 1 Apr 2025 09:36:38 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/influxdb/InfluxdbPqsCommunicateImpl.java | 3 +++ .../service/impl/relation/RelationPqsCommunicateImpl.java | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java index 0f2967c..844fb23 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java @@ -12,6 +12,7 @@ import com.njcn.device.pq.api.DeviceFeignClient; import com.njcn.device.pq.pojo.dto.DevComFlagDTO; import com.njcn.influx.query.InfluxQueryWrapper; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -35,6 +36,7 @@ import java.util.Objects; */ @Service("InfluxdbPqsCommunicateImpl") @RequiredArgsConstructor +@Slf4j public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { @Resource @@ -108,6 +110,7 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { @Override public void insertion(PqsCommunicateDto pqsCommunicateDto) { + log.info("进出Influxdb实现类"); //获取最新一条数据 PqsCommunicate dto = new PqsCommunicate(); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class); diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationPqsCommunicateImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationPqsCommunicateImpl.java index 4a2b83c..1bac220 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationPqsCommunicateImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationPqsCommunicateImpl.java @@ -4,6 +4,7 @@ import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.pojo.dto.PqsCommunicateDto; import com.njcn.dataProcess.service.IPqsCommunicate; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.Collections; @@ -16,6 +17,7 @@ import java.util.List; */ @Service("RelationPqsCommunicateImpl") @RequiredArgsConstructor +@Slf4j public class RelationPqsCommunicateImpl implements IPqsCommunicate { @@ -36,6 +38,6 @@ public class RelationPqsCommunicateImpl implements IPqsCommunicate { @Override public void insertion(PqsCommunicateDto pqsCommunicateDto) { - + log.info("进出Relation实现类"); } } From 42ce80478c13addce2ace980d44d2dd223471ae6 Mon Sep 17 00:00:00 2001 From: chendaofei <857448963@qq.com> Date: Tue, 1 Apr 2025 09:58:46 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=AE=8C=E6=95=B4?= =?UTF-8?q?=E6=80=A7=E7=AE=97=E6=B3=95=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../line/IDataIntegrityServiceImpl.java | 7 ++++++- .../njcn/dataProcess/api/DataVFeignClient.java | 2 +- .../fallback/DataVFeignClientFallbackFactory.java | 4 ++-- .../njcn/dataProcess/dto/MeasurementCountDTO.java | 15 +++++++++++++-- .../dataProcess/controller/DataVController.java | 4 ++-- .../java/com/njcn/dataProcess/service/IDataV.java | 2 +- .../service/impl/influxdb/InfluxdbDataVImpl.java | 14 ++++++++++++-- .../relation/PqReasonableRangeServiceImpl.java | 13 +++++++++---- .../service/impl/relation/RelationDataVImpl.java | 2 +- 9 files changed, 47 insertions(+), 16 deletions(-) diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java index 96c0a7d..e0b62ab 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/IDataIntegrityServiceImpl.java @@ -67,9 +67,14 @@ public class IDataIntegrityServiceImpl implements IDataIntegrityService { ); //以尺寸100分片 List> pendingIds = ListUtils.partition(lineIds, NUM); + + LineCountEvaluateParam lineCountEvaluateParam = new LineCountEvaluateParam(); + lineCountEvaluateParam.setStartTime(beginDay); + lineCountEvaluateParam.setEndTime(endDay); for (List pendingId : pendingIds) { + lineCountEvaluateParam.setLineId(pendingId); List lineDevGetDTOList = commTerminalGeneralClient.getMonitorDetailList(pendingId).getData(); - List countList = dataVFeignClient.getMeasurementCount(pendingId, beginDay, endDay).getData(); + List countList = dataVFeignClient.getMeasurementCount(lineCountEvaluateParam).getData(); poList.addAll( lineDevGetDTOList.stream() .map(item -> { diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java index 24b03ee..bcd469f 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java @@ -58,7 +58,7 @@ public interface DataVFeignClient { HttpResult batchInsertionCvtDTO(@RequestBody List cvtDTOList); @PostMapping("/getMeasurementCount") - HttpResult> getMeasurementCount(@RequestParam("lineIndex")List lineIndex, @RequestParam("startTime")String startTime, @RequestParam("endTime")String endTime); + HttpResult> getMeasurementCount(@RequestBody LineCountEvaluateParam lineCountEvaluateParam); //获取原始数据 @PostMapping("/getDataV") diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java index b7fe575..da01430 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java @@ -100,8 +100,8 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory> getMeasurementCount(List lineIndex,String startTime,String endTime){ - log.error("{}异常,降级处理,异常为:{}","cvt数据插入DataV",cause.toString()); + public HttpResult> getMeasurementCount(LineCountEvaluateParam lineCountEvaluateParam){ + log.error("{}异常,降级处理,异常为:{}","获取完整性数据",cause.toString()); throw new BusinessException(finalExceptionEnum); } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/MeasurementCountDTO.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/MeasurementCountDTO.java index e19c31b..5293d3e 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/MeasurementCountDTO.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/dto/MeasurementCountDTO.java @@ -17,13 +17,24 @@ import java.time.Instant; @Measurement(name = "data_v") public class MeasurementCountDTO { - @Column(name = "time") + @Column(name = "time",tag = true) @JsonSerialize(using = InstantDateSerializer.class) private Instant time; - @Column(name = "line_id") + @Column(name = "line_id",tag = true) private String lineId; @Column(name = "freq") private String freq; + + + @Data + public static class MeasurementCountResultDTO{ + + private String time; + + private String lineId; + + private String freq; + } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java index 0ef9ffb..fa3bf95 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java @@ -147,9 +147,9 @@ public class DataVController extends BaseController { @OperateInfo(info = LogEnum.BUSINESS_COMMON) @PostMapping("/getMeasurementCount") @ApiOperation("获取算法基础数据") - public HttpResult> getMeasurementCount(List lineIndex, String startTime, String endTime) { + public HttpResult> getMeasurementCount(@RequestBody LineCountEvaluateParam lineCountEvaluateParam) { String methodDescribe = getMethodDescribe("getMeasurementCount"); - List data = dataVQuery.getMeasurementCount(lineIndex,startTime,endTime); + List data = dataVQuery.getMeasurementCount(lineCountEvaluateParam.getLineId(),lineCountEvaluateParam.getStartTime(),lineCountEvaluateParam.getEndTime()); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe); } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java index 9b80efd..1c5d0c1 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java @@ -69,7 +69,7 @@ public interface IDataV extends IMppService { void batchInsertionCvtDTO(List cvtDTOList); - List getMeasurementCount(List lineIndex, String startTime, String endTime); + List getMeasurementCount(List lineIndex, String startTime, String endTime); /** * 获取原始数据 diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java index 5950f30..bf893d0 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java @@ -353,8 +353,9 @@ public class InfluxdbDataVImpl extends MppServiceImpl getMeasurementCount(List lineIndex, String startTime, String + public List getMeasurementCount(List lineIndex, String startTime, String endTime) { + List result = new ArrayList<>(); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class, MeasurementCountDTO.class); influxQueryWrapper.regular(DataV::getLineId, lineIndex) .eq(DataV::getValueType, InfluxDbSqlConstant.MAX) @@ -362,7 +363,16 @@ public class InfluxdbDataVImpl extends MppServiceImpl measurementCountDTOList = dataVMapper.getMeasurementCount(influxQueryWrapper); + if(CollUtil.isNotEmpty(measurementCountDTOList)){ + for(MeasurementCountDTO m : measurementCountDTOList){ + MeasurementCountDTO.MeasurementCountResultDTO p = new MeasurementCountDTO.MeasurementCountResultDTO(); + p.setFreq(m.getFreq()); + p.setLineId(m.getLineId()); + result.add(p); + } + } + return result; } @Override diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/PqReasonableRangeServiceImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/PqReasonableRangeServiceImpl.java index c07b277..2845ac9 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/PqReasonableRangeServiceImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/PqReasonableRangeServiceImpl.java @@ -1,6 +1,7 @@ package com.njcn.dataProcess.service.impl.relation; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.dataProcess.dao.relation.mapper.PqReasonableRangeMapper; @@ -29,10 +30,14 @@ public class PqReasonableRangeServiceImpl extends ServiceImpl getReasonableRangeList(DataCleanParam param) { List result = new ArrayList<>(); LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(PqReasonableRange::getBelongingSystem,param.getSystemType()) - .eq(PqReasonableRange::getDataSource,param.getDataSource()) - .eq(PqReasonableRange::getInfluxdbTableName,param.getTableName()) - .eq(PqReasonableRange::getState,1); + queryWrapper.eq(PqReasonableRange::getBelongingSystem,param.getSystemType()); + if(StrUtil.isNotBlank(param.getDataSource())){ + queryWrapper.eq(PqReasonableRange::getDataSource,param.getDataSource()); + } + if(StrUtil.isNotBlank(param.getTableName())){ + queryWrapper.eq(PqReasonableRange::getInfluxdbTableName,param.getTableName()); + } + queryWrapper.eq(PqReasonableRange::getState,1); List list = this.list(queryWrapper); if (CollUtil.isNotEmpty(list)) { list.forEach(item->{ diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataVImpl.java index 3281c9b..655e090 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataVImpl.java @@ -134,7 +134,7 @@ public class RelationDataVImpl extends MppServiceImpl getMeasurementCount(List lineIndex, String startTime, String endTime) { + public List getMeasurementCount(List lineIndex, String startTime, String endTime) { return Collections.emptyList(); }