代码优化
This commit is contained in:
@@ -30,7 +30,6 @@ import java.util.ArrayList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.*;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@@ -75,35 +74,21 @@ public class MqttMessageHandler {
|
|||||||
Gson gson = new Gson();
|
Gson gson = new Gson();
|
||||||
publisher.send("/zl/TemperData/"+devId,gson.toJson(statisticalDataDTOS),1,false);
|
publisher.send("/zl/TemperData/"+devId,gson.toJson(statisticalDataDTOS),1,false);
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
* 谐波数据
|
/**
|
||||||
|
* 实时数据应答
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// @MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1)
|
|
||||||
// public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) {
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 实时数据应答
|
|
||||||
*/
|
|
||||||
@MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1)
|
@MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1)
|
||||||
public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) {
|
public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) {
|
||||||
String topoDataJson =redisUtil.getStringByKey (devId);
|
String topoDataJson =redisUtil.getStringByKey (devId);
|
||||||
if(StringUtils.isEmpty(topoDataJson)){
|
if(StringUtils.isEmpty(topoDataJson)){
|
||||||
List<ThdDataVO> result = new ArrayList<>();
|
List<ThdDataVO> result = new ArrayList<>();
|
||||||
List<ThdDataVO> tempList = new ArrayList<>();
|
List<ThdDataVO> tempList = new ArrayList<>();
|
||||||
// ExecutorService executorService = new ThreadPoolExecutor(10, Integer.MAX_VALUE,
|
|
||||||
// 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
|
|
||||||
// Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
|
|
||||||
// List<Future<List<ThdDataVO>>> resultList = new ArrayList< Future<List<ThdDataVO>>>();
|
|
||||||
|
|
||||||
//1.查询拓扑图配置的指标:拓扑图扑图配置:7677f94c749dedaff30f911949cbd724
|
//1.查询拓扑图配置的指标:拓扑图扑图配置:7677f94c749dedaff30f911949cbd724
|
||||||
List<EleEpdPqd> data = csStatisticalSetFeignClient.queryStatisticalSelect("b934664f9592d1c5e92caa90695b7103").getData();
|
List<EleEpdPqd> data = csStatisticalSetFeignClient.queryStatisticalSelect("b934664f9592d1c5e92caa90695b7103").getData();
|
||||||
data.forEach(temp->{
|
data.forEach(temp->{
|
||||||
if(Objects.nonNull(temp.getHarmStart())&&Objects.nonNull(temp.getHarmEnd())){
|
if(Objects.nonNull(temp.getHarmStart())&&Objects.nonNull(temp.getHarmEnd())){
|
||||||
|
|
||||||
FrequencyStatisticalQueryParam frequencyStatisticalQueryParam = new FrequencyStatisticalQueryParam();
|
FrequencyStatisticalQueryParam frequencyStatisticalQueryParam = new FrequencyStatisticalQueryParam();
|
||||||
frequencyStatisticalQueryParam.setDevId(devId);
|
frequencyStatisticalQueryParam.setDevId(devId);
|
||||||
frequencyStatisticalQueryParam.setStatisticalId(temp.getId());
|
frequencyStatisticalQueryParam.setStatisticalId(temp.getId());
|
||||||
@@ -123,17 +108,6 @@ public class MqttMessageHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
// executorService.shutdown();
|
|
||||||
//
|
|
||||||
// resultList.forEach(temp->{
|
|
||||||
// try {
|
|
||||||
// tempList.addAll(temp.get());
|
|
||||||
// } catch (InterruptedException e) {
|
|
||||||
// throw new RuntimeException(e);
|
|
||||||
// } catch (ExecutionException e) {
|
|
||||||
// throw new RuntimeException(e);
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
//过滤M相
|
//过滤M相
|
||||||
List<ThdDataVO> m = tempList.stream().filter(temp -> Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList());
|
List<ThdDataVO> m = tempList.stream().filter(temp -> Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList());
|
||||||
m.stream().forEach(temp->{
|
m.stream().forEach(temp->{
|
||||||
@@ -144,34 +118,6 @@ public class MqttMessageHandler {
|
|||||||
result.add(thdDataVO);
|
result.add(thdDataVO);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
//过滤谐波电流,谐波电压畸变率求平均值
|
|
||||||
// List<ThdDataVO> thdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Pq_ThdU(%)")).collect(Collectors.toList());
|
|
||||||
// Map<String, List<ThdDataVO>> collect = thdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
|
||||||
// collect.forEach((k,v)->{
|
|
||||||
// if(!CollectionUtil.isEmpty(v)){
|
|
||||||
// double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
|
||||||
// ThdDataVO thdDataVO = new ThdDataVO();
|
|
||||||
// BeanUtils.copyProperties(v.get(0),thdDataVO);
|
|
||||||
// thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
|
||||||
// thdDataVO.setPhase("avg");
|
|
||||||
// result.add(thdDataVO);
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
|
|
||||||
// List<ThdDataVO> thdV = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Pq_ThdI(%)")).collect(Collectors.toList());
|
|
||||||
// Map<String, List<ThdDataVO>> collect1 = thdV.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
|
||||||
// collect1.forEach((k,v)->{
|
|
||||||
// if(!CollectionUtil.isEmpty(v)){
|
|
||||||
// double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
|
|
||||||
// ThdDataVO thdDataVO = new ThdDataVO();
|
|
||||||
// BeanUtils.copyProperties(v.get(0),thdDataVO);
|
|
||||||
// thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
|
||||||
// thdDataVO.setPhase("avg");
|
|
||||||
// result.add(thdDataVO);
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
List<ThdDataVO> apfThdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Load(%)")).collect(Collectors.toList());
|
List<ThdDataVO> apfThdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Load(%)")).collect(Collectors.toList());
|
||||||
Map<String, List<ThdDataVO>> collect3 = apfThdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
Map<String, List<ThdDataVO>> collect3 = apfThdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
|
||||||
collect3.forEach((k,v)->{
|
collect3.forEach((k,v)->{
|
||||||
@@ -182,7 +128,6 @@ public class MqttMessageHandler {
|
|||||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||||
thdDataVO.setPhase("avg");
|
thdDataVO.setPhase("avg");
|
||||||
result.add(thdDataVO);
|
result.add(thdDataVO);
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
List<ThdDataVO> apfRmsI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_RmsI_TolOut(A)")).collect(Collectors.toList());
|
List<ThdDataVO> apfRmsI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_RmsI_TolOut(A)")).collect(Collectors.toList());
|
||||||
@@ -195,7 +140,6 @@ public class MqttMessageHandler {
|
|||||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
|
||||||
thdDataVO.setPhase("avg");
|
thdDataVO.setPhase("avg");
|
||||||
result.add(thdDataVO);
|
result.add(thdDataVO);
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
List<ThdDataVO> apfThdISys = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Sys(%)")).collect(Collectors.toList());
|
List<ThdDataVO> apfThdISys = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Sys(%)")).collect(Collectors.toList());
|
||||||
@@ -212,8 +156,6 @@ public class MqttMessageHandler {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
Double capacity = devCapacityFeignClient.getDevCapacity(devId).getData();
|
Double capacity = devCapacityFeignClient.getDevCapacity(devId).getData();
|
||||||
|
|
||||||
|
|
||||||
apfRmsI.forEach(temp->{
|
apfRmsI.forEach(temp->{
|
||||||
ThdDataVO thdDataVO = new ThdDataVO();
|
ThdDataVO thdDataVO = new ThdDataVO();
|
||||||
BeanUtils.copyProperties(temp,thdDataVO);
|
BeanUtils.copyProperties(temp,thdDataVO);
|
||||||
@@ -225,24 +167,15 @@ public class MqttMessageHandler {
|
|||||||
}else {
|
}else {
|
||||||
double v = temp.getStatisticalData() / capacity;
|
double v = temp.getStatisticalData() / capacity;
|
||||||
thdDataVO.setStatisticalData(Double.valueOf(df.format(v)));
|
thdDataVO.setStatisticalData(Double.valueOf(df.format(v)));
|
||||||
|
|
||||||
}
|
}
|
||||||
result.add(thdDataVO);
|
result.add(thdDataVO);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
List<ThdDataVO> notM = tempList.stream().filter(temp -> !Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList());
|
List<ThdDataVO> notM = tempList.stream().filter(temp -> !Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList());
|
||||||
|
|
||||||
result.addAll(notM);
|
result.addAll(notM);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Gson gson = new Gson();
|
Gson gson = new Gson();
|
||||||
topoDataJson = gson.toJson(result);
|
topoDataJson = gson.toJson(result);
|
||||||
redisUtil.saveByKeyWithExpire(devId, (Object) topoDataJson,Long.valueOf(10*60));
|
redisUtil.saveByKeyWithExpire(devId, (Object) topoDataJson, 30L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
publisher.send("/zl/devData/"+devId,topoDataJson,1,false);
|
publisher.send("/zl/devData/"+devId,topoDataJson,1,false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user