From d04900f29947ed64e52114b28255089eca56891a Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Mon, 3 Mar 2025 09:56:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=86=B2=E7=AA=81=E8=A1=A5?= =?UTF-8?q?=E5=85=851?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/DataHarmRateVController.java | 66 ++++++++++ .../dataProcess/service/IDataHarmRateV.java | 23 ++++ .../influxdb/InfluxdbDataHarmRateVImpl.java | 124 ++++++++++++++++++ .../relation/RelationDataHarmRateVImpl.java | 65 +++++++++ 4 files changed, 278 insertions(+) create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java create mode 100644 data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java new file mode 100644 index 0000000..d9f21be --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataHarmRateVController.java @@ -0,0 +1,66 @@ +package com.njcn.dataProcess.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.constant.OperateType; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.dataProcess.annotation.InsertBean; +import com.njcn.dataProcess.annotation.QueryBean; +import com.njcn.dataProcess.dto.DataHarmrateVDTO; +import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.pojo.dto.DataHarmDto; +import com.njcn.dataProcess.service.IDataHarmRateV; +import com.njcn.web.controller.BaseController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Controller; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * @author hongawen + * @version 1.0 + * @data 2024/11/6 19:48 + */ +@Validated +@Slf4j +@Controller +@RestController +@RequestMapping("/dataHarmRateV") +@Api(tags = "谐波电压含有率") +public class DataHarmRateVController extends BaseController { + + @QueryBean + private IDataHarmRateV dataHarmRateVQuery; + + @InsertBean + private IDataHarmRateV dataHarmRateVInsert; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.QUERY) + @PostMapping("/getRawData") + @ApiOperation("获取原始数据") + public HttpResult> getRawData(@RequestBody LineCountEvaluateParam lineParam) { + String methodDescribe = getMethodDescribe("getRawData"); + List data = dataHarmRateVQuery.getRawData(lineParam); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, data, methodDescribe); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON,operateType = OperateType.ADD) + @PostMapping("/batchInsertion") + @ApiOperation("批量插入") + public HttpResult batchInsertion(@RequestBody List dataHarmrateVDTOList) { + String methodDescribe = getMethodDescribe("batchInsertion"); + + dataHarmRateVInsert.batchInsertion(dataHarmrateVDTOList); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + +} diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java new file mode 100644 index 0000000..541cf47 --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IDataHarmRateV.java @@ -0,0 +1,23 @@ +package com.njcn.dataProcess.service; + +import com.njcn.dataProcess.dto.DataHarmrateVDTO; +import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.pojo.dto.DataHarmDto; + +import java.util.List; + +/** + * @author xy + */ +public interface +IDataHarmRateV { + + /** + * 获取原始数据 + * @param lineParam + * @return + */ + List getRawData(LineCountEvaluateParam lineParam); + + void batchInsertion (List dataHarmrateVDTOList); +} diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java new file mode 100644 index 0000000..b97ff46 --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbDataHarmRateVImpl.java @@ -0,0 +1,124 @@ +package com.njcn.dataProcess.service.impl.influxdb; + +import cn.hutool.core.collection.CollectionUtil; +import com.njcn.common.utils.HarmonicTimesUtil; +import com.njcn.dataProcess.dao.imapper.DataHarmRateVMapper; +import com.njcn.dataProcess.dto.DataHarmrateVDTO; +import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.po.influx.DataHarmrateV; +import com.njcn.dataProcess.pojo.dto.DataHarmDto; +import com.njcn.dataProcess.service.IDataHarmRateV; +import com.njcn.dataProcess.util.BeanFeildUtils; +import com.njcn.influx.constant.InfluxDbSqlConstant; +import com.njcn.influx.query.InfluxQueryWrapper; +import lombok.RequiredArgsConstructor; +import org.apache.commons.collections4.ListUtils; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; + +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * @author xy + */ +@Service("InfluxdbDataHarmRateVImpl") +@RequiredArgsConstructor +public class InfluxdbDataHarmRateVImpl implements IDataHarmRateV { + + private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); + + private final DataHarmRateVMapper dataHarmRateVMapper; + + @Override + public List getRawData(LineCountEvaluateParam lineParam) { + List result = new ArrayList<>(); + List list = getMinuteDataI(lineParam.getLineId(), lineParam.getStartTime(), lineParam.getEndTime(), lineParam.getAbnormalTime()); + list.forEach(item->{ + DataHarmDto dto = new DataHarmDto(); + BeanUtils.copyProperties(item,dto); + dto.setMinTime(DATE_TIME_FORMATTER.format(item.getTime())); + result.add(dto); + }); + return result; + } + + /** + * 按监测点集合、时间条件获取dataI分钟数据 + * timeMap参数来判断是否进行数据处理 timeMap为空则不进行数据处理 + * 需要进行剔除异常数据时,这里会有三种情况判断 + * 1.无异常数据,则直接返回集合; + * 2.异常数据和无异常数据参杂,剔除异常数据,只计算正常数据; + * 3.全是异常数据,则使用异常数据进行计算,但是日表中需要标记出来,此数据有异常 + */ + public List getMinuteDataI(List lineList, String startTime, String endTime, Map> timeMap) { + List result = new ArrayList<>(); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataHarmrateV.class); + influxQueryWrapper.samePrefixAndSuffix(InfluxDbSqlConstant.V, "", HarmonicTimesUtil.harmonicTimesList(2, 50, 1)); + influxQueryWrapper.regular(DataHarmrateV::getLineId, lineList) + .select(DataHarmrateV::getLineId) + .select(DataHarmrateV::getPhasicType) + .select(DataHarmrateV::getValueType) + .select(DataHarmrateV::getQualityFlag) + .between(DataHarmrateV::getTime, startTime, endTime) + .eq(DataHarmrateV::getQualityFlag,"0"); + List list = dataHarmRateVMapper.selectByQueryWrapper(influxQueryWrapper); + Map> lineMap = list.stream().collect(Collectors.groupingBy(DataHarmrateV::getLineId)); + //有异常数据 + if (CollectionUtil.isNotEmpty(timeMap)) { + lineMap.forEach((k,v)->{ + List timeList = timeMap.get(k); + //有异常数据,当前监测点自身的异常数据 + if (CollectionUtil.isNotEmpty(timeList)) { + List filterList = v.stream().filter(item -> !timeList.contains(DATE_TIME_FORMATTER.format(item.getTime()))).collect(Collectors.toList()); + //1.过滤掉异常数据后还有正常数据,则用正常数据计算 + if (CollectionUtil.isNotEmpty(filterList)) { + result.addAll(filterList); + } + //2.过滤掉异常数据后没有正常数据,则用所有异常数据计算,但是需要标记数据为异常的 + else { + v.parallelStream().forEach(item -> item.setQualityFlag("1")); + result.addAll(v); + } + } + //没有异常数据,则使用原数据 + else { + result.addAll(v); + } + }); + } + //没有异常数据,则使用原数据 + else { + result.addAll(list); + } + return result; + } + + + + + + @Override + public void batchInsertion(List dataHarmrateVDTOList) { + int totalCount = dataHarmrateVDTOList.size(); + if(totalCount<=0){ + return; + } + + List collect = dataHarmrateVDTOList.stream().flatMap(temp -> DataHarmrateV.relationToInfluxDB(temp).stream()).collect(Collectors.toList()); + int minSize = Math.min(1200000, collect.size()); + + List> partition = ListUtils.partition(collect, minSize); + for (List dataHarmrateVList : partition) { + List sublistAsOriginalListType = new ArrayList<>(dataHarmrateVList); + + dataHarmRateVMapper.insertBatch(sublistAsOriginalListType); + + } + } + +} diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java new file mode 100644 index 0000000..3933454 --- /dev/null +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationDataHarmRateVImpl.java @@ -0,0 +1,65 @@ +package com.njcn.dataProcess.service.impl.relation; + +import com.njcn.dataProcess.dao.relation.mapper.DataHarmrateVRelationMapper; +import com.njcn.dataProcess.dao.relation.mapper.DataIRelationMapper; +import com.njcn.dataProcess.dto.DataHarmrateVDTO; +import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.po.relation.DataHarmrateV; +import com.njcn.dataProcess.pojo.dto.DataHarmDto; +import com.njcn.dataProcess.service.IDataHarmRateV; +import com.njcn.dataProcess.util.BeanFeildUtils; +import lombok.RequiredArgsConstructor; +import org.apache.commons.collections4.ListUtils; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @author xy + */ +@Service("RelationDataHarmRateVImpl") +@RequiredArgsConstructor +public class RelationDataHarmRateVImpl implements IDataHarmRateV { + + @Resource + private DataIRelationMapper dataIRelationMapper; + private final DataHarmrateVRelationMapper dataHarmrateVRelationMapper; + + @Override + public List getRawData(LineCountEvaluateParam lineParam) { + return Collections.emptyList(); + } + + + + @Override + public void batchInsertion(List dataHarmrateVDTOList) { + int totalCount = dataHarmrateVDTOList.size(); + int minSize = Math.min(120, totalCount); + + if(totalCount<=0){ + return; + } + List collect = dataHarmrateVDTOList.stream().map(temp -> { + DataHarmrateV dataHarmrateV = new DataHarmrateV(); + BeanUtils.copyProperties(temp, dataHarmrateV, BeanFeildUtils.getNullPropertyNames(temp)); + return dataHarmrateV; + }).collect(Collectors.toList()); + collect = collect.stream().collect(Collectors.toMap( + temp -> temp.getTimeid() + temp.getLineid() + temp.getPhasicType(), + temp -> temp, + (exist, replace) -> exist + )).values().stream().collect(Collectors.toList()); + + List> partition = ListUtils.partition(collect, minSize); + for (List dataHarmphasicVList : partition) { + dataHarmrateVRelationMapper.insertBatchSomeColumn(dataHarmphasicVList); + + } + } +}