feat(alarm): 更新告警系统以支持在线率和完整性监控

- 移除原有的中断计数功能,改为在线率和完整性告警判断
- 新增CsAlarmData类用于存储在线率和完整性详细数据
- 更新AlarmVO数据结构,替换interruptCounts为onlineRateIsWarn和integrityIsWarn字段
- 修改告警服务实现,从JSON解析List<List<String>>改为解析CsAlarmData对象
- 新增channelRunDataAlarm方法用于运行数据告警算法,处理在线率和完整性阈值判断
- 实现完整的告警数据构建逻辑,包括设备在线率计算和监测点完整性评估
- 更新统计服务中的数据去重逻辑,避免重复计算设备和监测点数据
This commit is contained in:
xy
2026-05-14 09:22:26 +08:00
parent 82e5d6c8e2
commit aa36c077f2
8 changed files with 348 additions and 55 deletions

View File

@@ -46,6 +46,15 @@ public class DataTaskController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/runDataAlarm")
@ApiOperation("运行告警(完整性、在线率)")
public HttpResult<Boolean> channelRunDataAlarm(@RequestParam("time") String time){
String methodDescribe = getMethodDescribe("channelRunDataAlarm");
dataTaskService.channelRunDataAlarm(time);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/pushAppMsg")
@ApiOperation("app消息推送")

View File

@@ -23,6 +23,18 @@ public interface IDataTaskService {
*/
void channelRunAlarm(String time);
/**
运行告警算法
1、每天生成一条记录查询所有的设备获取设备告警数据、设备的在线率和完整性在线率根据设备来判断完整性根据监测点来判断(多个监测点取最小值)
告警判断逻辑:(这个数据是根据一个表配置的,先查询阈值配置表)
a.完整性 < 90(可变动)
b.完整性 > 90(可变动) && 在线率 < 60(可变动)
2、根据设备获取哪些用户有这个设备的权限然后将用户id收集起来。(主用户、子用户、管理员id集合)
3、将cs_alarm生成的记录存储到cs_event_user表中标记为未读
* @param time
*/
void channelRunDataAlarm(String time);
/**
* 推送App消息
* 只推送稳态事件、运行告警数据

View File

@@ -8,18 +8,23 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.njcn.access.pojo.dto.NoticeUserDto;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.csdevice.api.CsCommTerminalFeignClient;
import com.njcn.csdevice.api.CsDeviceUserFeignClient;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.param.IcdBzParam;
import com.njcn.csdevice.pojo.dto.CsLineDTO;
import com.njcn.csdevice.pojo.param.UserDevParam;
import com.njcn.csdevice.pojo.po.CsDeviceUserPO;
import com.njcn.csdevice.pojo.po.RStatIntegrityD;
import com.njcn.csdevice.pojo.po.RStatOnlineRateD;
import com.njcn.csharmonic.api.*;
import com.njcn.csharmonic.param.CsEventUserQueryParam;
import com.njcn.csharmonic.pojo.param.RStatLimitQueryParam;
import com.njcn.csharmonic.pojo.po.*;
import com.njcn.cssystem.pojo.po.CsAlarmSet;
import com.njcn.cssystem.service.ICsAlarmSetService;
import com.njcn.cssystem.service.IDataTaskService;
import com.njcn.influx.imapper.PqsCommunicateMapper;
import com.njcn.influx.pojo.po.PqsCommunicate;
@@ -77,6 +82,9 @@ public class DataTaskServiceImpl implements IDataTaskService {
private final CsCommTerminalFeignClient csCommTerminalFeignClient;
private final CsHarmonicPlanFeignClient csHarmonicPlanFeignClient;
private final CsHarmonicPlanLineFeignClient csHarmonicPlanLineFeignClient;
private final ICsAlarmSetService csAlarmSetService;
private final OnlineRateFeignClient onlineRateFeignClient;
private final IntegrityFeignClient integrityClient;
@Override
@Transactional(rollbackFor = Exception.class)
@@ -302,6 +310,135 @@ public class DataTaskServiceImpl implements IDataTaskService {
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void channelRunDataAlarm(String time) {
CsAlarm alarm = new CsAlarm();
List<CsEventUserPO> list2 = new ArrayList<>();
//获取所有监测点
List<CsLineDTO> allLine = csLineFeignClient.getAllLineDetail().getData();
if (CollUtil.isNotEmpty(allLine)) {
//过滤物联、在线监测的点
DictTreeVO portable = dictTreeFeignClient.queryByCode(DicDataEnum.PORTABLE.getCode()).getData();
if (ObjectUtil.isNotNull(portable)) {
allLine = allLine.stream().filter(item -> !item.getDeviceType().equals(portable.getId())).collect(Collectors.toList());
}
//设备id集合
List<String> devList = allLine.stream().map(CsLineDTO::getDeviceId).distinct().collect(Collectors.toList());
//获取设备和用户关系
UserDevParam param = new UserDevParam();
param.setList(devList);
List<CsDeviceUserPO> userList = csDeviceUserFeignClient.getList(param).getData();
//根据设备id分组 组是subUserId的集合
Map<String, List<String>> userDevMap = userList.stream().collect(Collectors.groupingBy(
CsDeviceUserPO::getDeviceId,
Collectors.mapping(
CsDeviceUserPO::getSubUserId,
Collectors.toList()
)
));
//获取管理员信息
List<User> adminList = appUserFeignClient.getAdminInfo().getData();
//获取用户id集合
List<String> userIdList = adminList.stream().map(User::getId).collect(Collectors.toList());
//获取时间
String date;
if (StringUtils.isNotBlank(time)) {
date = time;
} else {
date = DateUtil.yesterday().toString(NORM_DATE_PATTERN);
}
//获取在线率、完整性阈值
CsAlarmSet csAlarmSet = csAlarmSetService.getEnabledConfig();
//获取当日的在线率数据
IcdBzParam param1 = new IcdBzParam();
param1.setStartTime(date);
param1.setEndTime(date);
param1.setLineList(devList);
List<RStatOnlineRateD> onlineRateList = onlineRateFeignClient.list(param1).getData();
//获取当日的完整性数据
List<String> lineList = allLine.stream().map(CsLineDTO::getLineId).distinct().collect(Collectors.toList());
param1.setLineList(lineList);
List<RStatIntegrityD> integrityList = integrityClient.list(param1).getData();
// 构建告警数据
List<CsAlarmData> alarmDataList = buildAlarmData(devList, onlineRateList, integrityList, allLine, csAlarmSet);
Map<String,CsAlarmData> alarmDataMap = alarmDataList.stream().collect(Collectors.toMap(CsAlarmData::getDeviceId, item -> item));
//获取告警数据
CsEventUserQueryParam queryParam = new CsEventUserQueryParam();
queryParam.setTarget(devList);
queryParam.setStartTime(date + " 00:00:00");
queryParam.setEndTime(date + " 23:59:59");
List<CsEventPO> alarmList = eventFeignClient.getDevAlarmList(queryParam).getData();
Map<String, List<CsEventPO>> alarmMap = alarmList.stream().collect(Collectors.groupingBy(CsEventPO::getDeviceId));
List<String> l0 = new ArrayList<>();
List<CsAlarmData> l1 = new ArrayList<>();
List<List<String>> l2 = new ArrayList<>();
//step1:构造告警数据
String id = IdUtil.fastSimpleUUID();
alarm.setId(id);
alarm.setTime(LocalDate.parse(date));
devList.forEach(item->{
//写入中断数据
CsAlarmData data1 = alarmDataMap.get(item);
//写入告警数据
List<CsEventPO> data2 = alarmMap.get(item);
List<String> alarmIds = null;
if (CollUtil.isNotEmpty(data2)) {
alarmIds = data2.stream().map(CsEventPO::getId).collect(Collectors.toList());
}
//只要有一个不为空,就需要统计
if (Objects.nonNull(data1) || CollUtil.isNotEmpty(alarmIds)) {
// 对中断时间数据进行过滤,只保留间隔超过 6 小时的时间段
l0.add(item);
l1.add(data1);
l2.add(alarmIds);
}
});
alarm.setDevList(String.join(",", l0));
alarm.setInterruptEvent(new Gson().toJson(l1));
alarm.setAlarmEvent(new Gson().toJson(l2));
//step2:用户告警通知
if (CollUtil.isNotEmpty(l0)) {
Set<String> userIds = new HashSet<>();
l0.forEach(item->{
List<String> ids = userDevMap.get(item);
if (CollUtil.isNotEmpty(ids)) {
userIds.addAll(ids);
}
});
//添加管理员用户
List<String> result = Stream.concat(userIdList.stream(), userIds.stream())
.distinct()
.collect(Collectors.toList());
result.forEach(userId->{
//新增cs_event_user数据
CsEventUserPO po = new CsEventUserPO();
po.setUserId(userId);
po.setEventId(id);
po.setStatus(0);
list2.add(po);
});
}
}
if (ObjectUtil.isNotNull(alarm)) {
//先删除 再录入
List<CsAlarm> oldList = csAlarmFeignClient.queryListByTime(time).getData();
if (CollUtil.isNotEmpty(oldList)) {
List<String> ids = oldList.stream().map(CsAlarm::getId).collect(Collectors.toList());
csAlarmFeignClient.deleteListByTime(time);
eventUserFeignClient.deleteByIds(ids);
}
csAlarmFeignClient.addList(Collections.singletonList(alarm));
eventUserFeignClient.addUserEventList(list2);
}
}
@Override
public void pushAppMsg(String time) {
//查询所有用户-设备关系
@@ -457,6 +594,108 @@ public class DataTaskServiceImpl implements IDataTaskService {
}
}
/**
* 构建告警数据
*/
private List<CsAlarmData> buildAlarmData(List<String> devList,
List<RStatOnlineRateD> onlineRateList,
List<RStatIntegrityD> integrityList,
List<CsLineDTO> allLine,
CsAlarmSet csAlarmSet) {
List<CsAlarmData> result = new ArrayList<>();
if (CollUtil.isEmpty(devList)) {
return result;
}
// 获取在线率阈值和完整性阈值
double onlineRateThreshold = csAlarmSet.getOnlineRateLimit() != null ? csAlarmSet.getOnlineRateLimit().doubleValue() : 60.0;
double integrityThreshold = csAlarmSet.getIntegrityLimit() != null ? csAlarmSet.getIntegrityLimit().doubleValue() : 90.0;
// 构建设备ID到监测点列表的映射
Map<String, List<CsLineDTO>> deviceToLinesMap = allLine.stream()
.collect(Collectors.groupingBy(CsLineDTO::getDeviceId));
// 构建监测点ID到监测点信息的映射
Map<String, CsLineDTO> lineInfoMap = allLine.stream()
.collect(Collectors.toMap(CsLineDTO::getLineId, line -> line, (k1, k2) -> k1));
// 将列表转换为Map便于查找处理null情况
Map<String, RStatOnlineRateD> onlineRateMap = CollUtil.isNotEmpty(onlineRateList)
? onlineRateList.stream().collect(Collectors.toMap(RStatOnlineRateD::getDevIndex, item -> item, (k1, k2) -> k1))
: new HashMap<>();
Map<String, RStatIntegrityD> integrityDataMap = CollUtil.isNotEmpty(integrityList)
? integrityList.stream().collect(Collectors.toMap(RStatIntegrityD::getLineIndex, item -> item, (k1, k2) -> k1))
: new HashMap<>();
// 遍历每个设备
for (String deviceId : devList) {
// 处理在线率数据
RStatOnlineRateD onlineRateData = onlineRateMap.get(deviceId);
// 如果为空默认值为0
int onlineMin = onlineRateData != null ? onlineRateData.getOnlineMin() : 0;
double onlineRateValue = Math.round(onlineMin * 100.0 / 1440 * 100.0) / 100.0;
CsAlarmData.OnlineRateAlarm onlineRateAlarm = new CsAlarmData.OnlineRateAlarm();
onlineRateAlarm.setValue(onlineRateValue);
onlineRateAlarm.setThreshold(onlineRateThreshold);
// value < threshold 时为 true低于阈值为异常
onlineRateAlarm.setIsAbnormal(onlineRateValue < onlineRateThreshold);
// 处理完整性数据 - 按设备分组
List<CsLineDTO> deviceLines = deviceToLinesMap.get(deviceId);
boolean hasIntegrityAbnormal = false;
// 构建完整性数据结构
CsAlarmData.IntegrityAlarm integrityAlarm = new CsAlarmData.IntegrityAlarm();
integrityAlarm.setThreshold(integrityThreshold);
List<CsAlarmData.IntegrityAlarm.MonitorPointIntegrity> monitorPoints = new ArrayList<>();
if (CollUtil.isNotEmpty(deviceLines)) {
// 为该设备下的每个监测点创建完整性数据
for (CsLineDTO line : deviceLines) {
String lineId = line.getLineId();
RStatIntegrityD integrityData = integrityDataMap.get(lineId);
// 如果为空默认值为0
int dueTime = integrityData != null && integrityData.getDueTime() != null ? integrityData.getDueTime() : 0;
int realTime = integrityData != null && integrityData.getRealTime() != null ? integrityData.getRealTime() : 0;
double integrityValue = dueTime > 0 ? Math.round(realTime * 100.0 / dueTime * 100.0) / 100.0 : 0.0;
CsAlarmData.IntegrityAlarm.MonitorPointIntegrity monitorPoint = new CsAlarmData.IntegrityAlarm.MonitorPointIntegrity();
monitorPoint.setMonitorPointId(lineId);
monitorPoint.setMonitorName(line.getName());
monitorPoint.setValue(integrityValue);
// value < threshold 时为 true低于阈值为异常
Boolean isAbnormal = integrityValue < integrityThreshold;
monitorPoint.setIsAbnormal(isAbnormal);
if (isAbnormal) {
hasIntegrityAbnormal = true;
}
monitorPoints.add(monitorPoint);
}
}
integrityAlarm.setMonitorPoints(monitorPoints);
// 只要在线率异常 或 完整性有任意一个监测点异常,就记录该设备的完整数据
if (onlineRateAlarm.getIsAbnormal() || hasIntegrityAbnormal) {
CsAlarmData alarmData = new CsAlarmData();
alarmData.setDeviceId(deviceId);
alarmData.setOnlineRate(onlineRateAlarm);
alarmData.setIntegrity(integrityAlarm);
result.add(alarmData);
}
}
return result;
}
/**
* 过滤时间间隔,只保留超过指定小时数的时间段