feat(topology): 添加拓扑图目标指标功能并重构数据处理逻辑
- 在AppLineTopologyDiagram相关类中添加target字段用于绑定指标 - 重构DeviceMessageClient的getLineInfo方法参数结构 - 实现CsLineTopologyFeignClient的queryTopologyDiagram接口及降级处理 - 更新MqttMessageHandler中拓扑图数据处理逻辑,支持自定义指标查询 - 添加PqdData数据拆分服务IPqdDataSplitService及其实现 - 优化PortableOfflLogServiceImpl中的数据入库逻辑 - 修复CsTerminalReplyServiceImpl中时间范围查询条件 - 移除未使用的导入和代码,优化服务依赖注入
This commit is contained in:
@@ -7,7 +7,12 @@ import com.github.tocrhz.mqtt.annotation.MqttSubscribe;
|
||||
import com.github.tocrhz.mqtt.annotation.NamedValue;
|
||||
import com.github.tocrhz.mqtt.annotation.Payload;
|
||||
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||
import com.njcn.csdevice.api.CsLineFeignClient;
|
||||
import com.njcn.csdevice.api.CsLineTopologyFeignClient;
|
||||
import com.njcn.csdevice.api.DevCapacityFeignClient;
|
||||
import com.njcn.csdevice.api.fallback.CsLineTopologyClientFallbackFactory;
|
||||
import com.njcn.csdevice.pojo.po.CsLinePO;
|
||||
import com.njcn.csdevice.pojo.vo.AppTopologyDiagramVO;
|
||||
import com.njcn.csharmonic.param.CommonStatisticalQueryParam;
|
||||
import com.njcn.csharmonic.param.FrequencyStatisticalQueryParam;
|
||||
import com.njcn.csharmonic.pojo.dto.RealTimeDataDTO;
|
||||
@@ -21,6 +26,10 @@ import com.njcn.csharmonic.service.TemperatureService;
|
||||
import com.njcn.influx.pojo.dto.StatisticalDataDTO;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.api.CsStatisticalSetFeignClient;
|
||||
import com.njcn.system.api.DicDataFeignClient;
|
||||
import com.njcn.system.enums.DicDataEnum;
|
||||
import com.njcn.system.enums.DicDataTypeEnum;
|
||||
import com.njcn.system.pojo.po.DictData;
|
||||
import com.njcn.system.pojo.po.EleEpdPqd;
|
||||
import com.njcn.zlevent.api.FileFeignClient;
|
||||
import lombok.AllArgsConstructor;
|
||||
@@ -33,6 +42,7 @@ import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.text.DecimalFormat;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -56,6 +66,9 @@ public class MqttMessageHandler {
|
||||
private final DecimalFormat df = new DecimalFormat("#0.000");
|
||||
private final FileFeignClient fileFeignClient;
|
||||
private final CsEventPOService csEventPOService;
|
||||
private final CsLineTopologyFeignClient csLineTopologyClient;
|
||||
private final CsLineFeignClient csLineFeignClient;
|
||||
private final DicDataFeignClient dicDataFeignClient;
|
||||
|
||||
/**
|
||||
* 实时数据应答
|
||||
@@ -248,7 +261,7 @@ public class MqttMessageHandler {
|
||||
|
||||
});
|
||||
|
||||
//过滤M相
|
||||
//过滤T相
|
||||
List<ThdDataVO> m = tempList.stream().filter(temp -> Objects.equals(temp.getPhase(), "T")).collect(Collectors.toList());
|
||||
m.stream().forEach(temp -> {
|
||||
Stream.of("A", "B", "C").forEach(phase -> {
|
||||
@@ -260,41 +273,134 @@ public class MqttMessageHandler {
|
||||
});
|
||||
//如果是基础数据则添加拓扑图的数据
|
||||
if (Objects.equals("fc8c86dbc3f2d9810f5cd8f53c295415", typeId)) {
|
||||
List<ThdDataVO> apfThdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Load(%)")).collect(Collectors.toList());
|
||||
Map<String, List<ThdDataVO>> collect3 = apfThdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect3.forEach((k, v) -> {
|
||||
if (!CollectionUtil.isEmpty(v)) {
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0), thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
}
|
||||
});
|
||||
List<ThdDataVO> apfRmsI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_RmsI_TolOut(A)")).collect(Collectors.toList());
|
||||
Map<String, List<ThdDataVO>> collect2 = apfRmsI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect2.forEach((k, v) -> {
|
||||
if (!CollectionUtil.isEmpty(v)) {
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0), thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
}
|
||||
});
|
||||
List<ThdDataVO> apfThdISys = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Sys(%)")).collect(Collectors.toList());
|
||||
Map<String, List<ThdDataVO>> collect4 = apfThdISys.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect4.forEach((k, v) -> {
|
||||
if (!CollectionUtil.isEmpty(v)) {
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0), thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
|
||||
List<DictData> dictData = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.LINE_POSITION.getCode()).getData();
|
||||
Map<String, DictData> codeToDictData = dictData.stream().collect(Collectors.toMap(DictData::getId, Function.identity()));
|
||||
//添加拓扑图上自定义的指标
|
||||
//根据设备id查询拓扑图
|
||||
AppTopologyDiagramVO vo = csLineTopologyClient.queryTopologyDiagram(devId).getData();
|
||||
vo.getAppsLineTopologyDiagramPO().forEach(item -> {
|
||||
String linePositionCode = codeToDictData.get(item.getLinePostion()).getCode();
|
||||
if (Objects.equals(linePositionCode, DicDataEnum.OUTPUT_SIDE.getCode())) {
|
||||
//输出侧
|
||||
if (item.getTarget() != null && !item.getTarget().isEmpty()) {
|
||||
List<ThdDataVO> l1 = new ArrayList<>();
|
||||
//根据指标查询
|
||||
List<EleEpdPqd> data1 = csStatisticalSetFeignClient.queryStatisticalSelect(item.getTarget()).getData();
|
||||
data1.forEach(temp -> {
|
||||
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam();
|
||||
commonStatisticalQueryParam.setDevId(devId);
|
||||
commonStatisticalQueryParam.setStatisticalId(temp.getId());
|
||||
commonStatisticalQueryParam.setValueType("avg");
|
||||
List<ThdDataVO> listFuture = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
|
||||
l1.addAll(listFuture);
|
||||
});
|
||||
Map<String, List<ThdDataVO>> collect3 = l1.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect3.forEach((k, v) -> {
|
||||
if (!CollectionUtil.isEmpty(v)) {
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0), thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
Map<String, List<ThdDataVO>> collect2 = apfRmsI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect2.forEach((k, v) -> {
|
||||
if (!CollectionUtil.isEmpty(v)) {
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0), thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
}
|
||||
});
|
||||
}
|
||||
} else if (Objects.equals(linePositionCode, DicDataEnum.GRID_SIDE.getCode())) {
|
||||
//电网侧
|
||||
if (item.getTarget() != null && !item.getTarget().isEmpty()) {
|
||||
//根据指标查询
|
||||
List<ThdDataVO> l1 = new ArrayList<>();
|
||||
//根据指标查询
|
||||
List<EleEpdPqd> data1 = csStatisticalSetFeignClient.queryStatisticalSelect(item.getTarget()).getData();
|
||||
data1.forEach(temp -> {
|
||||
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam();
|
||||
commonStatisticalQueryParam.setDevId(devId);
|
||||
commonStatisticalQueryParam.setStatisticalId(temp.getId());
|
||||
commonStatisticalQueryParam.setValueType("avg");
|
||||
List<ThdDataVO> listFuture = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
|
||||
l1.addAll(listFuture);
|
||||
});
|
||||
Map<String, List<ThdDataVO>> collect3 = l1.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect3.forEach((k, v) -> {
|
||||
if (!CollectionUtil.isEmpty(v)) {
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0), thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
List<ThdDataVO> apfThdISys = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Sys(%)")).collect(Collectors.toList());
|
||||
Map<String, List<ThdDataVO>> collect4 = apfThdISys.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect4.forEach((k, v) -> {
|
||||
if (!CollectionUtil.isEmpty(v)) {
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0), thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
} else if (Objects.equals(linePositionCode, DicDataEnum.LOAD_SIDE.getCode())) {
|
||||
//负载侧
|
||||
if (item.getTarget() != null && !item.getTarget().isEmpty()) {
|
||||
//根据指标查询
|
||||
List<ThdDataVO> l1 = new ArrayList<>();
|
||||
//根据指标查询
|
||||
List<EleEpdPqd> data1 = csStatisticalSetFeignClient.queryStatisticalSelect(item.getTarget()).getData();
|
||||
data1.forEach(temp -> {
|
||||
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam();
|
||||
commonStatisticalQueryParam.setDevId(devId);
|
||||
commonStatisticalQueryParam.setStatisticalId(temp.getId());
|
||||
commonStatisticalQueryParam.setValueType("avg");
|
||||
List<ThdDataVO> listFuture = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
|
||||
l1.addAll(listFuture);
|
||||
});
|
||||
Map<String, List<ThdDataVO>> collect3 = l1.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect3.forEach((k, v) -> {
|
||||
if (!CollectionUtil.isEmpty(v)) {
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0), thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
List<ThdDataVO> apfThdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Load(%)")).collect(Collectors.toList());
|
||||
Map<String, List<ThdDataVO>> collect3 = apfThdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect3.forEach((k, v) -> {
|
||||
if (!CollectionUtil.isEmpty(v)) {
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0), thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
Double capacity = devCapacityFeignClient.getDevCapacity(devId).getData();
|
||||
@@ -323,7 +429,6 @@ public class MqttMessageHandler {
|
||||
publisher.send("/zl/devData/" + devId + "/" + typeId, topoDataJson, 1, false);
|
||||
}
|
||||
|
||||
|
||||
public String getCldidName(String cldid) {
|
||||
|
||||
switch (cldid) {
|
||||
|
||||
Reference in New Issue
Block a user