终端异常告警接口优化

This commit is contained in:
zhuxinyu
2023-03-29 16:06:48 +08:00
parent 0a5573804c
commit ff6f106047
9 changed files with 159 additions and 63 deletions

View File

@@ -73,34 +73,35 @@
<select id="getAlarmByDevice" resultType="com.njcn.prepare.harmonic.pojo.mysql.po.line.AlarmPO">
SELECT t.*,
(t.statisValue)/t.flowMeal flowProportion
FROM
(
SELECT device.Id AS Id,
device.id AS deviceId,
COUNT(ca.Id) AS alarmCount,
GROUP_CONCAT(ca.Remark) AS alarmDesc,
IFNULL(d.flow, (select flow from cld_flow_meal where type = 0 and flag = 1)) + ifnull(d1.flow, 0) flowMeal,
IFNULL(pmf.Actual_Value,0) statisValue
FROM pq_line line
INNER JOIN pq_line vol ON line.pid=vol.id
INNER JOIN pq_line device ON vol.pid=device.id
LEFT JOIN pq_device pd ON device.id=pd.id
LEFT JOIN pq_line_detail pld ON pld.id=line.id
LEFT JOIN cld_alarm ca ON ca.Line_Id=device.id
LEFT JOIN pqs_month_flow pmf ON pmf.Dev_Id=device.id
LEFT JOIN cld_dev_meal c ON device.id = c.line_id
LEFT JOIN cld_flow_meal d ON c.Base_Meal_Id = d.id
LEFT JOIN cld_flow_meal d1 ON c.Ream_Meal_Id = d1.id
WHERE
pd.Dev_Model = 1
AND
pd.Run_Flag = 0
AND
pld.Line_Grade is NOT NULL
AND
ca.Occurred_Time between #{startTime} and #{endTime}
GROUP BY deviceId
) t
FROM
(
SELECT device.Id AS Id,
device.id AS deviceId,
COUNT(ca.Id) AS alarmCount,
GROUP_CONCAT(ca.Remark) AS alarmDesc,
IFNULL(d.flow, (select flow from cld_flow_meal where type = 0 and flag = 1)) + ifnull(d1.flow, 0) flowMeal,
IFNULL(ANY_VALUE(pmf.Actual_Value),0) statisValue,
ANY_VALUE(device.Update_Time) AS updateTime
FROM pq_line line
INNER JOIN pq_line vol ON line.pid=vol.id
INNER JOIN pq_line device ON vol.pid=device.id
LEFT JOIN pq_device pd ON device.id=pd.id
LEFT JOIN pq_line_detail pld ON pld.id=line.id
LEFT JOIN cld_alarm ca ON ca.Line_Id=device.id
LEFT JOIN pqs_month_flow pmf ON pmf.Dev_Id=device.id
LEFT JOIN cld_dev_meal c ON device.id = c.line_id
LEFT JOIN cld_flow_meal d ON c.Base_Meal_Id = d.id
LEFT JOIN cld_flow_meal d1 ON c.Ream_Meal_Id = d1.id
WHERE
pd.Dev_Model = 1
AND
pd.Run_Flag = 0
AND
pld.Line_Grade is NOT NULL
AND
ca.Occurred_Time between #{startTime} and #{endTime}
GROUP BY deviceId
) t
ORDER BY flowProportion DESC
</select>

View File

@@ -2,23 +2,20 @@ package com.njcn.prepare.harmonic.service.mysql.Impl.device;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import com.njcn.device.pq.pojo.po.Communicate;
import com.njcn.influxdb.param.InfluxDBPublicParam;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.njcn.prepare.harmonic.mapper.mysql.device.DeviceAbnormaStatisticsMapper;
import com.njcn.prepare.harmonic.pojo.influxdb.po.Communicate;
import com.njcn.prepare.harmonic.pojo.mysql.po.line.AlarmPO;
import com.njcn.prepare.harmonic.pojo.mysql.po.line.AlarmStrategyVO;
import com.njcn.prepare.harmonic.pojo.mysql.po.line.LinePO;
import com.njcn.prepare.harmonic.pojo.mysql.po.line.TopMsgPO;
import com.njcn.prepare.harmonic.pojo.param.DeviceAbnormaStatisticsParam;
import com.njcn.prepare.harmonic.service.mysql.device.DeviceAbnormalStatisticsService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.math.BigDecimal;
import java.util.*;
@@ -37,33 +34,31 @@ public class DeviceAbnormaStatisticsServiceImpl implements DeviceAbnormalStatist
private final InfluxDbUtils influxDbUtils;
@Override
public boolean dailyDeviceAbnormaStatistics(DeviceAbnormaStatisticsParam param) {
Map<String, List<TopMsgPO>> comMap = new HashMap<>();Map<String, List<AlarmPO>> devLineMap = new HashMap<>();
// 获取监测点告警
List<AlarmPO> lineAlarms = deviceAbnormaStatisticsMapper.getAlarmByLines(
DateUtil.beginOfDay(DateUtil.parse(param.getBeginTime())),
DateUtil.beginOfDay(DateUtil.parse(param.getEndTime())));
Map<String, List<AlarmPO>> lineMap = new HashMap<>();
if (CollectionUtil.isNotEmpty(lineAlarms)){
// 根据装置分组
lineMap = lineAlarms.stream().collect(Collectors.groupingBy(AlarmPO::getDeviceId));
devLineMap = lineAlarms.stream().collect(Collectors.groupingBy(AlarmPO::getDeviceId));
}
// 获取设备流量
List<AlarmPO> deviceAlarms = deviceAbnormaStatisticsMapper.getAlarmByDevice(
DateUtil.beginOfDay(DateUtil.parse(param.getBeginTime())),
DateUtil.beginOfDay(DateUtil.parse(param.getEndTime())));
List<String> devs = deviceAlarms.stream().map(AlarmPO::getId).collect(Collectors.toList());
devs.add("6469e77fda42db12c7ca6620a092f03c");
Map<String, List<Communicate>> comMap = new HashMap<>();
// 获取通信中断信息
List<Communicate> communicate = getCommunicate(devs, param.getBeginTime(), param.getEndTime());
if (CollectionUtil.isNotEmpty(communicate)){
comMap = communicate.stream().collect(Collectors.groupingBy(Communicate::getId));
List<TopMsgPO> comMsgs = getCommunicate(devs, param.getBeginTime(), param.getEndTime());
if (CollectionUtil.isNotEmpty(comMsgs)){
comMap = comMsgs.stream().collect(Collectors.groupingBy(TopMsgPO::getDevId));
}
// 整合监测点告警信息、数据完整性以及监测点等级为设备信息
for (AlarmPO deviceAlarm : deviceAlarms) {
if (!lineMap.containsKey(deviceAlarm.getId())) {
if (!devLineMap.containsKey(deviceAlarm.getId())) {
continue;
}
AlarmPO lineAlarm = lineMap.get(deviceAlarm.getId()).get(0);
AlarmPO lineAlarm = devLineMap.get(deviceAlarm.getId()).get(0);
deviceAlarm.setAlarmCount(deviceAlarm.getAlarmCount() + lineAlarm.getAlarmCount());
if (lineAlarm.getDue().compareTo(BigDecimal.ZERO) == 0) {
deviceAlarm.setIntegrity(BigDecimal.ZERO);
@@ -73,15 +68,14 @@ public class DeviceAbnormaStatisticsServiceImpl implements DeviceAbnormalStatist
if (lineAlarm.getLevel() != null) {
deviceAlarm.setLevel(lineAlarm.getLevel());
}
if (CollectionUtil.isNotEmpty(comMap)) {
List<Communicate> communicates = comMap.get(deviceAlarm.getId());
int comOut = communicates.stream().filter(t -> t.getType() == 0).collect(Collectors.toList()).size();
deviceAlarm.setComOutNum(comOut);
}
TopMsgPO topMsgPO = comMap.get(deviceAlarm.getId()).get(0);
deviceAlarm.setComOutNum(topMsgPO.getComOutCount());
deviceAlarm.setComOutDesc(String.join(",",topMsgPO.getComOutDesc()));
}
Map<Integer, List<AlarmPO>> levelMap = deviceAlarms.stream().collect(Collectors.groupingBy(AlarmPO::getLevel));
// 比对告警策略并落表
List<AlarmStrategyVO> alarmStrategyVOS = deviceAbnormaStatisticsMapper.selectAlarmStrategy();
List<TopMsgPO> alarmExceptions = new ArrayList<>();
Map<Integer, List<AlarmPO>> levelMap = deviceAlarms.stream().collect(Collectors.groupingBy(AlarmPO::getLevel));
for (AlarmStrategyVO strategyVO : alarmStrategyVOS) {
if (!levelMap.containsKey(strategyVO.getAlgoDesc())){
continue;
@@ -91,7 +85,7 @@ public class DeviceAbnormaStatisticsServiceImpl implements DeviceAbnormalStatist
TopMsgPO topMsg = new TopMsgPO();
topMsg.setDevId(alarmPO.getId());
topMsg.setFlowFlag(1);
topMsg.setComOutCount(alarmPO.getComOutNum() == 0 ? 0 : alarmPO.getComOutNum());
topMsg.setComOutCount(alarmPO.getComOutNum() == null ? 0 : alarmPO.getComOutNum());
if (alarmPO.getIntegrity().intValue()<strategyVO.getIntegrityValue()) {
topMsg.setIntegrityFlag("0");
topMsg.setIntegrityValue(alarmPO.getIntegrity().intValue());
@@ -111,7 +105,7 @@ public class DeviceAbnormaStatisticsServiceImpl implements DeviceAbnormalStatist
}
public List<Communicate> getCommunicate(List<String> devs, String startTime, String endTime) {
public List<TopMsgPO> getCommunicate(List<String> devs, String startTime, String endTime) {
//组装sql语句
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("time >= '").append(DateUtil.beginOfDay(DateUtil.parse(startTime))).append("' and ").append("time <= '").append(DateUtil.endOfDay(DateUtil.parse(endTime))).append("' and ");
@@ -126,7 +120,55 @@ public class DeviceAbnormaStatisticsServiceImpl implements DeviceAbnormalStatist
//获取暂降事件
QueryResult result = influxDbUtils.query(sql);
InfluxDBResultMapper influxDBResultMapper = new InfluxDBResultMapper();
List<Communicate> communicateList = influxDBResultMapper.toPOJO(result, Communicate.class);
return communicateList;
List<Communicate> communicates = influxDBResultMapper.toPOJO(result, Communicate.class);
List<TopMsgPO> comMsgs = new ArrayList<>();
if (CollectionUtil.isEmpty(communicates)) {
return comMsgs;
}
Map<String, List<Communicate>> comMap = communicates.stream().collect(Collectors.groupingBy(Communicate::getDevId));
for (Map.Entry<String, List<Communicate>> comEntry : comMap.entrySet()) {
TopMsgPO topMsgPO = new TopMsgPO(); List<String> comOutDesc = new ArrayList<>();
topMsgPO.setDevId(comEntry.getKey());
// 根据日期排序
List<Communicate> sortedList = comEntry.getValue().stream().sorted(Comparator.comparing(Communicate::getUpdateTime)).collect(Collectors.toList());
if (sortedList.size() == 1) {
if (sortedList.get(0).getType() == 0) {
comOutDesc.add(sortedList.get(0).getUpdateTime()+""+DateUtil.endOfDay(DateUtil.parse(endTime)));
} else {
comOutDesc.add((DateUtil.beginOfDay(DateUtil.parse(startTime)))+""+sortedList.get(0).getUpdateTime());
}
} else {
switch (sortedList.get(0).getType()) {
case 0:
for (int i = 0; i <sortedList.size() ; i++) {
if (sortedList.get(i).getType()==1) {
String comOutTime = sortedList.get(i-1).getUpdateTime() +""+ sortedList.get(i).getUpdateTime();
comOutDesc.add(comOutTime);
}
}
break;
case 1:
comOutDesc.add((DateUtil.beginOfDay(DateUtil.parse(startTime)))+""+sortedList.get(0).getUpdateTime());
for (int i = 0; i <sortedList.size();i++) {
if (sortedList.get(i).getType()==0) {
StringBuilder sb = new StringBuilder();
sb.append(sortedList.get(i).getUpdateTime());
sb.append("");
sb.append(i==sortedList.size()-1?DateUtil.endOfDay(DateUtil.parse(endTime)):sortedList.get(i+1).getUpdateTime());
comOutDesc.add(sb.toString());
}
}
break;
default:
break;
}
}
topMsgPO.setComOutDesc(comOutDesc);
// 通信中断次数
topMsgPO.setComOutCount(sortedList.stream().filter(a -> a.getType() == 0).collect(Collectors.toList()).size());
comMsgs.add(topMsgPO);
}
return comMsgs;
}
}