diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java index 3e63a0f..b15c39d 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/executor/MeasurementExecutor.java @@ -1,6 +1,7 @@ package com.njcn.algorithm.executor; -import com.njcn.algorithm.service.line.DayDataService; +import com.njcn.algorithm.service.line.IDataCleanService; +import com.njcn.algorithm.service.line.IDayDataService; import com.yomahub.liteflow.annotation.LiteflowComponent; import com.yomahub.liteflow.annotation.LiteflowMethod; import com.yomahub.liteflow.core.NodeComponent; @@ -23,12 +24,26 @@ import javax.annotation.Resource; public class MeasurementExecutor extends BaseExecutor { @Resource - private DayDataService dayDataService; + private IDayDataService dayDataService; + @Resource + private IDataCleanService dataCleanService; - /********************************************算法负责人:xy***********************************************************/ /** - * 算法名: 3.4.1.1-----监测点报表_日表(r_stat_data_*_d) - * @author xuyang + * 数据清洗(dataV) + * @author xy + */ + @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataVClean", nodeType = NodeTypeEnum.COMMON) + public boolean dataVCleanAccess(NodeComponent bindCmp) { + return isAccess(bindCmp); + } + @LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "dataVClean", nodeType = NodeTypeEnum.COMMON) + public void dataVCleanProcess(NodeComponent bindCmp) { + dataCleanService.dataVCleanHandler(bindCmp.getRequestData()); + } + + /** + * 监测点报表_日表(r_stat_data_*_d) + * @author xy */ @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataV", nodeType = NodeTypeEnum.COMMON) public boolean dataVToDayAccess(NodeComponent bindCmp) { @@ -39,7 +54,4 @@ public class MeasurementExecutor extends BaseExecutor { dayDataService.dataVHandler(bindCmp.getRequestData()); } - - - /********************************************算法负责人:xy结束***********************************************************/ } diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataCleanService.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataCleanService.java new file mode 100644 index 0000000..a5facc4 --- /dev/null +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDataCleanService.java @@ -0,0 +1,17 @@ +package com.njcn.algorithm.service.line; + +import com.njcn.algorithm.pojo.bo.CalculatedParam; + +/** + * @author xy + */ +public interface IDataCleanService { + + /*** + * dataV数据清洗 + * @author xy + * @param calculatedParam 查询条件 + */ + void dataVCleanHandler(CalculatedParam calculatedParam); + +} diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/DayDataService.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDayDataService.java similarity index 90% rename from algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/DayDataService.java rename to algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDayDataService.java index 7a82c13..9484c73 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/DayDataService.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/service/line/IDayDataService.java @@ -5,7 +5,7 @@ import com.njcn.algorithm.pojo.bo.CalculatedParam; /** * @author xy */ -public interface DayDataService { +public interface IDayDataService { /*** diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataCleanServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataCleanServiceImpl.java new file mode 100644 index 0000000..0cf27d4 --- /dev/null +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DataCleanServiceImpl.java @@ -0,0 +1,56 @@ +package com.njcn.algorithm.serviceimpl.line; + +import cn.hutool.core.collection.CollUtil; +import com.njcn.algorithm.pojo.bo.CalculatedParam; +import com.njcn.algorithm.service.line.IDataCleanService; +import com.njcn.dataProcess.api.DataVFeignClient; +import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.pojo.dto.DataVDto; +import com.njcn.dataProcess.util.TimeUtils; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.ListUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; + +/** + * @author xy + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DataCleanServiceImpl implements IDataCleanService { + + private static final Logger logger = LoggerFactory.getLogger(DataCleanServiceImpl.class); + private final static Integer NUM = 100; + + @Resource + private DataVFeignClient dataVFeignClient; + + @Override + public void dataVCleanHandler(CalculatedParam calculatedParam) { + List result = new ArrayList<>(); + LineCountEvaluateParam lineParam = new LineCountEvaluateParam(); + lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); + lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); + List> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM); + pendingIds.forEach(list->{ + lineParam.setLineId(list); + List partList = dataVFeignClient.getRawData(lineParam).getData(); + if (CollUtil.isNotEmpty(partList)) { + partList.forEach(item->{ + item.setAbnormalFlag(0); + }); + result.addAll(partList); + } + }); + if (CollUtil.isNotEmpty(result)) { + dataVFeignClient.addInfluxDbList(result); + } + } +} \ No newline at end of file diff --git a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java index 77de24e..7e83d25 100644 --- a/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java +++ b/algorithm/algorithm-boot/src/main/java/com/njcn/algorithm/serviceimpl/line/DayDataServiceImpl.java @@ -2,7 +2,7 @@ package com.njcn.algorithm.serviceimpl.line; import cn.hutool.core.collection.CollUtil; import com.njcn.algorithm.pojo.bo.CalculatedParam; -import com.njcn.algorithm.service.line.DayDataService; +import com.njcn.algorithm.service.line.IDayDataService; import com.njcn.dataProcess.api.DataVFeignClient; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; @@ -26,7 +26,7 @@ import java.util.*; @Slf4j @Component @RequiredArgsConstructor -public class DayDataServiceImpl implements DayDataService { +public class DayDataServiceImpl implements IDayDataService { private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class); private final static Integer NUM = 100; @@ -54,15 +54,14 @@ public class DayDataServiceImpl implements DayDataService { //数据类型 List valueTypeList = item2.getValueTypeList(); //获取平均值集合 - CommonMinuteDto.ValueType valueTypes = valueTypeList.stream().filter(type-> type.getValueType().equalsIgnoreCase(InfluxDbSqlConstant.AVG_WEB)).findFirst().orElse(null); + CommonMinuteDto.ValueType valueTypes = valueTypeList.stream().filter(type-> type.getValueType().equalsIgnoreCase(InfluxDbSqlConstant.AVG_WEB)).findFirst().orElse(null); valueTypeList.forEach(item3->{ DataVDto dto = new DataVDto(); dto.setTime(item.getTime()); dto.setLineId(item.getLineId()); dto.setPhasicType(item2.getPhasicType()); dto.setValueType(item3.getValueType()); - //todo 数据清洗的功能需要讨论在哪完成 - dto.setQualityFlag(0); + dto.setQualityFlag("0"); channelDataVHandler(item3,valueTypes,dto,true); result.add(dto); }); @@ -87,7 +86,7 @@ public class DayDataServiceImpl implements DayDataService { } else { valueType = pojo1; } - //todo 按照集合排列顺序取值 + //按照指标集合排列顺序取值 dto.setFreq(getData(valueType.getValueType(),valueType.getValueList().get(0),scheme)); dto.setFreqDev(getData(valueType.getValueType(),valueType.getValueList().get(1),scheme)); dto.setRms(getData(valueType.getValueType(),valueType.getValueList().get(2),scheme)); diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java index cea7287..9b3feab 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/DataVFeignClient.java @@ -5,6 +5,7 @@ import com.njcn.common.pojo.response.HttpResult; import com.njcn.dataProcess.api.fallback.DataVFeignClientFallbackFactory; import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.po.influx.DataV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataVDto; import org.springframework.cloud.openfeign.FeignClient; @@ -25,13 +26,24 @@ public interface DataVFeignClient { @PostMapping("/batchInsertion") HttpResult batchInsertion(@RequestBody List dataVDTOList); + @PostMapping("/monitoringTime") HttpResult> monitoringTime(@RequestParam("lineId") String lineId, @RequestParam("localData") String localData) ; + + //获取原始数据 + @PostMapping("/getRawData") + HttpResult> getRawData(@RequestBody LineCountEvaluateParam lineParam); + //获取算法基础数据 @PostMapping("/getBaseData") HttpResult> getBaseData(@RequestBody LineCountEvaluateParam lineParam); + //批量插入数据 @PostMapping("/addList") - HttpResult addList(@RequestBody List dataVDtoList); + HttpResult addList(@RequestBody List list); + + //批量插入时序数据 + @PostMapping("/addInfluxDbList") + HttpResult addInfluxDbList(@RequestBody List list); } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java index 0e901c1..250304a 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/DataVFeignClientFallbackFactory.java @@ -6,10 +6,10 @@ import com.njcn.common.pojo.response.HttpResult; import com.njcn.dataProcess.api.DataVFeignClient; import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.po.influx.DataV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataVDto; import com.njcn.dataProcess.util.DataProcessingEnumUtil; -import com.njcn.system.utils.SystemEnumUtil; import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -54,6 +54,12 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory> getRawData(LineCountEvaluateParam lineParam) { + log.error("{}异常,降级处理,异常为:{}","获取原始数据",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + @Override public HttpResult> getBaseData(LineCountEvaluateParam lineParam) { log.error("{}异常,降级处理,异常为:{}","获取算法基础数据",cause.toString()); @@ -61,11 +67,16 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory addList(List dataVDtoList) { + public HttpResult addList(List list) { log.error("{}异常,降级处理,异常为:{}","批量插入数据",cause.toString()); throw new BusinessException(finalExceptionEnum); } + @Override + public HttpResult addInfluxDbList(List list) { + log.error("{}异常,降级处理,异常为:{}","时序数据库插入数据",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } }; } } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataV.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataV.java index e1e441c..0aa9dcf 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataV.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/po/influx/DataV.java @@ -33,15 +33,28 @@ public class DataV { @JsonSerialize(using = InstantDateSerializer.class) private Instant time; + @Column(name = "line_id", tag = true) + private String lineId; + + @Column(name = "phasic_type", tag = true) + private String phasicType; + + @Column(name = "value_type", tag = true) + private String valueType; + + @Column(name = "quality_flag", tag = true) + private String qualityFlag; + + //是否是异常指标数据,0否1是 + @Column(name = "abnormal_flag") + private Integer abnormalFlag; + @Column(name = "freq") private Double freq; @Column(name = "freq_dev") private Double freqDev; - @Column(name = "quality_flag", tag = true) - private String qualityFlag; - @Column(name = "rms") private Double rms; @@ -54,6 +67,21 @@ public class DataV { @Column(name = "vu_dev") private Double vuDev; + @Column(name = "v_neg") + private Double vNeg; + + @Column(name = "v_pos") + private Double vPos; + + @Column(name = "v_thd") + private Double vThd; + + @Column(name = "v_unbalance") + private Double vUnbalance; + + @Column(name = "v_zero") + private Double vZero; + @Column(name = "v_1") private Double v1; @@ -204,46 +232,6 @@ public class DataV { @Column(name = "v_50") private Double v50; - @Column(name = "v_neg") - private Double vNeg; - - @Column(name = "v_pos") - private Double vPos; - - @Column(name = "v_thd") - private Double vThd; - - @Column(name = "v_unbalance") - private Double vUnbalance; - - @Column(name = "v_zero") - private Double vZero; - - @Column(name = "line_id", tag = true) - private String lineId; - - @Column(name = "phasic_type", tag = true) - private String phasicType; - - @Column(name = "value_type", tag = true) - private String valueType; - - - //自定义字段-总计算次数 - @Column(name = "all_time") - private Integer allTime; - - //自定义字段 - @Column(name = "mean") - private Double mean; - - //自定义字段 - @Column(name = "count") - private Integer count; - //是否是异常指标数据,0否1是 - @Column(name = "abnormal_flag") - private Integer abnormalFlag; - public static List relationToInfluxDB(DataVDTO dataV) { if (dataV == null) { return null; diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/DataVDto.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/DataVDto.java index 83f8b12..584ca3e 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/DataVDto.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/pojo/dto/DataVDto.java @@ -15,6 +15,9 @@ public class DataVDto implements Serializable { @JsonFormat(pattern = "yyyy-MM-dd") private String time; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private String minTime; + @ApiModelProperty("监测点Id") private String lineId; @@ -25,7 +28,10 @@ public class DataVDto implements Serializable { private String valueType; @ApiModelProperty("数据质量标志(0-表示是正常数据、1-表示是错误数据、2-表示是有事件的数据)数据库默认是0,污染数据不参与报表统计") - private Integer qualityFlag; + private String qualityFlag; + + @ApiModelProperty("数据清洗标识 0:正常 1:异常") + private Integer abnormalFlag; @ApiModelProperty("频率") private Double freq; diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java index e34706b..0b955e6 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataVController.java @@ -6,10 +6,10 @@ import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; -import com.njcn.dataProcess.dto.DataVDTO; -import com.njcn.dataProcess.dto.DataVFiveItemDTO; import com.njcn.dataProcess.annotation.InsertBean; import com.njcn.dataProcess.annotation.QueryBean; +import com.njcn.dataProcess.dto.DataVDTO; +import com.njcn.dataProcess.dto.DataVFiveItemDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataVDto; @@ -75,6 +75,15 @@ public class DataVController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, localDateTimeList, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.QUERY) + @PostMapping("/getRawData") + @ApiOperation("获取原始数据") + public HttpResult> getRawData(@RequestBody LineCountEvaluateParam lineParam) { + String methodDescribe = getMethodDescribe("getRawData"); + List data = dataVQuery.getRawData(lineParam); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe); + } + @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.QUERY) @PostMapping("/getBaseData") @ApiOperation("获取算法基础数据") @@ -86,12 +95,19 @@ public class DataVController extends BaseController { @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD) @PostMapping("/addList") - @ApiOperation("批量插入数据") + @ApiOperation("关系型数据库插入数据") public HttpResult addList(@RequestBody List dataVDtoList) { String methodDescribe = getMethodDescribe("addList"); dataVInsert.addList(dataVDtoList); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "", methodDescribe); } - + @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD) + @PostMapping("/addInfluxDbList") + @ApiOperation("时序数据库插入数据") + public HttpResult addInfluxDbList(@RequestBody List dataVList) { + String methodDescribe = getMethodDescribe("addInfluxDbList"); + dataVInsert.addInfluxDbList(dataVList); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "", methodDescribe); + } } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java index d221373..577ca49 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataV.java @@ -4,6 +4,7 @@ import com.github.jeffreyning.mybatisplus.service.IMppService; import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVFiveItemDTO; import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.po.influx.DataV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.DataVDto; import com.njcn.dataProcess.pojo.po.RStatDataVD; @@ -30,6 +31,13 @@ public interface IDataV extends IMppService { List monitoringTime(String lineId, String localData); + /** + * 获取原始数据 + * @param lineParam + * @return + */ + List getRawData(LineCountEvaluateParam lineParam); + /** * 获取监测点原始数据 * @param lineParam 监测点参数 @@ -40,4 +48,6 @@ public interface IDataV extends IMppService { * 批量插入数据 */ void addList(List dataVDtoList); + + void addInfluxDbList(List dataVList); } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java index dc54f33..f0b5a44 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataVImpl.java @@ -2,17 +2,15 @@ package com.njcn.dataProcess.service.impl.influxdb; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.ListUtil; - import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.njcn.common.utils.HarmonicTimesUtil; import com.njcn.dataProcess.constant.InfluxDBTableConstant; import com.njcn.dataProcess.constant.PhaseType; +import com.njcn.dataProcess.dao.imapper.DataVMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataVRelationMapper; import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVFiveItemDTO; import com.njcn.dataProcess.dto.LineDataVFiveItemDTO; - -import com.njcn.dataProcess.dao.imapper.DataVMapper; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.po.influx.DataV; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; @@ -23,10 +21,13 @@ import com.njcn.dataProcess.util.TimeUtils; import com.njcn.influx.constant.InfluxDbSqlConstant; import com.njcn.influx.query.InfluxQueryWrapper; import org.apache.commons.collections4.ListUtils; +import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; @@ -38,6 +39,8 @@ import java.util.stream.Collectors; @Service("InfluxdbDataVImpl") public class InfluxdbDataVImpl extends MppServiceImpl implements IDataV { + private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());; + @Resource private DataVMapper dataVMapper; @@ -113,10 +116,23 @@ public class InfluxdbDataVImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { + List result = new ArrayList<>(); + List list = getMinuteDataV(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), false); + list.forEach(item->{ + DataVDto dto = new DataVDto(); + BeanUtils.copyProperties(item,dto); + dto.setMinTime(DATE_TIME_FORMATTER.format(item.getTime())); + result.add(dto); + }); + return result; + } + @Override public List getBaseData(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); - List dataVList = getMinuteDataV(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime()); + List dataVList = getMinuteDataV(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),true); if (CollectionUtil.isNotEmpty(dataVList)) { String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); //以监测点分组 @@ -137,7 +153,7 @@ public class InfluxdbDataVImpl extends MppServiceImpl{ CommonMinuteDto.ValueType value = new CommonMinuteDto.ValueType(); value.setValueType(valueType); - //todo 规定好集合指标参数 + //规定好集合指标参数 List data1 = valueTypeList.stream().map(DataV::getFreq).collect(Collectors.toList()); List data2 = valueTypeList.stream().map(DataV::getFreqDev).collect(Collectors.toList()); List data3 = valueTypeList.stream().map(DataV::getRms).collect(Collectors.toList()); @@ -226,10 +242,22 @@ public class InfluxdbDataVImpl extends MppServiceImpl dataVList) { + List result = new ArrayList<>(); + dataVList.forEach(item->{ + DataV dataV = new DataV(); + BeanUtils.copyProperties(item, dataV); + dataV.setTime(LocalDateTime.parse(item.getMinTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant()); + result.add(dataV); + }); + dataVMapper.insertBatch(result); + } + /** * 按监测点集合、时间条件获取dataV分钟数据 */ - public List getMinuteDataV(List lineList, String startTime, String endTime) { + public List getMinuteDataV(List lineList, String startTime, String endTime, boolean clean) { InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class); influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, "", HarmonicTimesUtil.harmonicTimesList(1, 50, 1)); influxQueryWrapper.regular(DataV::getLineId, lineList) @@ -247,8 +275,11 @@ public class InfluxdbDataVImpl extends MppServiceImpl getRawData(LineCountEvaluateParam lineParam) { + return Collections.emptyList(); + } + @Override public List getBaseData(LineCountEvaluateParam lineParam) { return Collections.emptyList(); @@ -99,6 +103,13 @@ public class RelationDataVImpl extends MppServiceImpl dataVList) { + + } + + } diff --git a/data-processing/data-processing-boot/src/main/resources/bootstrap.yml b/data-processing/data-processing-boot/src/main/resources/bootstrap.yml index 66cce69..c32d6f1 100644 --- a/data-processing/data-processing-boot/src/main/resources/bootstrap.yml +++ b/data-processing/data-processing-boot/src/main/resources/bootstrap.yml @@ -55,6 +55,7 @@ mqtt: data: source: query: Influxdb +# insert: Influxdb insert: Relation #mybatis配置信息 mybatis-plus: