提交代码
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package com.njcn.csharmonic.handler;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.github.tocrhz.mqtt.annotation.MqttSubscribe;
|
||||
import com.github.tocrhz.mqtt.annotation.NamedValue;
|
||||
@@ -15,10 +16,16 @@ import com.njcn.system.pojo.po.EleEpdPqd;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.text.DecimalFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* @author xuyang
|
||||
@@ -35,6 +42,7 @@ public class MqttMessageHandler {
|
||||
private final ILineTargetService lineTargetService;
|
||||
private final CsStatisticalSetFeignClient csStatisticalSetFeignClient;
|
||||
private final StableDataService stableDataService;
|
||||
private final DecimalFormat df = new DecimalFormat("#0.00");
|
||||
/**
|
||||
* 实时数据应答
|
||||
*/
|
||||
@@ -51,16 +59,61 @@ public class MqttMessageHandler {
|
||||
@MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1)
|
||||
public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) {
|
||||
List<ThdDataVO> result = new ArrayList<>();
|
||||
List<ThdDataVO> tempList = new ArrayList<>();
|
||||
|
||||
//1.查询拓扑图配置的指标:拓扑图扑图配置:7677f94c749dedaff30f911949cbd724
|
||||
List<EleEpdPqd> data = csStatisticalSetFeignClient.queryStatisticalSelect("7677f94c749dedaff30f911949cbd724").getData();
|
||||
data.forEach(temp->{
|
||||
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam();
|
||||
commonStatisticalQueryParam.setDevId(devId);
|
||||
commonStatisticalQueryParam.setStatisticalId(temp.getId());
|
||||
commonStatisticalQueryParam.setValueType("cp95");
|
||||
commonStatisticalQueryParam.setValueType("avg");
|
||||
List<ThdDataVO> thdDataVOS = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
|
||||
result.addAll(thdDataVOS);
|
||||
tempList.addAll(thdDataVOS);
|
||||
});
|
||||
//过滤M相
|
||||
List<ThdDataVO> m = tempList.stream().filter(temp -> Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList());
|
||||
m.stream().forEach(temp->{
|
||||
Stream.of("A","B","C").forEach(phase->{
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(temp,thdDataVO);
|
||||
thdDataVO.setPhase(phase);
|
||||
result.add(thdDataVO);
|
||||
});
|
||||
});
|
||||
//过滤谐波电流,谐波电压畸变率求平均值
|
||||
List<ThdDataVO> thdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "ThdPhI(%)")).collect(Collectors.toList());
|
||||
Map<String, List<ThdDataVO>> collect = thdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect.forEach((k,v)->{
|
||||
if(!CollectionUtil.isEmpty(v)){
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0),thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
List<ThdDataVO> thdV = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "ThdPhV(%)")).collect(Collectors.toList());
|
||||
Map<String, List<ThdDataVO>> collect1 = thdV.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||
collect1.forEach((k,v)->{
|
||||
if(!CollectionUtil.isEmpty(v)){
|
||||
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
||||
ThdDataVO thdDataVO = new ThdDataVO();
|
||||
BeanUtils.copyProperties(v.get(0),thdDataVO);
|
||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||
thdDataVO.setPhase("avg");
|
||||
result.add(thdDataVO);
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
List<ThdDataVO> notM = tempList.stream().filter(temp -> !Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList());
|
||||
|
||||
result.addAll(notM);
|
||||
|
||||
|
||||
|
||||
Gson gson = new Gson();
|
||||
|
||||
@@ -154,11 +154,22 @@ public class StableDataServiceImpl implements StableDataService {
|
||||
|
||||
@Override
|
||||
public List<ThdDataVO> queryFisrtCommonStatistical(CommonStatisticalQueryParam commonStatisticalQueryParam) {
|
||||
Optional.ofNullable(commonStatisticalQueryParam.getDevId()).orElseThrow(()-> new BusinessException(AlgorithmResponseEnum.DEVICE_LOSE));
|
||||
List<String> collect = new ArrayList<>();
|
||||
List<CsLinePO> csLinePOList = new ArrayList<>();
|
||||
if(Objects.isNull(commonStatisticalQueryParam.getDevId())){
|
||||
if(!CollectionUtil.isEmpty(commonStatisticalQueryParam.getLineList())){
|
||||
collect =commonStatisticalQueryParam.getLineList();
|
||||
csLinePOList = csLineFeignClient.queryLineById(collect).getData();
|
||||
}else {
|
||||
throw new BusinessException(AlgorithmResponseEnum.LINE_DATA_ERROR);
|
||||
}
|
||||
}else {
|
||||
csLinePOList = csLineFeignClient.queryLineByDevId(commonStatisticalQueryParam.getDevId()).getData();
|
||||
|
||||
List<CsLinePO> csLinePOList = csLineFeignClient.queryLineByDevId(commonStatisticalQueryParam.getDevId()).getData();
|
||||
}
|
||||
Optional.ofNullable(csLinePOList).orElseThrow(()-> new BusinessException(AlgorithmResponseEnum.LINE_DATA_ERROR));
|
||||
List<String> collect = csLinePOList.stream().map(CsLinePO::getLineId).collect(Collectors.toList());
|
||||
collect = csLinePOList.stream().map(CsLinePO::getLineId).collect(Collectors.toList());
|
||||
|
||||
|
||||
|
||||
EleEpdPqd data = epdFeignClient.selectById(commonStatisticalQueryParam.getStatisticalId()).getData();
|
||||
@@ -166,17 +177,18 @@ public class StableDataServiceImpl implements StableDataService {
|
||||
String frequency = Optional.ofNullable(commonStatisticalQueryParam.getFrequency()).orElse("");
|
||||
|
||||
List<StatisticalDataDTO> deviceRtData = commonService.getDeviceRtData(collect, data.getClassId(), data.getName()+frequency, data.getPhase(), commonStatisticalQueryParam.getValueType());
|
||||
List<CsLinePO> finalCsLinePOList = csLinePOList;
|
||||
List<ThdDataVO> collect1 = deviceRtData.stream().map(temp -> {
|
||||
ThdDataVO vo = new ThdDataVO();
|
||||
vo.setLineId(temp.getLineId());
|
||||
vo.setPhase(temp.getPhaseType());
|
||||
String position = csLinePOList.stream().filter(csLinePO -> Objects.equals(csLinePO.getLineId(), vo.getLineId())).collect(Collectors.toList()).get(0).getPosition();
|
||||
String position = finalCsLinePOList.stream().filter(csLinePO -> Objects.equals(csLinePO.getLineId(), vo.getLineId())).collect(Collectors.toList()).get(0).getPosition();
|
||||
vo.setPosition(position);
|
||||
vo.setTime(temp.getTime().atZone(ZoneId.systemDefault()).toLocalDateTime());
|
||||
vo.setStatMethod(temp.getValueType());
|
||||
vo.setStatisticalData(BigDecimal.valueOf(temp.getValue()).setScale(4, RoundingMode.UP).doubleValue());
|
||||
vo.setStatisticalIndex(data.getId());
|
||||
vo.setStatisticalName(temp.getStatisticalName());
|
||||
vo.setStatisticalName(data.getName()+"("+data.getUnit()+")");
|
||||
return vo;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user