污区图算法编写

This commit is contained in:
wr
2025-03-20 16:22:20 +08:00
parent 0f54dc5199
commit db5ff43ea7
52 changed files with 1844 additions and 1223 deletions

View File

@@ -310,53 +310,53 @@ public class ExecutionCenter extends BaseController {
//
// }
//
// @OperateInfo(info = LogEnum.BUSINESS_COMMON)
// @ApiOperation("变电站算法执行链")
// @PostMapping("/substationExecutor")
// @Async("asyncExecutor")
// public void substationExecutor(@RequestBody BaseParam baseParam) {
// String methodDescribe = getMethodDescribe("substationExecutor");
// //手动判断参数是否合法,
// CalculatedParam<String> calculatedParam = judgeExecuteParam(baseParam);
// // 测点索引
// DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
//
// if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
// Dept data = deptFeignClient.getRootDept().getData();
// deptGetLineParam.setDeptId(data.getId());
// List<DeptGetSubStationDTO> data1 = commTerminalGeneralClient.deptSubStation(deptGetLineParam).getData();
// List<String> stationIds = new ArrayList<>();
// for (DeptGetSubStationDTO deptGetSubStationDTO : data1) {
// Collection<String> union = CollectionUtils.union(Optional.ofNullable(deptGetSubStationDTO.getStationIds()).orElse(new ArrayList<String>()),
// Optional.ofNullable(deptGetSubStationDTO.getStationIds()).orElse(new ArrayList<String>()));
// List<String> collect = union.stream().distinct().collect(Collectors.toList());
// stationIds.addAll(collect);
// }
// stationIds = stationIds.stream().distinct().collect(Collectors.toList());
// calculatedParam.setIdList(stationIds);
// }
// LiteflowResponse liteflowResponse;
// if (baseParam.isRepair()) {
// //补招时,起始日期、截止日期必填
// DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT);
// DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
// long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
// //递增日期执行算法链
// for (int i = 0; i < betweenDay; i++) {
// if (i != 0) {
// startDate = DateUtil.offsetDay(startDate, 1);
// }
// calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
// liteflowResponse = flowExecutor.execute2Resp("sub_station", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// } else {
// //非补招
// liteflowResponse = flowExecutor.execute2Resp("sub_station", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
//
// }
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("变电站算法执行链")
@PostMapping("/substationExecutor")
@Async("asyncExecutor")
public void substationExecutor(@RequestBody BaseParam baseParam) {
String methodDescribe = getMethodDescribe("substationExecutor");
//手动判断参数是否合法,
CalculatedParam<String> calculatedParam = judgeExecuteParam(baseParam);
// 测点索引
DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
Dept data = deptFeignClient.getRootDept().getData();
deptGetLineParam.setDeptId(data.getId());
List<DeptGetSubStationDTO> data1 = commTerminalGeneralClient.deptSubStation(deptGetLineParam).getData();
List<String> stationIds = new ArrayList<>();
for (DeptGetSubStationDTO deptGetSubStationDTO : data1) {
Collection<String> union = CollectionUtils.union(Optional.ofNullable(deptGetSubStationDTO.getStationIds()).orElse(new ArrayList<String>()),
Optional.ofNullable(deptGetSubStationDTO.getStationIds()).orElse(new ArrayList<String>()));
List<String> collect = union.stream().distinct().collect(Collectors.toList());
stationIds.addAll(collect);
}
stationIds = stationIds.stream().distinct().collect(Collectors.toList());
calculatedParam.setIdList(stationIds);
}
LiteflowResponse liteflowResponse;
if (baseParam.isRepair()) {
//补招时,起始日期、截止日期必填
DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT);
DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
//递增日期执行算法链
for (int i = 0; i < betweenDay; i++) {
if (i != 0) {
startDate = DateUtil.offsetDay(startDate, 1);
}
calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
liteflowResponse = flowExecutor.execute2Resp("sub_station", calculatedParam);
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
}
} else {
//非补招
liteflowResponse = flowExecutor.execute2Resp("sub_station", calculatedParam);
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
}
}
//
// /**
// * pms dim母线电站运行情况

View File

@@ -1,7 +1,9 @@
package com.njcn.algorithm.executor;
import com.njcn.algorithm.service.line.IDataIntegrityService;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.njcn.algorithm.service.line.IDataOrgPointService;
import com.njcn.algorithm.service.line.IPollutionService;
import com.njcn.device.biz.pojo.dto.DeptGetChildrenMoreDTO;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
@@ -26,6 +28,9 @@ public class OrgPointExecutor extends BaseExecutor{
@Resource
private IDataOrgPointService dataOrgPointService;
@Resource
private IPollutionService pollutionService;
@LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataOrgIntegrity", nodeType = NodeTypeEnum.COMMON)
public boolean dataOrgIntegrityAccess(NodeComponent bindCmp) {
@@ -36,4 +41,39 @@ public class OrgPointExecutor extends BaseExecutor{
dataOrgPointService.dataOrgIntegrity(bindCmp.getRequestData());
}
/**
* 3.4.16. 单位污区图统计(新增)
*
*/
@LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "rStatPollutionOrg", nodeType = NodeTypeEnum.COMMON)
public boolean processRStatPollutionOrgAccess(NodeComponent bindCmp) {
return isAccess(bindCmp);
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "rStatPollutionOrg", nodeType = NodeTypeEnum.COMMON)
public void processRStatPollutionOrgProcess(NodeComponent bindCmp) {
String tag = bindCmp.getTag();
CalculatedParam<DeptGetChildrenMoreDTO> calculatedParam = bindCmp.getRequestData();
if ("r_stat_pollution_org_d".equalsIgnoreCase(tag)) {
//日表
pollutionService.handleOrgDay(calculatedParam);
} else if ("r_stat_pollution_org_m".equalsIgnoreCase(tag)) {
//数据补招不执行非日表算法
if (!calculatedParam.isRepair()) {
//月表
pollutionService.handleOrgMonth(calculatedParam);
}
}else if ("r_stat_pollution_org_q".equalsIgnoreCase(tag)) {
//数据补招不执行非日表算法
if (!calculatedParam.isRepair()) {
//季表
pollutionService.handleOrgQtr(calculatedParam);
}
}else if ("r_stat_pollution_org_y".equalsIgnoreCase(tag)) {
//数据补招不执行非日表算法
if (!calculatedParam.isRepair()) {
//年表
pollutionService.handleOrgYear(calculatedParam);
}
}
}
}

View File

@@ -0,0 +1,64 @@
package com.njcn.algorithm.executor;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.njcn.algorithm.service.line.IPollutionService;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import lombok.RequiredArgsConstructor;
import javax.annotation.Resource;
/**
* Description:
* Date: 2023/11/10 10:39【需求编号】
*
* @author clam
* @version V1.0.0
*/
@LiteflowComponent
@RequiredArgsConstructor
public class SubStationExecutor extends BaseExecutor{
@Resource
private IPollutionService pollutionService;
/**
* 3.4.15. 变电站污区图统计(新增)
*
*/
@LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "rStatPollutionSubstation", nodeType = NodeTypeEnum.COMMON)
public boolean processRStatPollutionSubstationAccess(NodeComponent bindCmp) {
return isAccess(bindCmp);
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "rStatPollutionSubstation", nodeType = NodeTypeEnum.COMMON)
public void processRStatPollutionSubstationProcess(NodeComponent bindCmp) {
String tag = bindCmp.getTag();
CalculatedParam<String> calculatedParam = bindCmp.getRequestData();
if ("r_stat_pollution_substation_d".equalsIgnoreCase(tag)) {
//日表
pollutionService.handleSubstationDay(calculatedParam);
} else if ("r_stat_pollution_substation_m".equalsIgnoreCase(tag)) {
//数据补招不执行非日表算法
if (!calculatedParam.isRepair()) {
//月表
pollutionService.handleSubstationMonth(calculatedParam);
}
}else if ("r_stat_pollution_substation_q".equalsIgnoreCase(tag)) {
//数据补招不执行非日表算法
if (!calculatedParam.isRepair()) {
//季表
pollutionService.handleSubstationQtr(calculatedParam);
}
}else if ("r_stat_pollution_substation_y".equalsIgnoreCase(tag)) {
//数据补招不执行非日表算法
if (!calculatedParam.isRepair()) {
//年表
pollutionService.handleSubstationYear(calculatedParam);
}
}
}
}

View File

@@ -6,8 +6,11 @@ import com.njcn.algorithm.service.line.IDataComAssService;
import com.njcn.dataProcess.api.DataComAssFeignClient;
import com.njcn.dataProcess.api.DataFlickerFeignClient;
import com.njcn.dataProcess.api.DataVFeignClient;
import com.njcn.dataProcess.api.PqDataVerifyFeignClient;
import com.njcn.dataProcess.enums.DataCleanEnum;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.DataComassesDPO;
import com.njcn.dataProcess.pojo.po.PqDataVerify;
import com.njcn.dataProcess.util.TimeUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -21,6 +24,7 @@ import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
/**
* @Description:
@@ -43,6 +47,8 @@ public class DataComAssServiceImpl implements IDataComAssService {
private DataFlickerFeignClient dataFlickerFeignClient;
@Resource
private DataComAssFeignClient dataComAssFeignClient;
@Resource
private PqDataVerifyFeignClient pqDataVerifyFeignClient;
@Override
public void dataComAssHandler(CalculatedParam calculatedParam) {
@@ -52,6 +58,7 @@ public class DataComAssServiceImpl implements IDataComAssService {
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
List<String> lineIdList = calculatedParam.getIdList();
getAbnormalData(lineParam);
for (String lineId : lineIdList) {
DataComassesDPO rStatComassesDpo = new DataComassesDPO();
rStatComassesDpo.setTime(calculatedParam.getDataDate());
@@ -104,11 +111,31 @@ public class DataComAssServiceImpl implements IDataComAssService {
}
/**
* @param lineParam
* @Description:
* @Author: wr
* @Date: 2025/3/11 9:04
*/
private void getAbnormalData(LineCountEvaluateParam lineParam) {
lineParam.setTableName(DataCleanEnum.DataV.getCode());
// 获取异常数据,用于排除异常数据
List<PqDataVerify> pqDataVerifies = pqDataVerifyFeignClient.queryData(lineParam).getData();
if (CollUtil.isNotEmpty(pqDataVerifies)) {
Map<String, List<String>> timeMap = pqDataVerifies.stream()
.collect(Collectors.groupingBy(
PqDataVerify::getLineId,
Collectors.mapping(item -> TimeUtils.LocalDateTimeToString(item.getTime()), Collectors.toList())
));
lineParam.setAbnormalTime(timeMap);
}
}
private Map<String, Object> getGeneralData(LineCountEvaluateParam lineParam) {
Map<String, Object> outMap = new HashMap<>(30);
BigDecimal hundred = BigDecimal.valueOf(100);
//************************************电压偏差********************************************
lineParam.setPhasicType(Arrays.asList("A","B","C"));
lineParam.setPhasicType(Arrays.asList("A", "B", "C"));
lineParam.setValueType(Arrays.asList("AVG"));
lineParam.setColumnName("vu_dev");
lineParam.setGe("10");

View File

@@ -7,10 +7,14 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.njcn.algorithm.service.line.IPollutionService;
import com.njcn.dataProcess.api.*;
import com.njcn.dataProcess.enums.DataCleanEnum;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.*;
import com.njcn.dataProcess.pojo.po.PqDataVerify;
import com.njcn.dataProcess.util.TimeUtils;
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
import com.njcn.device.biz.pojo.dto.DeptGetChildrenMoreDTO;
import com.njcn.device.biz.pojo.dto.LineDevGetBandDTO;
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
import com.njcn.device.biz.pojo.po.Overlimit;
@@ -39,6 +43,7 @@ import java.util.stream.Stream;
@RequiredArgsConstructor
public class PollutionServiceImpl implements IPollutionService {
private final PqDataVerifyFeignClient pqDataVerifyFeignClient;
private final DicDataFeignClient dicDataFeignClient;
private final CommTerminalGeneralClient commTerminalGeneralClient;
private final DataVFeignClient dataVFeignClient;
@@ -47,10 +52,10 @@ public class PollutionServiceImpl implements IPollutionService {
private final DataIFeignClient dataIFeignClient;
private final DataPollutionFeignClient dataPollutionFeignClient;
@Override
public void handleDay(CalculatedParam<String> calculatedParam) {
List<DataPollutionD> pollutionList;
List<DictData> dictDataList = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.POLLUTION_STATIS.getCode()).getData();
Map<String, DictData> dictData = dictDataList.stream().collect(Collectors.toMap(DictData::getId, Function.identity()));
LocalDate local = LocalDateTimeUtil.parseDate(calculatedParam.getDataDate());
@@ -62,10 +67,9 @@ public class PollutionServiceImpl implements IPollutionService {
LineCountEvaluateParam lineCountEvaluateParam = new LineCountEvaluateParam();
lineCountEvaluateParam.setLineId(idList);
lineCountEvaluateParam.setStartTime(beginDay);
lineCountEvaluateParam.setEndTime(endDay);
getAbnormalData(lineCountEvaluateParam);
//指标数据获取
Map<String, DataPollutionD> harmonicVoltageList = getHarmonicVoltage(lineCountEvaluateParam, limitMap);
Map<String, DataPollutionD> harmonicCurrentList = getHarmonicCurrent(lineCountEvaluateParam, limitMap);
@@ -88,52 +92,279 @@ public class PollutionServiceImpl implements IPollutionService {
@Override
public void handleOrgDay(CalculatedParam<DeptGetChildrenMoreDTO> calculatedParam) {
LocalDate local = LocalDateTimeUtil.parseDate(calculatedParam.getDataDate());
QueryWrapper<DataPollutionD> queryWrapper = new QueryWrapper<>();
List<DataPollutionOrgD> info = new ArrayList<>();
//远程接口获取分钟数据
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
LocalDate localDate = LocalDateTimeUtil.parseDate(calculatedParam.getDataDate());
for (DeptGetChildrenMoreDTO deptGetChildrenMoreDTO : calculatedParam.getIdList()) {
Collection<LineDevGetDTO> union = CollectionUtils.union(Optional.ofNullable(deptGetChildrenMoreDTO.getLineBaseList()).orElse(new ArrayList<>()),
Optional.ofNullable(deptGetChildrenMoreDTO.getPwMonitorIds()).orElse(new ArrayList<>()));
Collection<LineDevGetDTO> union = CollectionUtils.union
(Optional.ofNullable(deptGetChildrenMoreDTO.getLineBaseList()).orElse(new ArrayList<>()),
Optional.ofNullable(deptGetChildrenMoreDTO.getPwMonitorIds()).orElse(new ArrayList<>()));
List<String> lineIds = union.stream().map(LineDevGetDTO::getPointId).collect(Collectors.toList());
if (CollUtil.isNotEmpty(lineIds)) {
lineParam.setLineId(lineIds);
List<DataPollutionD> data = dataPollutionFeignClient.getRawData(lineParam).getData();
Map<String, List<DataPollutionD>> pollutionMap = data.stream().collect(Collectors.groupingBy(DataPollutionD::getPollutionType));
pollutionMap.forEach((key, value) -> {
DataPollutionOrgD orgD = new DataPollutionOrgD();
orgD.setOrgId(deptGetChildrenMoreDTO.getUnitId());
orgD.setDataDate(localDate);
orgD.setPollutionType(key);
orgD.setValue(value.stream().max(Comparator.comparing(DataPollutionD::getValue)).get().getValue());
info.add(orgD);
});
}
}
if (CollUtil.isNotEmpty(info)) {
dataPollutionFeignClient.batchInsertionOrgD(info);
}
}
@Override
public void handleOrgMonth(CalculatedParam<DeptGetChildrenMoreDTO> calculatedParam) {
List<DataPollutionOrgM> info = new ArrayList<>();
//远程接口获取分钟数据
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfMonth(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfMonth(calculatedParam.getDataDate()));
LocalDate localDate = LocalDateTimeUtil.parseDate(lineParam.getStartTime());
for (DeptGetChildrenMoreDTO deptGetChildrenMoreDTO : calculatedParam.getIdList()) {
Collection<LineDevGetDTO> union = CollectionUtils.union
(Optional.ofNullable(deptGetChildrenMoreDTO.getLineBaseList()).orElse(new ArrayList<>()),
Optional.ofNullable(deptGetChildrenMoreDTO.getPwMonitorIds()).orElse(new ArrayList<>()));
List<String> lineIds = union.stream().map(LineDevGetDTO::getPointId).collect(Collectors.toList());
if (CollUtil.isNotEmpty(lineIds)) {
lineParam.setLineId(lineIds);
List<DataPollutionOrgD> data = dataPollutionFeignClient.getRawDataOrgD(lineParam).getData();
Map<String, List<DataPollutionOrgD>> pollutionMap = data.stream().collect(Collectors.groupingBy(DataPollutionOrgD::getPollutionType));
pollutionMap.forEach((key, value) -> {
DataPollutionOrgM orgD = new DataPollutionOrgM();
orgD.setOrgId(deptGetChildrenMoreDTO.getUnitId());
orgD.setDataDate(localDate);
orgD.setPollutionType(key);
orgD.setValue(value.stream().max(Comparator.comparing(DataPollutionOrgD::getValue)).get().getValue());
info.add(orgD);
});
}
}
if (CollUtil.isNotEmpty(info)) {
dataPollutionFeignClient.batchInsertionOrgM(info);
}
}
@Override
public void handleOrgQtr(CalculatedParam<DeptGetChildrenMoreDTO> calculatedParam) {
List<DataPollutionOrgQ> info = new ArrayList<>();
//远程接口获取分钟数据
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfQuarter(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfQuarter(calculatedParam.getDataDate()));
LocalDate localDate = LocalDateTimeUtil.parseDate(lineParam.getStartTime());
for (DeptGetChildrenMoreDTO deptGetChildrenMoreDTO : calculatedParam.getIdList()) {
Collection<LineDevGetDTO> union = CollectionUtils.union
(Optional.ofNullable(deptGetChildrenMoreDTO.getLineBaseList()).orElse(new ArrayList<>()),
Optional.ofNullable(deptGetChildrenMoreDTO.getPwMonitorIds()).orElse(new ArrayList<>()));
List<String> lineIds = union.stream().map(LineDevGetDTO::getPointId).collect(Collectors.toList());
if (CollUtil.isNotEmpty(lineIds)) {
lineParam.setLineId(lineIds);
List<DataPollutionOrgM> data = dataPollutionFeignClient.getRawDataOrgM(lineParam).getData();
Map<String, List<DataPollutionOrgM>> pollutionMap = data.stream().collect(Collectors.groupingBy(DataPollutionOrgM::getPollutionType));
pollutionMap.forEach((key, value) -> {
DataPollutionOrgQ orgD = new DataPollutionOrgQ();
orgD.setOrgId(deptGetChildrenMoreDTO.getUnitId());
orgD.setDataDate(localDate);
orgD.setPollutionType(key);
orgD.setValue(value.stream().max(Comparator.comparing(DataPollutionOrgM::getValue)).get().getValue());
info.add(orgD);
});
}
}
if (CollUtil.isNotEmpty(info)) {
dataPollutionFeignClient.batchInsertionOrgQ(info);
}
}
@Override
public void handleOrgYear(CalculatedParam<DeptGetChildrenMoreDTO> calculatedParam) {
List<DataPollutionOrgY> info = new ArrayList<>();
//远程接口获取分钟数据
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfYear(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfYear(calculatedParam.getDataDate()));
LocalDate localDate = LocalDateTimeUtil.parseDate(lineParam.getStartTime());
for (DeptGetChildrenMoreDTO deptGetChildrenMoreDTO : calculatedParam.getIdList()) {
Collection<LineDevGetDTO> union = CollectionUtils.union
(Optional.ofNullable(deptGetChildrenMoreDTO.getLineBaseList()).orElse(new ArrayList<>()),
Optional.ofNullable(deptGetChildrenMoreDTO.getPwMonitorIds()).orElse(new ArrayList<>()));
List<String> lineIds = union.stream().map(LineDevGetDTO::getPointId).collect(Collectors.toList());
if (CollUtil.isNotEmpty(lineIds)) {
lineParam.setLineId(lineIds);
List<DataPollutionOrgQ> data = dataPollutionFeignClient.getRawDataOrgQ(lineParam).getData();
Map<String, List<DataPollutionOrgQ>> pollutionMap = data.stream().collect(Collectors.groupingBy(DataPollutionOrgQ::getPollutionType));
pollutionMap.forEach((key, value) -> {
DataPollutionOrgY orgD = new DataPollutionOrgY();
orgD.setOrgId(deptGetChildrenMoreDTO.getUnitId());
orgD.setDataDate(localDate);
orgD.setPollutionType(key);
orgD.setValue(value.stream().max(Comparator.comparing(DataPollutionOrgQ::getValue)).get().getValue());
info.add(orgD);
});
}
}
if (CollUtil.isNotEmpty(info)) {
dataPollutionFeignClient.batchInsertionOrgY(info);
}
}
@Override
public void handleSubstationDay(CalculatedParam<String> calculatedParam) {
List<DataPollutionSubstationD> info = new ArrayList<>();
//远程接口获取分钟数据
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
LocalDate localDate = LocalDateTimeUtil.parseDate(calculatedParam.getDataDate());
for (String substationId : calculatedParam.getIdList()) {
LineDevGetBandDTO data = commTerminalGeneralClient.substationGetLine(substationId).getData();
Collection<LineDevGetDTO> union = CollectionUtils.union(Optional.ofNullable(data.getPwList()).orElse(new ArrayList<>()),
Optional.ofNullable(data.getZwList()).orElse(new ArrayList<>()));
List<String> lindIds = union.stream().map(LineDevGetDTO::getPointId).collect(Collectors.toList());
if (CollUtil.isNotEmpty(lindIds)) {
lineParam.setLineId(lindIds);
List<DataPollutionD> pollutionDS = dataPollutionFeignClient.getRawData(lineParam).getData();
Map<String, List<DataPollutionD>> pollutionMap = pollutionDS.stream().collect(Collectors.groupingBy(DataPollutionD::getPollutionType));
pollutionMap.forEach((key, value) -> {
DataPollutionSubstationD orgD = new DataPollutionSubstationD();
orgD.setSubstationId(substationId);
orgD.setDataDate(localDate);
orgD.setPollutionType(key);
orgD.setValue(value.stream().max(Comparator.comparing(DataPollutionD::getValue)).get().getValue());
info.add(orgD);
});
}
}
if (CollUtil.isNotEmpty(info)) {
dataPollutionFeignClient.batchInsertionSubstationD(info);
}
}
@Override
public void handleSubstationMonth(CalculatedParam<String> calculatedParam) {
List<DataPollutionSubstationM> info = new ArrayList<>();
//远程接口获取分钟数据
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfMonth(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfMonth(calculatedParam.getDataDate()));
LocalDate localDate = LocalDateTimeUtil.parseDate(lineParam.getStartTime());
for (String substationId : calculatedParam.getIdList()) {
LineDevGetBandDTO data = commTerminalGeneralClient.substationGetLine(substationId).getData();
Collection<LineDevGetDTO> union = CollectionUtils.union(Optional.ofNullable(data.getPwList()).orElse(new ArrayList<>()),
Optional.ofNullable(data.getZwList()).orElse(new ArrayList<>()));
List<String> lindIds = union.stream().map(LineDevGetDTO::getPointId).collect(Collectors.toList());
if (CollUtil.isNotEmpty(lindIds)) {
lineParam.setLineId(lindIds);
List<DataPollutionSubstationD> pollutionDS = dataPollutionFeignClient.getRawDataSubstationD(lineParam).getData();
Map<String, List<DataPollutionSubstationD>> pollutionMap = pollutionDS.stream().collect(Collectors.groupingBy(DataPollutionSubstationD::getPollutionType));
pollutionMap.forEach((key, value) -> {
DataPollutionSubstationM orgD = new DataPollutionSubstationM();
orgD.setSubstationId(substationId);
orgD.setDataDate(localDate);
orgD.setPollutionType(key);
orgD.setValue(value.stream().max(Comparator.comparing(DataPollutionSubstationD::getValue)).get().getValue());
info.add(orgD);
});
}
}
if (CollUtil.isNotEmpty(info)) {
dataPollutionFeignClient.batchInsertionSubstationM(info);
}
}
@Override
public void handleSubstationQtr(CalculatedParam<String> calculatedParam) {
List<DataPollutionSubstationQ> info = new ArrayList<>();
//远程接口获取分钟数据
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfQuarter(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfQuarter(calculatedParam.getDataDate()));
LocalDate localDate = LocalDateTimeUtil.parseDate(lineParam.getStartTime());
for (String substationId : calculatedParam.getIdList()) {
LineDevGetBandDTO data = commTerminalGeneralClient.substationGetLine(substationId).getData();
Collection<LineDevGetDTO> union = CollectionUtils.union(Optional.ofNullable(data.getPwList()).orElse(new ArrayList<>()),
Optional.ofNullable(data.getZwList()).orElse(new ArrayList<>()));
List<String> lindIds = union.stream().map(LineDevGetDTO::getPointId).collect(Collectors.toList());
if (CollUtil.isNotEmpty(lindIds)) {
lineParam.setLineId(lindIds);
List<DataPollutionSubstationM> pollutionDS = dataPollutionFeignClient.getRawDataSubstationM(lineParam).getData();
Map<String, List<DataPollutionSubstationM>> pollutionMap = pollutionDS.stream().collect(Collectors.groupingBy(DataPollutionSubstationM::getPollutionType));
pollutionMap.forEach((key, value) -> {
DataPollutionSubstationQ orgD = new DataPollutionSubstationQ();
orgD.setSubstationId(substationId);
orgD.setDataDate(localDate);
orgD.setPollutionType(key);
orgD.setValue(value.stream().max(Comparator.comparing(DataPollutionSubstationM::getValue)).get().getValue());
info.add(orgD);
});
}
}
if (CollUtil.isNotEmpty(info)) {
dataPollutionFeignClient.batchInsertionSubstationQ(info);
}
}
@Override
public void handleSubstationYear(CalculatedParam<String> calculatedParam) {
List<DataPollutionSubstationY> info = new ArrayList<>();
//远程接口获取分钟数据
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfYear(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfYear(calculatedParam.getDataDate()));
LocalDate localDate = LocalDateTimeUtil.parseDate(lineParam.getStartTime());
for (String substationId : calculatedParam.getIdList()) {
LineDevGetBandDTO data = commTerminalGeneralClient.substationGetLine(substationId).getData();
Collection<LineDevGetDTO> union = CollectionUtils.union(Optional.ofNullable(data.getPwList()).orElse(new ArrayList<>()),
Optional.ofNullable(data.getZwList()).orElse(new ArrayList<>()));
List<String> lindIds = union.stream().map(LineDevGetDTO::getPointId).collect(Collectors.toList());
if (CollUtil.isNotEmpty(lindIds)) {
lineParam.setLineId(lindIds);
List<DataPollutionSubstationQ> pollutionDS = dataPollutionFeignClient.getRawDataSubstationQ(lineParam).getData();
Map<String, List<DataPollutionSubstationQ>> pollutionMap = pollutionDS.stream().collect(Collectors.groupingBy(DataPollutionSubstationQ::getPollutionType));
pollutionMap.forEach((key, value) -> {
DataPollutionSubstationY orgD = new DataPollutionSubstationY();
orgD.setSubstationId(substationId);
orgD.setDataDate(localDate);
orgD.setPollutionType(key);
orgD.setValue(value.stream().max(Comparator.comparing(DataPollutionSubstationQ::getValue)).get().getValue());
info.add(orgD);
});
}
}
if (CollUtil.isNotEmpty(info)) {
dataPollutionFeignClient.batchInsertionSubstationY(info);
}
}
/**
* @param lineParam
* @Description:
* @Author: wr
* @Date: 2025/3/11 9:04
*/
private void getAbnormalData(LineCountEvaluateParam lineParam) {
lineParam.setTableName(DataCleanEnum.DataV.getCode());
// 获取异常数据,用于排除异常数据
List<PqDataVerify> pqDataVerifies = pqDataVerifyFeignClient.queryData(lineParam).getData();
if (CollUtil.isNotEmpty(pqDataVerifies)) {
Map<String, List<String>> timeMap = pqDataVerifies.stream()
.collect(Collectors.groupingBy(
PqDataVerify::getLineId,
Collectors.mapping(item -> TimeUtils.LocalDateTimeToString(item.getTime()), Collectors.toList())
));
lineParam.setAbnormalTime(timeMap);
}
}
private void assPollution(String lineId, LocalDate local, DictData dic, Map<String, DataPollutionD> map, List<DataPollutionD> result) {
DataPollutionD tem;
@@ -155,10 +386,10 @@ public class PollutionServiceImpl implements IPollutionService {
* 参数拼装处理
*/
private List<DataPollutionD> processPollutionList(LocalDate local, List<String> lineIds, Map<String, DictData> dictData,
Map<String, DataPollutionD> harmonicVoltageMap, Map<String, DataPollutionD> harmonicCurrentMap,
Map<String, DataPollutionD> frequencyDeviationMap, Map<String, DataPollutionD> voltageDeviationMap,
Map<String, DataPollutionD> threePhaseVoltageMap, Map<String, DataPollutionD> negativeSequenceMap,
Map<String, DataPollutionD> interharmonicVoltageMap, Map<String, DataPollutionD> voltageFlickerMap) {
Map<String, DataPollutionD> harmonicVoltageMap, Map<String, DataPollutionD> harmonicCurrentMap,
Map<String, DataPollutionD> frequencyDeviationMap, Map<String, DataPollutionD> voltageDeviationMap,
Map<String, DataPollutionD> threePhaseVoltageMap, Map<String, DataPollutionD> negativeSequenceMap,
Map<String, DataPollutionD> interharmonicVoltageMap, Map<String, DataPollutionD> voltageFlickerMap) {
List<DataPollutionD> result = new ArrayList<>();
DictData dicVHarmonic = dictData.get("V_Harmonic");
DictData dicIHarmonic = dictData.get("I_All");