优化指标查询

This commit is contained in:
huangzj
2023-09-14 13:57:10 +08:00
parent 2cf664276a
commit 36c5de878a
5 changed files with 215 additions and 64 deletions

View File

@@ -7,6 +7,7 @@ import com.github.tocrhz.mqtt.annotation.NamedValue;
import com.github.tocrhz.mqtt.annotation.Payload;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.csharmonic.param.CommonStatisticalQueryParam;
import com.njcn.csharmonic.param.FrequencyStatisticalQueryParam;
import com.njcn.csharmonic.pojo.vo.CsRtDataVO;
import com.njcn.csharmonic.pojo.vo.ThdDataVO;
import com.njcn.csharmonic.service.ILineTargetService;
@@ -71,57 +72,65 @@ public class MqttMessageHandler {
Gson gson = new Gson();
publisher.send("/zl/TemperData/"+devId,gson.toJson(statisticalDataDTOS),1,false);
}
/**
* 实时数据应答
/*
* 谐波数据
*/
// @MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1)
// public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) {
//
// }
/**
* 实时数据应答
*/
@MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1)
public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) {
String topoDataJson =redisUtil.getStringByKey (devId);
if(StringUtils.isEmpty(topoDataJson)){
List<ThdDataVO> result = new ArrayList<>();
List<ThdDataVO> tempList = new ArrayList<>();
ExecutorService executorService = new ThreadPoolExecutor(10, 100,
1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100, true),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
List<Future<List<ThdDataVO>>> resultList = new ArrayList< Future<List<ThdDataVO>>>();
// ExecutorService executorService = new ThreadPoolExecutor(10, Integer.MAX_VALUE,
// 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
// Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
// List<Future<List<ThdDataVO>>> resultList = new ArrayList< Future<List<ThdDataVO>>>();
//1.查询拓扑图配置的指标:拓扑图扑图配置7677f94c749dedaff30f911949cbd724
List<EleEpdPqd> data = csStatisticalSetFeignClient.queryStatisticalSelect("7677f94c749dedaff30f911949cbd724").getData();
data.forEach(temp->{
if(Objects.nonNull(temp.getHarmStart())&&Objects.nonNull(temp.getHarmEnd())){
for (int i = temp.getHarmStart(); i < temp.getHarmEnd()+1; i++) {
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam();
commonStatisticalQueryParam.setDevId(devId);
commonStatisticalQueryParam.setStatisticalId(temp.getId());
commonStatisticalQueryParam.setValueType("avg");
commonStatisticalQueryParam.setFrequency(i+"");
Future<List<ThdDataVO>> listFuture= executorService.submit(new TaskWithResult(commonStatisticalQueryParam));
resultList.add(listFuture);
}
FrequencyStatisticalQueryParam frequencyStatisticalQueryParam = new FrequencyStatisticalQueryParam();
frequencyStatisticalQueryParam.setDevId(devId);
frequencyStatisticalQueryParam.setStatisticalId(temp.getId());
frequencyStatisticalQueryParam.setValueType("avg");
frequencyStatisticalQueryParam.setFrequencyStart(temp.getHarmStart());
frequencyStatisticalQueryParam.setFrequencyEnd(temp.getHarmEnd());
List<ThdDataVO> thdDataVOList = stableDataService.QuerySqlData(frequencyStatisticalQueryParam);
tempList.addAll(thdDataVOList);
}else {
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam();
commonStatisticalQueryParam.setDevId(devId);
commonStatisticalQueryParam.setStatisticalId(temp.getId());
commonStatisticalQueryParam.setValueType("avg");
Future<List<ThdDataVO>> listFuture= executorService.submit(new TaskWithResult(commonStatisticalQueryParam));
resultList.add(listFuture);
List<ThdDataVO> listFuture= stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
tempList.addAll(listFuture);
}
});
executorService.shutdown();
resultList.forEach(temp->{
try {
tempList.addAll(temp.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
});
// executorService.shutdown();
//
// resultList.forEach(temp->{
// try {
// tempList.addAll(temp.get());
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// } catch (ExecutionException e) {
// throw new RuntimeException(e);
// }
// });
//过滤M相
List<ThdDataVO> m = tempList.stream().filter(temp -> Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList());
m.stream().forEach(temp->{
@@ -147,19 +156,19 @@ public class MqttMessageHandler {
}
});
List<ThdDataVO> thdV = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Pq_ThdI(%)")).collect(Collectors.toList());
Map<String, List<ThdDataVO>> collect1 = thdV.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
collect1.forEach((k,v)->{
if(!CollectionUtil.isEmpty(v)){
double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
ThdDataVO thdDataVO = new ThdDataVO();
BeanUtils.copyProperties(v.get(0),thdDataVO);
thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
thdDataVO.setPhase("avg");
result.add(thdDataVO);
}
});
// List<ThdDataVO> thdV = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Pq_ThdI(%)")).collect(Collectors.toList());
// Map<String, List<ThdDataVO>> collect1 = thdV.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
// collect1.forEach((k,v)->{
// if(!CollectionUtil.isEmpty(v)){
// double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
// ThdDataVO thdDataVO = new ThdDataVO();
// BeanUtils.copyProperties(v.get(0),thdDataVO);
// thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
// thdDataVO.setPhase("avg");
// result.add(thdDataVO);
//
// }
// });
List<ThdDataVO> apfThdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Load(%)")).collect(Collectors.toList());
Map<String, List<ThdDataVO>> collect3 = apfThdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
collect3.forEach((k,v)->{
@@ -225,25 +234,24 @@ public class MqttMessageHandler {
return "";
}
class TaskWithResult implements Callable<List<ThdDataVO>> {
private CommonStatisticalQueryParam commonStatisticalQueryParam;
public TaskWithResult(CommonStatisticalQueryParam commonStatisticalQueryParam) {
this.commonStatisticalQueryParam = commonStatisticalQueryParam;
}
/**
* 任务的具体过程一旦任务传给ExecutorService的submit方法则该方法自动在一个线程上执行。
*
* @return
* @throws Exception
*/
@Override
public List<ThdDataVO> call() throws Exception {
List<ThdDataVO> thdDataVOList = new ArrayList<>();
thdDataVOList = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
return thdDataVOList;
}
}
// class TaskWithResult implements Callable<List<ThdDataVO>> {
// private CommonStatisticalQueryParam commonStatisticalQueryParam;
//
// public TaskWithResult(CommonStatisticalQueryParam commonStatisticalQueryParam) {
// this.commonStatisticalQueryParam = commonStatisticalQueryParam;
// }
//
// /**
// * 任务的具体过程一旦任务传给ExecutorService的submit方法则该方法自动在一个线程上执行。
// *
// * @return
// * @throws Exception
// */
// @Override
// public List<ThdDataVO> call() throws Exception {
// List<ThdDataVO> thdDataVOList = new ArrayList<>();
// thdDataVOList = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
// return thdDataVOList;
// }
// }
}