diff --git a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java index cff1cd1..40f81b3 100644 --- a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java +++ b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java @@ -30,7 +30,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -75,35 +74,21 @@ public class MqttMessageHandler { Gson gson = new Gson(); publisher.send("/zl/TemperData/"+devId,gson.toJson(statisticalDataDTOS),1,false); } - /* - * 谐波数据 + + /** + * 实时数据应答 */ - -// @MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1) -// public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) { -// -// } - - - /** - * 实时数据应答 - */ @MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1) public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) { String topoDataJson =redisUtil.getStringByKey (devId); if(StringUtils.isEmpty(topoDataJson)){ List result = new ArrayList<>(); List tempList = new ArrayList<>(); -// ExecutorService executorService = new ThreadPoolExecutor(10, Integer.MAX_VALUE, -// 1, TimeUnit.MINUTES, new LinkedBlockingQueue(), -// Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); -// List>> resultList = new ArrayList< Future>>(); //1.查询拓扑图配置的指标:拓扑图扑图配置:7677f94c749dedaff30f911949cbd724 List data = csStatisticalSetFeignClient.queryStatisticalSelect("b934664f9592d1c5e92caa90695b7103").getData(); data.forEach(temp->{ if(Objects.nonNull(temp.getHarmStart())&&Objects.nonNull(temp.getHarmEnd())){ - FrequencyStatisticalQueryParam frequencyStatisticalQueryParam = new FrequencyStatisticalQueryParam(); frequencyStatisticalQueryParam.setDevId(devId); frequencyStatisticalQueryParam.setStatisticalId(temp.getId()); @@ -123,17 +108,6 @@ public class MqttMessageHandler { } }); -// executorService.shutdown(); -// -// resultList.forEach(temp->{ -// try { -// tempList.addAll(temp.get()); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } catch (ExecutionException e) { -// throw new RuntimeException(e); -// } -// }); //过滤M相 List m = tempList.stream().filter(temp -> Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList()); m.stream().forEach(temp->{ @@ -144,34 +118,6 @@ public class MqttMessageHandler { result.add(thdDataVO); }); }); - //过滤谐波电流,谐波电压畸变率求平均值 -// List thdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Pq_ThdU(%)")).collect(Collectors.toList()); -// Map> collect = thdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); -// collect.forEach((k,v)->{ -// if(!CollectionUtil.isEmpty(v)){ -// double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); -// ThdDataVO thdDataVO = new ThdDataVO(); -// BeanUtils.copyProperties(v.get(0),thdDataVO); -// thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); -// thdDataVO.setPhase("avg"); -// result.add(thdDataVO); -// -// } -// }); - -// List thdV = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Pq_ThdI(%)")).collect(Collectors.toList()); -// Map> collect1 = thdV.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); -// collect1.forEach((k,v)->{ -// if(!CollectionUtil.isEmpty(v)){ -// double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); -// ThdDataVO thdDataVO = new ThdDataVO(); -// BeanUtils.copyProperties(v.get(0),thdDataVO); -// thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); -// thdDataVO.setPhase("avg"); -// result.add(thdDataVO); -// -// } -// }); List apfThdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Load(%)")).collect(Collectors.toList()); Map> collect3 = apfThdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); collect3.forEach((k,v)->{ @@ -182,7 +128,6 @@ public class MqttMessageHandler { thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); thdDataVO.setPhase("avg"); result.add(thdDataVO); - } }); List apfRmsI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_RmsI_TolOut(A)")).collect(Collectors.toList()); @@ -195,7 +140,6 @@ public class MqttMessageHandler { thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); thdDataVO.setPhase("avg"); result.add(thdDataVO); - } }); List apfThdISys = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Sys(%)")).collect(Collectors.toList()); @@ -212,8 +156,6 @@ public class MqttMessageHandler { } }); Double capacity = devCapacityFeignClient.getDevCapacity(devId).getData(); - - apfRmsI.forEach(temp->{ ThdDataVO thdDataVO = new ThdDataVO(); BeanUtils.copyProperties(temp,thdDataVO); @@ -225,24 +167,15 @@ public class MqttMessageHandler { }else { double v = temp.getStatisticalData() / capacity; thdDataVO.setStatisticalData(Double.valueOf(df.format(v))); - } result.add(thdDataVO); }); - - List notM = tempList.stream().filter(temp -> !Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList()); - result.addAll(notM); - - - Gson gson = new Gson(); topoDataJson = gson.toJson(result); - redisUtil.saveByKeyWithExpire(devId, (Object) topoDataJson,Long.valueOf(10*60)); + redisUtil.saveByKeyWithExpire(devId, (Object) topoDataJson, 30L); } - - publisher.send("/zl/devData/"+devId,topoDataJson,1,false); }