1.污染值算法迁移

2.增加暂态信息补招
This commit is contained in:
wr
2025-07-25 17:27:18 +08:00
parent 6d5d2a339e
commit 7a0b0cc943
28 changed files with 499 additions and 53 deletions

View File

@@ -37,7 +37,8 @@ public class MeasurementExecutor extends BaseExecutor {
private IDataComAssService dataComAssService;
@Resource
private IPollutionService pollutionService;
@Resource
private IPollutionCalc pollutionCalc;
/**
* 数据质量清洗
*/
@@ -450,6 +451,20 @@ public class MeasurementExecutor extends BaseExecutor {
pollutionService.handleDay(bindCmp.getRequestData());
}
/**
* 监测点污区图
* @param bindCmp
* @return
*/
@LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataPollutionCalc", nodeType = NodeTypeEnum.COMMON)
public boolean dataPollutionCalcAccess(NodeComponent bindCmp) {
return isAccess(bindCmp);
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "dataPollutionCalc", nodeType = NodeTypeEnum.COMMON)
public void dataPollutionCalcProcess(NodeComponent bindCmp) {
pollutionCalc.calcAllLineValue(bindCmp.getRequestData());
}
/**
* 监测点数据完整性

View File

@@ -0,0 +1,17 @@
package com.njcn.algorithm.service.line;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
/**
* 污染值算法
*/
public interface IPollutionCalc {
/**
* 计算谐波电压、谐波电流
* @param calculatedParam
* @Author: wr
* @Date: 2025/7/24 14:37
*/
void calcAllLineValue(CalculatedParam calculatedParam);
}

View File

@@ -1485,20 +1485,24 @@ public class DataCleanServiceImpl implements IDataCleanService {
pqReasonableRangeDto = map.get(DataCleanEnum.V_Data.getCode());
phaseList = Arrays.asList(pqReasonableRangeDto.getPhaseType().split(","));
if (phaseList.contains(dto.getPhasicType())) {
if (dto.getV1() < (pqReasonableRangeDto.getMinValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()))
|| dto.getV1() > (pqReasonableRangeDto.getMaxValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()))) {
//log.info("dataV-基波电压数据异常,已清洗!数据值:{},数据时间:{}", dto.getV1(), dto.getMinTime());
PqDataVerify pqDataVerify = getPqDataVerify(dto.getLineId()
,dto.getMinTime()
,dto.getValueType()
,dto.getPhasicType()
,pqReasonableRangeDto.getIndexCode()
,pqReasonableRangeDto.getIndexName()
,pqReasonableRangeDto.getInfluxdbTableName()
,dto.getV1()
,pqReasonableRangeDto.getMinValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel())
,pqReasonableRangeDto.getMaxValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()));
list.add(pqDataVerify);
if(ObjectUtil.isNotNull(dto.getV1())){
if (dto.getV1() < (pqReasonableRangeDto.getMinValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()))
|| dto.getV1() > (pqReasonableRangeDto.getMaxValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()))) {
//log.info("dataV-基波电压数据异常,已清洗!数据值:{},数据时间:{}", dto.getV1(), dto.getMinTime());
PqDataVerify pqDataVerify = getPqDataVerify(dto.getLineId()
,dto.getMinTime()
,dto.getValueType()
,dto.getPhasicType()
,pqReasonableRangeDto.getIndexCode()
,pqReasonableRangeDto.getIndexName()
,pqReasonableRangeDto.getInfluxdbTableName()
,dto.getV1()
,pqReasonableRangeDto.getMinValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel())
,pqReasonableRangeDto.getMaxValue()*DataCommonUtils.getVoltageData(line.getVoltageLevel()));
list.add(pqDataVerify);
}
}else{
logger.info("vData{}", dto);
}
}
return list;

View File

@@ -815,9 +815,15 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
//负序电流
if (!CollectionUtils.isEmpty(dataIPONegList)) {
for (DataIDto item : dataIPONegList) {
if (item.getINeg() > overlimit.getINeg()) {
if(ObjectUtil.isNotNull(item.getINeg())){
if (item.getINeg() > overlimit.getINeg()) {
addAbnormalData(iNeg, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getINeg(), overlimit.getINeg());
}
}else{
System.out.println(item);
addAbnormalData(iNeg, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getINeg(), overlimit.getINeg());
}
}
}
//频率偏差

View File

@@ -0,0 +1,244 @@
package com.njcn.algorithm.serviceimpl.line;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.njcn.algorithm.service.line.IPollutionCalc;
import com.njcn.common.utils.PubUtils;
import com.njcn.dataProcess.api.DataHarmRateVFeignClient;
import com.njcn.dataProcess.api.DataIFeignClient;
import com.njcn.dataProcess.api.DataPollutionFeignClient;
import com.njcn.dataProcess.api.DataVFeignClient;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.DataHarmDto;
import com.njcn.dataProcess.pojo.dto.DataIDto;
import com.njcn.dataProcess.pojo.dto.DataPollutionD;
import com.njcn.dataProcess.pojo.dto.DataVDto;
import com.njcn.dataProcess.util.TimeUtils;
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
import com.njcn.device.biz.pojo.po.Overlimit;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.enums.DicDataTypeEnum;
import com.njcn.system.pojo.po.DictData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class PollutionCalcImpl implements IPollutionCalc {
@Resource
private CommTerminalGeneralClient commTerminalGeneralClient;
@Resource
private DicDataFeignClient dicDataFeignClient;
@Resource
private DataVFeignClient dataVFeignClient;
@Resource
private DataHarmRateVFeignClient dataHarmRateVFeignClient;
@Resource
private DataIFeignClient dataIFeignClient;
@Resource
private DataPollutionFeignClient dataPollutionFeignClient;
/**
* 监测点污染值计算,谐波电压&谐波电流
* 1、获取所有监测点
* 2、根据指标获取对应的污染值目前仅做谐波电压
* 注:目前仅支持获取昨天的
*/
@Override
public void calcAllLineValue(CalculatedParam calculatedParam) {
System.out.println("当前执行监测点污染值算法++++++++++++++++++++++++++++++++++");
List<DataPollutionD> list = new ArrayList<>();
List<String> idList = calculatedParam.getIdList();
List<Overlimit> overlimitList = commTerminalGeneralClient.getOverLimitDataByIds(idList).getData();
Map<String, Overlimit> limitMap = overlimitList.stream().collect(Collectors.toMap(Overlimit::getId, Function.identity()));
List<LineDevGetDTO> lineDetailList = commTerminalGeneralClient.getMonitorDetailList(idList).getData();
Map<String, LineDevGetDTO> lineDetailMap = lineDetailList.stream().collect(Collectors.toMap(LineDevGetDTO::getPointId, Function.identity()));
List<DictData> dictDataList = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.POLLUTION_CALC.getCode()).getData();
Map<String, String> dictData = dictDataList.stream().collect(Collectors.toMap(DictData::getCode, DictData::getId));
String vHarmonicLimit = dictData.get(DicDataEnum.V_HARMONIC_LIMIT.getCode());
String iAllLimit = dictData.get(DicDataEnum.I_ALL_LIMIT.getCode());
if (CollUtil.isNotEmpty(idList)) {
System.out.println("总共" + idList.size() + "个监测点参与污染值计算,开始执行");
DataPollutionD dataPollutionD;
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
for (String id : idList) {
if (lineDetailMap.containsKey(id)) {
LineDevGetDTO line = lineDetailMap.get(id);
if (limitMap.containsKey(id)) {
Overlimit overlimit = limitMap.get(id);
dataPollutionD = new DataPollutionD();
dataPollutionD.setLineId(id);
dataPollutionD.setDataDate(LocalDateTimeUtil.parseDate(calculatedParam.getDataDate()));
dataPollutionD.setPollutionType(vHarmonicLimit);
lineParam.setValueType(Arrays.asList(line.getTimeInterval() + ""));
lineParam.setLineId(Arrays.asList(id));
List<DataVDto> dataVDtoList = dataVFeignClient.getGroupByTimeDataV(lineParam).getData();
List<DataHarmDto> dataHarmDtoList = dataHarmRateVFeignClient.getGroupByTimeHarmRateV(lineParam).getData();
if (CollUtil.isNotEmpty(dataVDtoList) && CollUtil.isNotEmpty(dataHarmDtoList)) {
//计算谐波电压污染值
dataPollutionD.setValue(PubUtils.doubleRound(2, calcVAllPollutionValue(dataVDtoList, dataHarmDtoList, overlimit) * line.getTimeInterval()));
list.add(dataPollutionD);
}
dataPollutionD = new DataPollutionD();
dataPollutionD.setLineId(id);
dataPollutionD.setDataDate(LocalDateTimeUtil.parseDate(calculatedParam.getDataDate()));
dataPollutionD.setPollutionType(iAllLimit);
List<DataIDto> data = dataIFeignClient.getGroupByTimeDataI(lineParam).getData();
if (CollUtil.isNotEmpty(data)) {
//计算谐波电流污染值
dataPollutionD.setValue(PubUtils.doubleRound(2, calcIAllPollutionValue(data, overlimit) * line.getTimeInterval()));
list.add(dataPollutionD);
}
}
}
}
}
if (CollUtil.isNotEmpty(list)) {
dataPollutionFeignClient.batchInsertion(list);
}
}
/**
* 计算谐波电压的污染值包含了电压总畸变率和谐波电压2~25次
*
* @param dataVDtoList
* @param dataHarmDtoList
* @param overlimit
* @return
*/
private double calcVAllPollutionValue(List<DataVDto> dataVDtoList, List<DataHarmDto> dataHarmDtoList, Overlimit overlimit) {
// 计算时间范围内指标越限百分比
// 总畸变率的限值
List<Double> thdValueList = dataVDtoList.stream().map(DataVDto::getVThd).collect(Collectors.toList());
double thdPollutionValue = calcPollutionValue(overlimit.getUbalance(), thdValueList);
//谐波值2~50次
double harmRateVPollutionValue = calcHarmRateVPollutionValue(overlimit, dataHarmDtoList);
return thdPollutionValue + harmRateVPollutionValue;
}
private double calcIAllPollutionValue(List<DataIDto> dataVDtoList, Overlimit overlimit) {
List<Double> pollutionValue = new ArrayList<>();
// 2次
for (int i = 2; i < 51; i++) {
Float limitValue = getValueByFieldName(overlimit, "iharm" + i);
int finalI = i;
List<Double> valueList = dataVDtoList.stream().map(item -> {
Float valueByFieldName = getValueByFieldName(item, "i" + finalI);
if (Objects.isNull(valueByFieldName)) {
return 0.0;
} else {
return (double) valueByFieldName;
}
}).collect(Collectors.toList());
pollutionValue.add(calcPollutionValue(limitValue, valueList));
}
return pollutionValue.stream().mapToDouble(Double::doubleValue).sum();
}
/**
* 计算谐波电压2~50的越限污染值
*
* @param overlimit 限值
* @param dataHarmRateVList 2~50次的谐波电压值
* @return 谐波电压污染值
*/
private double calcHarmRateVPollutionValue(Overlimit overlimit, List<DataHarmDto> dataHarmRateVList) {
List<Double> pollutionValue = new ArrayList<>();
// 2次
for (int i = 2; i < 51; i++) {
Float limitValue = getValueByFieldName(overlimit, "uharm" + i);
int finalI = i;
List<Double> valueList = dataHarmRateVList.stream().map(item -> {
Float valueByFieldName = getValueByFieldName(item, "v" + finalI);
if (Objects.isNull(valueByFieldName)) {
return 0.0;
} else {
return (double) valueByFieldName;
}
}).collect(Collectors.toList());
pollutionValue.add(calcPollutionValue(limitValue, valueList));
}
return pollutionValue.stream().mapToDouble(Double::doubleValue).sum();
}
/**
* 获取当前组数据的污染值
*
* @param limit 限值
* @param valueList 参考数据
* @return 污染值
*/
private double calcPollutionValue(Float limit, List<Double> valueList) {
double pollutionValue = 0;
// 没有限值的直接返回
if (Objects.isNull(limit) || limit == 0.0f) {
return pollutionValue;
}
// 计算每个数值的越限百分比MOP
List<Double> overLimitPercentList = valueList.stream().map(value -> {
if (Objects.isNull(value)) {
return 0.0;
}
return value * 100 / limit;
}).collect(Collectors.toList());
// 计算每个越限百分比对应的污染值,并求和返回
if (CollUtil.isNotEmpty(overLimitPercentList)) {
pollutionValue = overLimitPercentList.stream().map(item -> {
double value = 0;
if (item >= 100.0 && item < 120.0) {
value = 0.1;
} else if (item >= 120.0 && item < 160.0) {
value = 0.2;
} else if (item >= 160.0 && item < 200.0) {
value = 0.3;
} else if (item >= 200.0) {
value = 0.4;
}
return value;
}).mapToDouble(Double::doubleValue).sum();
}
return pollutionValue;
}
/**
* 根据标记获取属性对应的值
*
* @return 值
*/
public static Float getValueByFieldName(Object obj, String fieldName) {
try {
Field field = obj.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
Object val = field.get(obj);
if (Objects.isNull(val)) {
return null;
}
if (val instanceof Double) {
Double doubleValue = (Double) val;
return doubleValue.floatValue();
}
return (float) field.get(obj);
} catch (NoSuchFieldException | IllegalAccessException e) {
return 0.0f;
}
}
}

View File

@@ -58,7 +58,7 @@ liteflow:
logging:
config: http://@nacos.url@/nacos/v1/cs/configs?tenant=@nacos.namespace@&group=DEFAULT_GROUP&dataId=logback.xml
level:
root: info
root:
#mybatis配置信息

View File

@@ -40,4 +40,7 @@ public interface DataHarmRateVFeignClient {
@PostMapping("/addInfluxDbList")
HttpResult<String> addInfluxDbList(@RequestBody List<DataHarmDto> list);
//按时间分组获取原始数据
@PostMapping("/getGroupByTimeHarmRateV")
HttpResult<List<DataHarmDto>> getGroupByTimeHarmRateV(@RequestBody LineCountEvaluateParam lineParam);
}

View File

@@ -1,18 +1,12 @@
package com.njcn.dataProcess.api;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.constant.OperateType;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.dataProcess.api.fallback.DataIFeignClientFallbackFactory;
import com.njcn.dataProcess.dto.DataIDTO;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
import com.njcn.dataProcess.pojo.dto.DataIDto;
import io.swagger.annotations.ApiOperation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -44,4 +38,7 @@ public interface DataIFeignClient {
@PostMapping("/addInfluxDbList")
HttpResult<String> addInfluxDbList(@RequestBody List<DataIDto> dataIList);
@PostMapping("/getGroupByTimeDataI")
HttpResult<List<DataIDto>> getGroupByTimeDataI(@RequestBody LineCountEvaluateParam lineParam);
}

View File

@@ -63,4 +63,9 @@ public interface DataVFeignClient {
//获取原始数据
@PostMapping("/getDataV")
HttpResult<List<DataVDto>> getDataV(@RequestBody LineCountEvaluateParam lineParam);
//按时间分组获取原始数据
@PostMapping("/getGroupByTimeDataV")
HttpResult<List<DataVDto>> getGroupByTimeDataV(@RequestBody LineCountEvaluateParam lineParam);
}

View File

@@ -80,6 +80,12 @@ public class DataHarmRateVFeignClientFallbackFactory implements FallbackFactory<
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<List<DataHarmDto>> getGroupByTimeHarmRateV(LineCountEvaluateParam lineParam) {
log.error("{}异常,降级处理,异常为:{}","dataHarmRateV按时间分组获取原始数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -76,6 +76,12 @@ public class DataIFeignClientFallbackFactory implements FallbackFactory<DataIFei
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<List<DataIDto>> getGroupByTimeDataI(LineCountEvaluateParam lineParam) {
log.error("{}异常,降级处理,异常为:{}","dataI按时间分组获取原始数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -110,6 +110,12 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory<DataVFei
log.error("{}异常,降级处理,异常为:{}","查询数据DataV",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<List<DataVDto>> getGroupByTimeDataV(LineCountEvaluateParam lineParam) {
log.error("{}异常,降级处理,异常为:{}","DataV按时间分组获取原始数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -10,10 +10,7 @@ import com.njcn.dataProcess.annotation.InsertBean;
import com.njcn.dataProcess.annotation.QueryBean;
import com.njcn.dataProcess.dto.DataHarmrateVDTO;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
import com.njcn.dataProcess.pojo.dto.DataHarmDto;
import com.njcn.dataProcess.pojo.dto.DataHarmRateIDto;
import com.njcn.dataProcess.pojo.dto.DataHarmRateVDto;
import com.njcn.dataProcess.pojo.dto.*;
import com.njcn.dataProcess.service.IDataHarmRateV;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api;
@@ -111,4 +108,12 @@ public class DataHarmRateVController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "", methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/getGroupByTimeHarmRateV")
@ApiOperation("按时间分组获取原始数据")
public HttpResult<List<DataHarmDto>> getGroupByTimeHarmRateV(@RequestBody LineCountEvaluateParam lineParam) {
String methodDescribe = getMethodDescribe("getGroupByTimeHarmRateV");
List<DataHarmDto> dataV = dataHarmRateVQuery.getGroupByTimeHarmRateV(lineParam);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, dataV, methodDescribe);
}
}

View File

@@ -11,6 +11,7 @@ import com.njcn.dataProcess.annotation.QueryBean;
import com.njcn.dataProcess.dto.DataIDTO;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
import com.njcn.dataProcess.pojo.dto.DataHarmDto;
import com.njcn.dataProcess.pojo.dto.DataIDto;
import com.njcn.dataProcess.service.IDataI;
import com.njcn.web.controller.BaseController;
@@ -101,4 +102,12 @@ public class DataIController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, dataIDtoList, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/getGroupByTimeDataI")
@ApiOperation("按时间分组获取原始数据")
public HttpResult<List<DataIDto>> getGroupByTimeDataI(@RequestBody LineCountEvaluateParam lineParam) {
String methodDescribe = getMethodDescribe("getGroupByTimeDataI");
List<DataIDto> dataV = dataIQuery.getGroupByTimeDataI(lineParam);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, dataV, methodDescribe);
}
}

View File

@@ -63,7 +63,6 @@ public class DataVController extends BaseController {
@ApiOperation("批量插入")
public HttpResult<String> batchInsertion(@RequestBody List<DataVDTO> dataVDTOList) {
String methodDescribe = getMethodDescribe("batchInsertion");
dataVInsert.batchInsertion(dataVDTOList);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@@ -73,7 +72,6 @@ public class DataVController extends BaseController {
@ApiOperation("批量插入cvtDto")
public HttpResult<String> batchInsertionCvtDTO(@RequestBody List<DataVCvtDto> cvtDTOList) {
String methodDescribe = getMethodDescribe("batchInsertion");
dataVInsert.batchInsertionCvtDTO(cvtDTOList);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@@ -83,7 +81,6 @@ public class DataVController extends BaseController {
@ApiOperation("获取监测点数据时间点(补招使用)")
public HttpResult<List<LocalDateTime>> monitoringTime(@RequestParam("lineId") String lineId,@RequestParam("localData") String localData) {
String methodDescribe = getMethodDescribe("monitoringTime");
List<LocalDateTime> localDateTimeList = dataVQuery.monitoringTime(lineId,localData);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, localDateTimeList, methodDescribe);
}
@@ -160,6 +157,18 @@ public class DataVController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, dataV, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/getGroupByTimeDataV")
@ApiOperation("按时间分组获取原始数据")
public HttpResult<List<DataVDto>> getGroupByTimeDataV(@RequestBody LineCountEvaluateParam lineParam) {
String methodDescribe = getMethodDescribe("getGroupByTimeDataV");
List<DataVDto> dataV = dataVQuery.getGroupByTimeDataV(lineParam);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, dataV, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@GetMapping("/memoryTest")
@ApiOperation("n内存测试")

View File

@@ -42,4 +42,5 @@ IDataHarmRateV extends IMppService<RStatDataHarmRateVD> {
void addInfluxDbList(List<DataHarmDto> dataVList);
List<DataHarmDto> getGroupByTimeHarmRateV(LineCountEvaluateParam lineParam);
}

View File

@@ -49,4 +49,10 @@ public interface IDataI extends IMppService<RStatDataID> {
*/
List<DataIDto> getDataI(LineCountEvaluateParam lineParam);
/**
* 分组获取电流信息
* @param lineParam
* @return
*/
List<DataIDto> getGroupByTimeDataI(LineCountEvaluateParam lineParam);
}

View File

@@ -79,4 +79,6 @@ public interface IDataV extends IMppService<RStatDataVD> {
* @Date: 2025/3/20 13:42
*/
List<DataVDto> getDataV(LineCountEvaluateParam lineParam);
List<DataVDto> getGroupByTimeDataV(LineCountEvaluateParam lineParam);
}

View File

@@ -4,14 +4,17 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
import com.njcn.common.utils.HarmonicTimesUtil;
import com.njcn.dataProcess.constant.InfluxDBTableConstant;
import com.njcn.dataProcess.dao.imapper.DataHarmRateVMapper;
import com.njcn.dataProcess.dao.relation.mapper.RStatDataHarmRateVRelationMapper;
import com.njcn.dataProcess.dto.DataHarmrateVDTO;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.po.influx.DataHarmrateV;
import com.njcn.dataProcess.po.influx.DataV;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
import com.njcn.dataProcess.pojo.dto.DataHarmDto;
import com.njcn.dataProcess.pojo.dto.DataHarmRateVDto;
import com.njcn.dataProcess.pojo.dto.DataVDto;
import com.njcn.dataProcess.pojo.po.RStatDataHarmRateVD;
import com.njcn.dataProcess.service.IDataHarmRateV;
import com.njcn.dataProcess.util.TimeUtils;
@@ -206,6 +209,25 @@ public class InfluxdbDataHarmRateVImpl extends MppServiceImpl<RStatDataHarmRateV
dataHarmRateVMapper.insertBatch(result);
}
@Override
public List<DataHarmDto> getGroupByTimeHarmRateV(LineCountEvaluateParam lineParam) {
InfluxQueryWrapper harmRateVQueryWrapper = new InfluxQueryWrapper(DataHarmrateV.class);
harmRateVQueryWrapper.maxSamePrefixAndSuffix(InfluxDbSqlConstant.V, "", HarmonicTimesUtil.harmonicTimesList(2, 50, 1));
harmRateVQueryWrapper.regular(DataV::getLineId, lineParam.getLineId())
.eq(DataHarmrateV::getValueType, InfluxDbSqlConstant.CP95)
.ne(DataHarmrateV::getPhasicType, InfluxDBTableConstant.PHASE_TYPE_T)
.groupBy("time(" + lineParam.getValueType().get(0) + "m)")
.between(DataHarmrateV::getTime, lineParam.getStartTime(), lineParam.getEndTime());
List<DataHarmrateV> list = dataHarmRateVMapper.selectByQueryWrapper(harmRateVQueryWrapper);
List<DataHarmDto> result = new ArrayList<>();
list.forEach(item -> {
DataHarmDto dto = new DataHarmDto();
BeanUtils.copyProperties(item, dto);
result.add(dto);
});
return result;
}
/**
* 按监测点集合、时间条件获取分钟数据
* timeMap参数来判断是否进行数据处理 timeMap为空则不进行数据处理

View File

@@ -4,13 +4,16 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import com.github.jeffreyning.mybatisplus.service.MppServiceImpl;
import com.njcn.common.utils.HarmonicTimesUtil;
import com.njcn.dataProcess.constant.InfluxDBTableConstant;
import com.njcn.dataProcess.dao.imapper.DataIMapper;
import com.njcn.dataProcess.dao.relation.mapper.RStatDataIRelationMapper;
import com.njcn.dataProcess.dto.DataIDTO;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.po.influx.DataHarmrateV;
import com.njcn.dataProcess.po.influx.DataI;
import com.njcn.dataProcess.po.influx.DataV;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
import com.njcn.dataProcess.pojo.dto.DataHarmDto;
import com.njcn.dataProcess.pojo.dto.DataIDto;
import com.njcn.dataProcess.pojo.po.RStatDataID;
import com.njcn.dataProcess.service.IDataI;
@@ -201,6 +204,25 @@ public class InfluxdbDataIImpl extends MppServiceImpl<RStatDataIRelationMapper,
return Collections.emptyList();
}
@Override
public List<DataIDto> getGroupByTimeDataI(LineCountEvaluateParam lineParam) {
InfluxQueryWrapper harmRateVQueryWrapper = new InfluxQueryWrapper(DataHarmrateV.class);
harmRateVQueryWrapper.maxSamePrefixAndSuffix(InfluxDbSqlConstant.I, "", HarmonicTimesUtil.harmonicTimesList(2, 50, 1));
harmRateVQueryWrapper.regular(DataV::getLineId, lineParam.getLineId())
.eq(DataHarmrateV::getValueType, InfluxDbSqlConstant.CP95)
.ne(DataHarmrateV::getPhasicType, InfluxDBTableConstant.PHASE_TYPE_T)
.groupBy("time(" + lineParam.getValueType().get(0) + "m)")
.between(DataHarmrateV::getTime, lineParam.getStartTime(), lineParam.getEndTime());
List<DataI> list = dataIMapper.selectByQueryWrapper(harmRateVQueryWrapper);
List<DataIDto> result = new ArrayList<>();
list.forEach(item -> {
DataIDto dto = new DataIDto();
BeanUtils.copyProperties(item, dto);
result.add(dto);
});
return result;
}
/**
* 按监测点集合、时间条件获取dataI分钟数据
* timeMap参数来判断是否进行数据处理 timeMap为空则不进行数据处理

View File

@@ -382,6 +382,25 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
return Collections.emptyList();
}
@Override
public List<DataVDto> getGroupByTimeDataV(LineCountEvaluateParam lineParam) {
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(DataV.class);
influxQueryWrapper.regular(DataV::getLineId, lineParam.getLineId())
.eq(DataV::getValueType, InfluxDbSqlConstant.CP95)
.ne(DataV::getPhasicType, InfluxDBTableConstant.PHASE_TYPE_T)
.max(DataV::getVThd)
.groupBy("time(" + lineParam.getValueType().get(0) + "m)")
.between(DataV::getTime, lineParam.getStartTime(), lineParam.getEndTime());
List<DataV> list = dataVMapper.selectByQueryWrapper(influxQueryWrapper);
List<DataVDto> result = new ArrayList<>();
list.forEach(item -> {
DataVDto dto = new DataVDto();
BeanUtils.copyProperties(item, dto);
result.add(dto);
});
return result;
}
/**
* 按监测点集合、时间条件获取dataV分钟数据
* timeMap参数来判断是否进行数据出来 timeMap为空则不进行数据处理

View File

@@ -128,6 +128,11 @@ public class RelationDataHarmRateVImpl extends MppServiceImpl<RStatDataHarmRateV
}
@Override
public List<DataHarmDto> getGroupByTimeHarmRateV(LineCountEvaluateParam lineParam) {
return Collections.emptyList();
}
private List<DataHarmDto> quality(List<DataHarmDto> list, LineCountEvaluateParam lineParam) {
List<DataHarmDto> result = new ArrayList<>();
Map<String, List<DataHarmDto>> lineMap = list.stream().collect(Collectors.groupingBy(DataHarmDto::getLineId));

View File

@@ -130,6 +130,11 @@ public class RelationDataIImpl extends MppServiceImpl<RStatDataIRelationMapper,
return quality(result, lineParam);
}
@Override
public List<DataIDto> getGroupByTimeDataI(LineCountEvaluateParam lineParam) {
return Collections.emptyList();
}
private List<DataIDto> quality(List<DataIDto> list, LineCountEvaluateParam lineParam) {
List<DataIDto> result = new ArrayList<>();
Map<String, List<DataIDto>> lineMap = list.stream().collect(Collectors.groupingBy(DataIDto::getLineId));

View File

@@ -162,6 +162,11 @@ public class RelationDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
return quality(info, lineParam);
}
@Override
public List<DataVDto> getGroupByTimeDataV(LineCountEvaluateParam lineParam) {
return Collections.emptyList();
}
private List<DataVDto> quality(List<DataVDto> list, LineCountEvaluateParam lineParam) {
List<DataVDto> result = new ArrayList<>();
Map<String, List<DataVDto>> lineMap = list.stream().collect(Collectors.groupingBy(DataVDto::getLineId));

View File

@@ -2,6 +2,7 @@ package com.njcn.migration.read.controller;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.njcn.api.MigrationInsertFeignClient;
import com.njcn.param.LineCountEvaluateParam;
import com.njcn.migration.read.service.MigrationService;
import io.swagger.annotations.*;
@@ -32,7 +33,7 @@ import java.time.temporal.ChronoUnit;
public class MigrationInfluxDBController {
private final MigrationService migrationService;
private final MigrationInsertFeignClient migrationInsertFeignClient;
@GetMapping("/influxdb")
@ApiOperation("influxdb数据同步->天数按小时进行分组同步")
@@ -68,29 +69,32 @@ public class MigrationInfluxDBController {
}
@GetMapping(value = "/importExcel")
@ApiOperation(value ="是否更新数据1是 0否表示用于定时任务")
public void importDistributionAreaExcel(@RequestParam("type") Integer type) {
public void importDistributionAreaExcel(@RequestParam("type") Integer type,@RequestParam("startDate") String startDateTime,@RequestParam("endDate") String endDateTime) {
if(type==0){
migrationService.initializeExcel();
}else{
System.out.println("--------------------------------手动1influxdb同步------------------------------------");
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
// 减去2个小时
LocalDateTime oneHourAgo = now.minusHours(2);
// 将分钟和秒设置为0
LocalDateTime result = oneHourAgo.truncatedTo(ChronoUnit.HOURS);
// 加上59分钟59秒
LocalDateTime modifiedResult = result.plusMinutes(59).plusSeconds(59);
LineCountEvaluateParam param = new LineCountEvaluateParam();
param.setIsManual(false);
param.setStartTime(result.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
param.setEndTime(modifiedResult.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
migrationService.hourseLineDataBacthSysc(param);
migrationService.hourseDevDataBacthSysc(param);
try {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN);
LineCountEvaluateParam param=new LineCountEvaluateParam();
param.setIsManual(true);
LocalDateTime startDate = LocalDateTimeUtil.parse(startDateTime, DatePattern.NORM_DATETIME_PATTERN);
LocalDateTime endDate = LocalDateTimeUtil.parse(endDateTime, DatePattern.NORM_DATETIME_PATTERN);
long betweenDay = LocalDateTimeUtil.between(startDate, endDate, ChronoUnit.HOURS);
param.setStartTime(startDate.format(dateTimeFormatter));
param.setEndTime(startDate.with(LocalTime.of(startDate.getHour(), 59, 59)).format(dateTimeFormatter));
migrationService.hourseLineEventBacthSysc(param);
for (int i = 0; i <betweenDay; i++) {
LineCountEvaluateParam countEvaluateParam=new LineCountEvaluateParam();
countEvaluateParam.setIsManual(true);
startDate = LocalDateTimeUtil.offset(startDate, 1, ChronoUnit.HOURS);
countEvaluateParam.setStartTime(startDate.format(dateTimeFormatter));
countEvaluateParam.setEndTime(startDate.with(LocalTime.of(startDate.getHour(), 59, 59)).format(dateTimeFormatter));
migrationService.hourseLineEventBacthSysc(countEvaluateParam);
}
} catch (Exception exception) {
exception.printStackTrace();
}
}
}
}

View File

@@ -46,19 +46,32 @@ public class MigrationInfluxDBJob {
System.out.println("--------------------------------influxdb同步------------------------------------");
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
// 减去2个小时
LocalDateTime oneHourAgo = now.minusHours(2);
// 将分钟和秒设置为0
LocalDateTime result = oneHourAgo.truncatedTo(ChronoUnit.HOURS);
// 加上59分钟59秒
LocalDateTime modifiedResult = result.plusMinutes(59).plusSeconds(59);
LineCountEvaluateParam param = new LineCountEvaluateParam();
param.setIsManual(false);
param.setStartTime(result.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
param.setEndTime(modifiedResult.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
migrationService.hourseLineDataBacthSysc(param);
migrationService.hourseDevDataBacthSysc(param);
//定时任务在往前补一小时的
LocalDateTime oneHourAgoOld = now.minusHours(3);
// 将分钟和秒设置为0
LocalDateTime resultOld = oneHourAgoOld.truncatedTo(ChronoUnit.HOURS);
// 加上59分钟59秒
LocalDateTime modifiedResultOld = resultOld.plusMinutes(59).plusSeconds(59);
LineCountEvaluateParam paramOld = new LineCountEvaluateParam();
paramOld.setIsManual(false);
paramOld.setStartTime(resultOld.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
paramOld.setEndTime(modifiedResultOld.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
migrationService.hourseLineDataBacthSysc(paramOld);
migrationService.hourseDevDataBacthSysc(paramOld);
}
@Scheduled(cron = "0 0 22 * * ?")

View File

@@ -12,6 +12,8 @@ public interface MigrationService {
void hourseLineDataBacthSysc(LineCountEvaluateParam param);
void hourseLineEventBacthSysc(LineCountEvaluateParam param);
void hourseDevDataBacthSysc(LineCountEvaluateParam param);
void initializeExcel();

View File

@@ -94,13 +94,21 @@ public class MigrationServiceImpl implements MigrationService {
migrationInsertFeignClient.insertDataInharmI(dataInharmI.listDataInharmI(evaluateParam));
migrationInsertFeignClient.insertDataInharmV(dataInharmV.listDataInharmV(evaluateParam));
migrationInsertFeignClient.insertDataPlt(dataPlt.listDataPlt(evaluateParam));
// migrationInsertFeignClient.batchInsertion(eventDetail.getRawData(evaluateParam));
migrationInsertFeignClient.batchInsertion(eventDetail.getRawData(evaluateParam));
if (!param.getIsManual() && StrUtil.isNotBlank(format)) {
TimeUtil.putLineTime(lineId, format);
}
});
System.gc();
}
@Override
@Async("asyncInfluxDBExecutor")
public void hourseLineEventBacthSysc(LineCountEvaluateParam param) {
LineCountEvaluateParam evaluateParam = new LineCountEvaluateParam();
evaluateParam.setStartTime(param.getStartTime());
evaluateParam.setEndTime(param.getEndTime());
migrationInsertFeignClient.batchInsertion(eventDetail.getRawData(evaluateParam));
}
@Override