事件
This commit is contained in:
@@ -27,6 +27,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -75,6 +76,10 @@ public class MqttMessageHandler {
|
||||
public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) {
|
||||
List<ThdDataVO> result = new ArrayList<>();
|
||||
List<ThdDataVO> tempList = new ArrayList<>();
|
||||
ExecutorService executorService = new ThreadPoolExecutor(40, 400,
|
||||
1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100, true),
|
||||
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
|
||||
List<Future<List<ThdDataVO>>> resultList = new ArrayList< Future<List<ThdDataVO>>>();
|
||||
|
||||
//1.查询拓扑图配置的指标:拓扑图扑图配置:7677f94c749dedaff30f911949cbd724
|
||||
List<EleEpdPqd> data = csStatisticalSetFeignClient.queryStatisticalSelect("7677f94c749dedaff30f911949cbd724").getData();
|
||||
@@ -86,21 +91,32 @@ public class MqttMessageHandler {
|
||||
commonStatisticalQueryParam.setStatisticalId(temp.getId());
|
||||
commonStatisticalQueryParam.setValueType("avg");
|
||||
commonStatisticalQueryParam.setFrequency(i+"");
|
||||
List<ThdDataVO> thdDataVOS = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
|
||||
tempList.addAll(thdDataVOS);
|
||||
Future<List<ThdDataVO>> listFuture= executorService.submit(new TaskWithResult(commonStatisticalQueryParam));
|
||||
resultList.add(listFuture);
|
||||
}
|
||||
|
||||
|
||||
}else {
|
||||
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam();
|
||||
commonStatisticalQueryParam.setDevId(devId);
|
||||
commonStatisticalQueryParam.setStatisticalId(temp.getId());
|
||||
commonStatisticalQueryParam.setValueType("avg");
|
||||
List<ThdDataVO> thdDataVOS = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
|
||||
tempList.addAll(thdDataVOS);
|
||||
}
|
||||
}else {
|
||||
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam();
|
||||
commonStatisticalQueryParam.setDevId(devId);
|
||||
commonStatisticalQueryParam.setStatisticalId(temp.getId());
|
||||
commonStatisticalQueryParam.setValueType("avg");
|
||||
Future<List<ThdDataVO>> listFuture= executorService.submit(new TaskWithResult(commonStatisticalQueryParam));
|
||||
resultList.add(listFuture);
|
||||
}
|
||||
|
||||
});
|
||||
executorService.shutdown();
|
||||
|
||||
resultList.forEach(temp->{
|
||||
try {
|
||||
tempList.addAll(temp.get());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
//过滤M相
|
||||
List<ThdDataVO> m = tempList.stream().filter(temp -> Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList());
|
||||
m.stream().forEach(temp->{
|
||||
@@ -198,4 +214,26 @@ public class MqttMessageHandler {
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
class TaskWithResult implements Callable<List<ThdDataVO>> {
|
||||
private CommonStatisticalQueryParam commonStatisticalQueryParam;
|
||||
|
||||
public TaskWithResult(CommonStatisticalQueryParam commonStatisticalQueryParam) {
|
||||
this.commonStatisticalQueryParam = commonStatisticalQueryParam;
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务的具体过程,一旦任务传给ExecutorService的submit方法,则该方法自动在一个线程上执行。
|
||||
*
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
@Override
|
||||
public List<ThdDataVO> call() throws Exception {
|
||||
List<ThdDataVO> thdDataVOList = new ArrayList<>();
|
||||
thdDataVOList = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
|
||||
|
||||
return thdDataVOList;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user