dataV原始数据数据清洗

This commit is contained in:
xy
2025-02-12 15:21:37 +08:00
parent 1f9c7e0b95
commit 935df88f3f
14 changed files with 246 additions and 76 deletions

View File

@@ -1,6 +1,7 @@
package com.njcn.algorithm.executor; package com.njcn.algorithm.executor;
import com.njcn.algorithm.service.line.DayDataService; import com.njcn.algorithm.service.line.IDataCleanService;
import com.njcn.algorithm.service.line.IDayDataService;
import com.yomahub.liteflow.annotation.LiteflowComponent; import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod; import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent; import com.yomahub.liteflow.core.NodeComponent;
@@ -23,12 +24,26 @@ import javax.annotation.Resource;
public class MeasurementExecutor extends BaseExecutor { public class MeasurementExecutor extends BaseExecutor {
@Resource @Resource
private DayDataService dayDataService; private IDayDataService dayDataService;
@Resource
private IDataCleanService dataCleanService;
/********************************************算法负责人:xy***********************************************************/
/** /**
* 算法名: 3.4.1.1-----监测点报表_日表(r_stat_data_*_d) * 数据清洗dataV
* @author xuyang * @author xy
*/
@LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataVClean", nodeType = NodeTypeEnum.COMMON)
public boolean dataVCleanAccess(NodeComponent bindCmp) {
return isAccess(bindCmp);
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "dataVClean", nodeType = NodeTypeEnum.COMMON)
public void dataVCleanProcess(NodeComponent bindCmp) {
dataCleanService.dataVCleanHandler(bindCmp.getRequestData());
}
/**
* 监测点报表_日表(r_stat_data_*_d)
* @author xy
*/ */
@LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataV", nodeType = NodeTypeEnum.COMMON) @LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataV", nodeType = NodeTypeEnum.COMMON)
public boolean dataVToDayAccess(NodeComponent bindCmp) { public boolean dataVToDayAccess(NodeComponent bindCmp) {
@@ -39,7 +54,4 @@ public class MeasurementExecutor extends BaseExecutor {
dayDataService.dataVHandler(bindCmp.getRequestData()); dayDataService.dataVHandler(bindCmp.getRequestData());
} }
/********************************************算法负责人:xy结束***********************************************************/
} }

View File

@@ -0,0 +1,17 @@
package com.njcn.algorithm.service.line;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
/**
* @author xy
*/
public interface IDataCleanService {
/***
* dataV数据清洗
* @author xy
* @param calculatedParam 查询条件
*/
void dataVCleanHandler(CalculatedParam calculatedParam);
}

View File

@@ -5,7 +5,7 @@ import com.njcn.algorithm.pojo.bo.CalculatedParam;
/** /**
* @author xy * @author xy
*/ */
public interface DayDataService { public interface IDayDataService {
/*** /***

View File

@@ -0,0 +1,56 @@
package com.njcn.algorithm.serviceimpl.line;
import cn.hutool.core.collection.CollUtil;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.njcn.algorithm.service.line.IDataCleanService;
import com.njcn.dataProcess.api.DataVFeignClient;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.DataVDto;
import com.njcn.dataProcess.util.TimeUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* @author xy
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DataCleanServiceImpl implements IDataCleanService {
private static final Logger logger = LoggerFactory.getLogger(DataCleanServiceImpl.class);
private final static Integer NUM = 100;
@Resource
private DataVFeignClient dataVFeignClient;
@Override
public void dataVCleanHandler(CalculatedParam calculatedParam) {
List<DataVDto> result = new ArrayList<>();
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
List<List<String>> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM);
pendingIds.forEach(list->{
lineParam.setLineId(list);
List<DataVDto> partList = dataVFeignClient.getRawData(lineParam).getData();
if (CollUtil.isNotEmpty(partList)) {
partList.forEach(item->{
item.setAbnormalFlag(0);
});
result.addAll(partList);
}
});
if (CollUtil.isNotEmpty(result)) {
dataVFeignClient.addInfluxDbList(result);
}
}
}

View File

@@ -2,7 +2,7 @@ package com.njcn.algorithm.serviceimpl.line;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import com.njcn.algorithm.pojo.bo.CalculatedParam; import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.njcn.algorithm.service.line.DayDataService; import com.njcn.algorithm.service.line.IDayDataService;
import com.njcn.dataProcess.api.DataVFeignClient; import com.njcn.dataProcess.api.DataVFeignClient;
import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
@@ -26,7 +26,7 @@ import java.util.*;
@Slf4j @Slf4j
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
public class DayDataServiceImpl implements DayDataService { public class DayDataServiceImpl implements IDayDataService {
private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class);
private final static Integer NUM = 100; private final static Integer NUM = 100;
@@ -54,15 +54,14 @@ public class DayDataServiceImpl implements DayDataService {
//数据类型 //数据类型
List<CommonMinuteDto.ValueType> valueTypeList = item2.getValueTypeList(); List<CommonMinuteDto.ValueType> valueTypeList = item2.getValueTypeList();
//获取平均值集合 //获取平均值集合
CommonMinuteDto.ValueType valueTypes = valueTypeList.stream().filter(type-> type.getValueType().equalsIgnoreCase(InfluxDbSqlConstant.AVG_WEB)).findFirst().orElse(null); CommonMinuteDto.ValueType valueTypes = valueTypeList.stream().filter(type-> type.getValueType().equalsIgnoreCase(InfluxDbSqlConstant.AVG_WEB)).findFirst().orElse(null);
valueTypeList.forEach(item3->{ valueTypeList.forEach(item3->{
DataVDto dto = new DataVDto(); DataVDto dto = new DataVDto();
dto.setTime(item.getTime()); dto.setTime(item.getTime());
dto.setLineId(item.getLineId()); dto.setLineId(item.getLineId());
dto.setPhasicType(item2.getPhasicType()); dto.setPhasicType(item2.getPhasicType());
dto.setValueType(item3.getValueType()); dto.setValueType(item3.getValueType());
//todo 数据清洗的功能需要讨论在哪完成 dto.setQualityFlag("0");
dto.setQualityFlag(0);
channelDataVHandler(item3,valueTypes,dto,true); channelDataVHandler(item3,valueTypes,dto,true);
result.add(dto); result.add(dto);
}); });
@@ -87,7 +86,7 @@ public class DayDataServiceImpl implements DayDataService {
} else { } else {
valueType = pojo1; valueType = pojo1;
} }
//todo 按照集合排列顺序取值 //按照指标集合排列顺序取值
dto.setFreq(getData(valueType.getValueType(),valueType.getValueList().get(0),scheme)); dto.setFreq(getData(valueType.getValueType(),valueType.getValueList().get(0),scheme));
dto.setFreqDev(getData(valueType.getValueType(),valueType.getValueList().get(1),scheme)); dto.setFreqDev(getData(valueType.getValueType(),valueType.getValueList().get(1),scheme));
dto.setRms(getData(valueType.getValueType(),valueType.getValueList().get(2),scheme)); dto.setRms(getData(valueType.getValueType(),valueType.getValueList().get(2),scheme));

View File

@@ -5,6 +5,7 @@ import com.njcn.common.pojo.response.HttpResult;
import com.njcn.dataProcess.api.fallback.DataVFeignClientFallbackFactory; import com.njcn.dataProcess.api.fallback.DataVFeignClientFallbackFactory;
import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVDTO;
import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.po.influx.DataV;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
import com.njcn.dataProcess.pojo.dto.DataVDto; import com.njcn.dataProcess.pojo.dto.DataVDto;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
@@ -25,13 +26,24 @@ public interface DataVFeignClient {
@PostMapping("/batchInsertion") @PostMapping("/batchInsertion")
HttpResult<String> batchInsertion(@RequestBody List<DataVDTO> dataVDTOList); HttpResult<String> batchInsertion(@RequestBody List<DataVDTO> dataVDTOList);
@PostMapping("/monitoringTime") @PostMapping("/monitoringTime")
HttpResult<List<LocalDateTime>> monitoringTime(@RequestParam("lineId") String lineId, @RequestParam("localData") String localData) ; HttpResult<List<LocalDateTime>> monitoringTime(@RequestParam("lineId") String lineId, @RequestParam("localData") String localData) ;
//获取原始数据
@PostMapping("/getRawData")
HttpResult<List<DataVDto>> getRawData(@RequestBody LineCountEvaluateParam lineParam);
//获取算法基础数据 //获取算法基础数据
@PostMapping("/getBaseData") @PostMapping("/getBaseData")
HttpResult<List<CommonMinuteDto>> getBaseData(@RequestBody LineCountEvaluateParam lineParam); HttpResult<List<CommonMinuteDto>> getBaseData(@RequestBody LineCountEvaluateParam lineParam);
//批量插入数据 //批量插入数据
@PostMapping("/addList") @PostMapping("/addList")
HttpResult<String> addList(@RequestBody List<DataVDto> dataVDtoList); HttpResult<String> addList(@RequestBody List<DataVDto> list);
//批量插入时序数据
@PostMapping("/addInfluxDbList")
HttpResult<String> addInfluxDbList(@RequestBody List<DataVDto> list);
} }

View File

@@ -6,10 +6,10 @@ import com.njcn.common.pojo.response.HttpResult;
import com.njcn.dataProcess.api.DataVFeignClient; import com.njcn.dataProcess.api.DataVFeignClient;
import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVDTO;
import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.po.influx.DataV;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
import com.njcn.dataProcess.pojo.dto.DataVDto; import com.njcn.dataProcess.pojo.dto.DataVDto;
import com.njcn.dataProcess.util.DataProcessingEnumUtil; import com.njcn.dataProcess.util.DataProcessingEnumUtil;
import com.njcn.system.utils.SystemEnumUtil;
import feign.hystrix.FallbackFactory; import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@@ -54,6 +54,12 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory<DataVFei
throw new BusinessException(finalExceptionEnum); throw new BusinessException(finalExceptionEnum);
} }
@Override
public HttpResult<List<DataVDto>> getRawData(LineCountEvaluateParam lineParam) {
log.error("{}异常,降级处理,异常为:{}","获取原始数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override @Override
public HttpResult<List<CommonMinuteDto>> getBaseData(LineCountEvaluateParam lineParam) { public HttpResult<List<CommonMinuteDto>> getBaseData(LineCountEvaluateParam lineParam) {
log.error("{}异常,降级处理,异常为:{}","获取算法基础数据",cause.toString()); log.error("{}异常,降级处理,异常为:{}","获取算法基础数据",cause.toString());
@@ -61,11 +67,16 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory<DataVFei
} }
@Override @Override
public HttpResult<String> addList(List<DataVDto> dataVDtoList) { public HttpResult<String> addList(List<DataVDto> list) {
log.error("{}异常,降级处理,异常为:{}","批量插入数据",cause.toString()); log.error("{}异常,降级处理,异常为:{}","批量插入数据",cause.toString());
throw new BusinessException(finalExceptionEnum); throw new BusinessException(finalExceptionEnum);
} }
@Override
public HttpResult<String> addInfluxDbList(List<DataVDto> list) {
log.error("{}异常,降级处理,异常为:{}","时序数据库插入数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
}; };
} }
} }

View File

@@ -33,15 +33,28 @@ public class DataV {
@JsonSerialize(using = InstantDateSerializer.class) @JsonSerialize(using = InstantDateSerializer.class)
private Instant time; private Instant time;
@Column(name = "line_id", tag = true)
private String lineId;
@Column(name = "phasic_type", tag = true)
private String phasicType;
@Column(name = "value_type", tag = true)
private String valueType;
@Column(name = "quality_flag", tag = true)
private String qualityFlag;
//是否是异常指标数据0否1是
@Column(name = "abnormal_flag")
private Integer abnormalFlag;
@Column(name = "freq") @Column(name = "freq")
private Double freq; private Double freq;
@Column(name = "freq_dev") @Column(name = "freq_dev")
private Double freqDev; private Double freqDev;
@Column(name = "quality_flag", tag = true)
private String qualityFlag;
@Column(name = "rms") @Column(name = "rms")
private Double rms; private Double rms;
@@ -54,6 +67,21 @@ public class DataV {
@Column(name = "vu_dev") @Column(name = "vu_dev")
private Double vuDev; private Double vuDev;
@Column(name = "v_neg")
private Double vNeg;
@Column(name = "v_pos")
private Double vPos;
@Column(name = "v_thd")
private Double vThd;
@Column(name = "v_unbalance")
private Double vUnbalance;
@Column(name = "v_zero")
private Double vZero;
@Column(name = "v_1") @Column(name = "v_1")
private Double v1; private Double v1;
@@ -204,46 +232,6 @@ public class DataV {
@Column(name = "v_50") @Column(name = "v_50")
private Double v50; private Double v50;
@Column(name = "v_neg")
private Double vNeg;
@Column(name = "v_pos")
private Double vPos;
@Column(name = "v_thd")
private Double vThd;
@Column(name = "v_unbalance")
private Double vUnbalance;
@Column(name = "v_zero")
private Double vZero;
@Column(name = "line_id", tag = true)
private String lineId;
@Column(name = "phasic_type", tag = true)
private String phasicType;
@Column(name = "value_type", tag = true)
private String valueType;
//自定义字段-总计算次数
@Column(name = "all_time")
private Integer allTime;
//自定义字段
@Column(name = "mean")
private Double mean;
//自定义字段
@Column(name = "count")
private Integer count;
//是否是异常指标数据0否1是
@Column(name = "abnormal_flag")
private Integer abnormalFlag;
public static List<DataV> relationToInfluxDB(DataVDTO dataV) { public static List<DataV> relationToInfluxDB(DataVDTO dataV) {
if (dataV == null) { if (dataV == null) {
return null; return null;

View File

@@ -15,6 +15,9 @@ public class DataVDto implements Serializable {
@JsonFormat(pattern = "yyyy-MM-dd") @JsonFormat(pattern = "yyyy-MM-dd")
private String time; private String time;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private String minTime;
@ApiModelProperty("监测点Id") @ApiModelProperty("监测点Id")
private String lineId; private String lineId;
@@ -25,7 +28,10 @@ public class DataVDto implements Serializable {
private String valueType; private String valueType;
@ApiModelProperty("数据质量标志0-表示是正常数据、1-表示是错误数据、2-表示是有事件的数据数据库默认是0污染数据不参与报表统计") @ApiModelProperty("数据质量标志0-表示是正常数据、1-表示是错误数据、2-表示是有事件的数据数据库默认是0污染数据不参与报表统计")
private Integer qualityFlag; private String qualityFlag;
@ApiModelProperty("数据清洗标识 0:正常 1:异常")
private Integer abnormalFlag;
@ApiModelProperty("频率") @ApiModelProperty("频率")
private Double freq; private Double freq;

View File

@@ -6,10 +6,10 @@ import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil; import com.njcn.common.utils.HttpResultUtil;
import com.njcn.dataProcess.dto.DataVDTO;
import com.njcn.dataProcess.dto.DataVFiveItemDTO;
import com.njcn.dataProcess.annotation.InsertBean; import com.njcn.dataProcess.annotation.InsertBean;
import com.njcn.dataProcess.annotation.QueryBean; import com.njcn.dataProcess.annotation.QueryBean;
import com.njcn.dataProcess.dto.DataVDTO;
import com.njcn.dataProcess.dto.DataVFiveItemDTO;
import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
import com.njcn.dataProcess.pojo.dto.DataVDto; import com.njcn.dataProcess.pojo.dto.DataVDto;
@@ -75,6 +75,15 @@ public class DataVController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, localDateTimeList, methodDescribe); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, localDateTimeList, methodDescribe);
} }
@OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.QUERY)
@PostMapping("/getRawData")
@ApiOperation("获取原始数据")
public HttpResult<List<DataVDto>> getRawData(@RequestBody LineCountEvaluateParam lineParam) {
String methodDescribe = getMethodDescribe("getRawData");
List<DataVDto> data = dataVQuery.getRawData(lineParam);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.QUERY) @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.QUERY)
@PostMapping("/getBaseData") @PostMapping("/getBaseData")
@ApiOperation("获取算法基础数据") @ApiOperation("获取算法基础数据")
@@ -86,12 +95,19 @@ public class DataVController extends BaseController {
@OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD) @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD)
@PostMapping("/addList") @PostMapping("/addList")
@ApiOperation("批量插入数据") @ApiOperation("关系型数据库插入数据")
public HttpResult<String> addList(@RequestBody List<DataVDto> dataVDtoList) { public HttpResult<String> addList(@RequestBody List<DataVDto> dataVDtoList) {
String methodDescribe = getMethodDescribe("addList"); String methodDescribe = getMethodDescribe("addList");
dataVInsert.addList(dataVDtoList); dataVInsert.addList(dataVDtoList);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "", methodDescribe); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "", methodDescribe);
} }
@OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD)
@PostMapping("/addInfluxDbList")
@ApiOperation("时序数据库插入数据")
public HttpResult<String> addInfluxDbList(@RequestBody List<DataVDto> dataVList) {
String methodDescribe = getMethodDescribe("addInfluxDbList");
dataVInsert.addInfluxDbList(dataVList);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "", methodDescribe);
}
} }

View File

@@ -4,6 +4,7 @@ import com.github.jeffreyning.mybatisplus.service.IMppService;
import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVDTO;
import com.njcn.dataProcess.dto.DataVFiveItemDTO; import com.njcn.dataProcess.dto.DataVFiveItemDTO;
import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.po.influx.DataV;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
import com.njcn.dataProcess.pojo.dto.DataVDto; import com.njcn.dataProcess.pojo.dto.DataVDto;
import com.njcn.dataProcess.pojo.po.RStatDataVD; import com.njcn.dataProcess.pojo.po.RStatDataVD;
@@ -30,6 +31,13 @@ public interface IDataV extends IMppService<RStatDataVD> {
List<LocalDateTime> monitoringTime(String lineId, String localData); List<LocalDateTime> monitoringTime(String lineId, String localData);
/**
* 获取原始数据
* @param lineParam
* @return
*/
List<DataVDto> getRawData(LineCountEvaluateParam lineParam);
/** /**
* 获取监测点原始数据 * 获取监测点原始数据
* @param lineParam 监测点参数 * @param lineParam 监测点参数
@@ -40,4 +48,6 @@ public interface IDataV extends IMppService<RStatDataVD> {
* 批量插入数据 * 批量插入数据
*/ */
void addList(List<DataVDto> dataVDtoList); void addList(List<DataVDto> dataVDtoList);
void addInfluxDbList(List<DataVDto> dataVList);
} }

View File

@@ -2,17 +2,15 @@ package com.njcn.dataProcess.service.impl.influxdb;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil; import cn.hutool.core.collection.ListUtil;
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
import com.njcn.common.utils.HarmonicTimesUtil; import com.njcn.common.utils.HarmonicTimesUtil;
import com.njcn.dataProcess.constant.InfluxDBTableConstant; import com.njcn.dataProcess.constant.InfluxDBTableConstant;
import com.njcn.dataProcess.constant.PhaseType; import com.njcn.dataProcess.constant.PhaseType;
import com.njcn.dataProcess.dao.imapper.DataVMapper;
import com.njcn.dataProcess.dao.relation.mapper.RStatDataVRelationMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataVRelationMapper;
import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVDTO;
import com.njcn.dataProcess.dto.DataVFiveItemDTO; import com.njcn.dataProcess.dto.DataVFiveItemDTO;
import com.njcn.dataProcess.dto.LineDataVFiveItemDTO; import com.njcn.dataProcess.dto.LineDataVFiveItemDTO;
import com.njcn.dataProcess.dao.imapper.DataVMapper;
import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.po.influx.DataV; import com.njcn.dataProcess.po.influx.DataV;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
@@ -23,10 +21,13 @@ import com.njcn.dataProcess.util.TimeUtils;
import com.njcn.influx.constant.InfluxDbSqlConstant; import com.njcn.influx.constant.InfluxDbSqlConstant;
import com.njcn.influx.query.InfluxQueryWrapper; import com.njcn.influx.query.InfluxQueryWrapper;
import org.apache.commons.collections4.ListUtils; import org.apache.commons.collections4.ListUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -38,6 +39,8 @@ import java.util.stream.Collectors;
@Service("InfluxdbDataVImpl") @Service("InfluxdbDataVImpl")
public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper, RStatDataVD> implements IDataV { public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper, RStatDataVD> implements IDataV {
private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());;
@Resource @Resource
private DataVMapper dataVMapper; private DataVMapper dataVMapper;
@@ -113,10 +116,23 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
return null; return null;
} }
@Override
public List<DataVDto> getRawData(LineCountEvaluateParam lineParam) {
List<DataVDto> result = new ArrayList<>();
List<DataV> list = getMinuteDataV(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), false);
list.forEach(item->{
DataVDto dto = new DataVDto();
BeanUtils.copyProperties(item,dto);
dto.setMinTime(DATE_TIME_FORMATTER.format(item.getTime()));
result.add(dto);
});
return result;
}
@Override @Override
public List<CommonMinuteDto> getBaseData(LineCountEvaluateParam lineParam) { public List<CommonMinuteDto> getBaseData(LineCountEvaluateParam lineParam) {
List<CommonMinuteDto> result = new ArrayList<>(); List<CommonMinuteDto> result = new ArrayList<>();
List<DataV> dataVList = getMinuteDataV(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime()); List<DataV> dataVList = getMinuteDataV(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(),true);
if (CollectionUtil.isNotEmpty(dataVList)) { if (CollectionUtil.isNotEmpty(dataVList)) {
String time = TimeUtils.StringTimeToString(lineParam.getStartTime()); String time = TimeUtils.StringTimeToString(lineParam.getStartTime());
//以监测点分组 //以监测点分组
@@ -137,7 +153,7 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
valueTypeMap.forEach((valueType,valueTypeList)->{ valueTypeMap.forEach((valueType,valueTypeList)->{
CommonMinuteDto.ValueType value = new CommonMinuteDto.ValueType(); CommonMinuteDto.ValueType value = new CommonMinuteDto.ValueType();
value.setValueType(valueType); value.setValueType(valueType);
//todo 规定好集合指标参数 //规定好集合指标参数
List<Double> data1 = valueTypeList.stream().map(DataV::getFreq).collect(Collectors.toList()); List<Double> data1 = valueTypeList.stream().map(DataV::getFreq).collect(Collectors.toList());
List<Double> data2 = valueTypeList.stream().map(DataV::getFreqDev).collect(Collectors.toList()); List<Double> data2 = valueTypeList.stream().map(DataV::getFreqDev).collect(Collectors.toList());
List<Double> data3 = valueTypeList.stream().map(DataV::getRms).collect(Collectors.toList()); List<Double> data3 = valueTypeList.stream().map(DataV::getRms).collect(Collectors.toList());
@@ -226,10 +242,22 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
} }
@Override
public void addInfluxDbList(List<DataVDto> dataVList) {
List<DataV> result = new ArrayList<>();
dataVList.forEach(item->{
DataV dataV = new DataV();
BeanUtils.copyProperties(item, dataV);
dataV.setTime(LocalDateTime.parse(item.getMinTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant());
result.add(dataV);
});
dataVMapper.insertBatch(result);
}
/** /**
* 按监测点集合、时间条件获取dataV分钟数据 * 按监测点集合、时间条件获取dataV分钟数据
*/ */
public List<DataV> getMinuteDataV(List<String> lineList, String startTime, String endTime) { public List<DataV> getMinuteDataV(List<String> lineList, String startTime, String endTime, boolean clean) {
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class); InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class);
influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, "", HarmonicTimesUtil.harmonicTimesList(1, 50, 1)); influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, "", HarmonicTimesUtil.harmonicTimesList(1, 50, 1));
influxQueryWrapper.regular(DataV::getLineId, lineList) influxQueryWrapper.regular(DataV::getLineId, lineList)
@@ -247,8 +275,11 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
.select(DataV::getVZero) .select(DataV::getVZero)
.select(DataV::getVlDev) .select(DataV::getVlDev)
.select(DataV::getVuDev) .select(DataV::getVuDev)
.select(DataV::getQualityFlag)
.between(DataV::getTime, startTime, endTime); .between(DataV::getTime, startTime, endTime);
System.out.println(influxQueryWrapper.generateSql()); if (clean) {
influxQueryWrapper.eq(DataV::getAbnormalFlag,0);
}
return dataVMapper.selectByQueryWrapper(influxQueryWrapper); return dataVMapper.selectByQueryWrapper(influxQueryWrapper);
} }

View File

@@ -2,11 +2,10 @@ package com.njcn.dataProcess.service.impl.relation;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl; import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
import com.njcn.dataProcess.dao.relation.mapper.DataVRelationMapper;
import com.njcn.dataProcess.dao.relation.mapper.RStatDataVRelationMapper; import com.njcn.dataProcess.dao.relation.mapper.RStatDataVRelationMapper;
import com.njcn.dataProcess.dto.DataVDTO; import com.njcn.dataProcess.dto.DataVDTO;
import com.njcn.dataProcess.dto.DataVFiveItemDTO; import com.njcn.dataProcess.dto.DataVFiveItemDTO;
import com.njcn.dataProcess.dao.relation.mapper.DataVRelationMapper;
import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.po.relation.DataV; import com.njcn.dataProcess.po.relation.DataV;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto; import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
@@ -84,6 +83,11 @@ public class RelationDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
return result; return result;
} }
@Override
public List<DataVDto> getRawData(LineCountEvaluateParam lineParam) {
return Collections.emptyList();
}
@Override @Override
public List<CommonMinuteDto> getBaseData(LineCountEvaluateParam lineParam) { public List<CommonMinuteDto> getBaseData(LineCountEvaluateParam lineParam) {
return Collections.emptyList(); return Collections.emptyList();
@@ -99,6 +103,13 @@ public class RelationDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
BeanUtils.copyProperties(item, dataV); BeanUtils.copyProperties(item, dataV);
result.add(dataV); result.add(dataV);
}); });
iDataV.saveOrUpdateBatchByMultiId(result); iDataV.saveOrUpdateBatch(result);
} }
@Override
public void addInfluxDbList(List<DataVDto> dataVList) {
}
} }

View File

@@ -55,6 +55,7 @@ mqtt:
data: data:
source: source:
query: Influxdb query: Influxdb
# insert: Influxdb
insert: Relation insert: Relation
#mybatis配置信息 #mybatis配置信息
mybatis-plus: mybatis-plus: