新增事件解析日志记录

This commit is contained in:
2023-09-18 15:18:09 +08:00
parent 5192f809dd
commit 1bfa7309da
5 changed files with 214 additions and 70 deletions

View File

@@ -0,0 +1,67 @@
package com.njcn.zlevent.pojo.po;
import com.baomidou.mybatisplus.annotation.TableName;
import com.njcn.db.bo.BaseEntity;
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
/**
* <p>
* 事件解析日志表
* </p>
*
* @author xuyang
* @since 2023-09-18
*/
@Data
@TableName("cs_event_logs")
public class CsEventLogs {
private static final long serialVersionUID = 1L;
/**
* id
*/
private String id;
/**
* 监测点id
*/
private String lineId;
/**
* 设备id
*/
private String deviceId;
/**
* 事件发生时间
*/
private LocalDateTime startTime;
/**
* 事件名称
*/
private String tag;
/**
* 事件解析状态
*/
private Integer status;
/**
* 解析时间
*/
private LocalDateTime time;
/**
* 备注
*/
private String remark;
}

View File

@@ -0,0 +1,16 @@
package com.njcn.zlevent.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.zlevent.pojo.po.CsEventLogs;
/**
* <p>
* 事件解析日志表 Mapper 接口
* </p>
*
* @author xuyang
* @since 2023-09-18
*/
public interface CsEventLogsMapper extends BaseMapper<CsEventLogs> {
}

View File

@@ -0,0 +1,16 @@
package com.njcn.zlevent.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.zlevent.pojo.po.CsEventLogs;
/**
* <p>
* 事件解析日志表 服务类
* </p>
*
* @author xuyang
* @since 2023-09-18
*/
public interface ICsEventLogsService extends IService<CsEventLogs> {
}

View File

@@ -0,0 +1,20 @@
package com.njcn.zlevent.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.zlevent.mapper.CsEventLogsMapper;
import com.njcn.zlevent.pojo.po.CsEventLogs;
import com.njcn.zlevent.service.ICsEventLogsService;
import org.springframework.stereotype.Service;
/**
* <p>
* 事件解析日志表 服务实现类
* </p>
*
* @author xuyang
* @since 2023-09-18
*/
@Service
public class CsEventLogsServiceImpl extends ServiceImpl<CsEventLogsMapper, CsEventLogs> implements ICsEventLogsService {
}

View File

@@ -28,6 +28,8 @@ import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
import com.njcn.user.pojo.po.app.AppInfoSet;
import com.njcn.zlevent.pojo.dto.NoticeUserDto;
import com.njcn.zlevent.pojo.po.CsEventLogs;
import com.njcn.zlevent.service.ICsEventLogsService;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.service.ICsEventUserService;
import com.njcn.zlevent.service.IEventService;
@@ -90,6 +92,8 @@ public class EventServiceImpl implements IEventService {
private final EventLogsFeignClient eventLogsFeignClient;
private final ICsEventLogsService csEventLogsService;
@Override
@Transactional(rollbackFor = Exception.class)
public void analysis(AppEventMessage appEventMessage) {
@@ -98,9 +102,8 @@ public class EventServiceImpl implements IEventService {
List<String> records = new ArrayList<String>();
List<CsEventUserPO> list2 = new ArrayList<>();
//获取监测点id
String lineId = null;
String lineId = null,id = null,tag = null;;
LocalDateTime eventTime = null;
String id = null;
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
//判断字典数据是否存在
if (Objects.isNull(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD))){
@@ -116,78 +119,100 @@ public class EventServiceImpl implements IEventService {
}
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
//处理事件数据
List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray();
for (AppEventMessage.DataArray item : dataArray) {
id = IdUtil.fastSimpleUUID();
//事件入库
CsEventPO csEvent = new CsEventPO();
csEvent.setId(id);
csEvent.setLineId(lineId);
csEvent.setDeviceId(deviceId);
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
csEvent.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
csEvent.setTag(item.getName());
csEvent.setType(0);
csEvent.setClDid(appEventMessage.getMsg().getClDid());
//todo 默认等级先设为1.后期根据事件再做调整
csEvent.setLevel(1);
list1.add(csEvent);
//参数入库
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
if (!Objects.isNull(item.getParam())){
String tableName = map.get(item.getName());
List<AppEventMessage.Param> params = item.getParam();
for (AppEventMessage.Param param : params) {
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.UUID,id);
Map<String,Object> fields = new HashMap<>();
fields.put(param.getName(),param.getData());
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
try {
//处理事件数据
List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray();
for (AppEventMessage.DataArray item : dataArray) {
id = IdUtil.fastSimpleUUID();
//事件入库
CsEventPO csEvent = new CsEventPO();
csEvent.setId(id);
csEvent.setLineId(lineId);
csEvent.setDeviceId(deviceId);
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
csEvent.setStartTime(eventTime);
tag = item.getName();
csEvent.setTag(tag);
csEvent.setType(0);
csEvent.setClDid(appEventMessage.getMsg().getClDid());
//todo 默认等级先设为1.后期根据事件再做调整
csEvent.setLevel(1);
list1.add(csEvent);
//参数入库
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
if (!Objects.isNull(item.getParam())){
String tableName = map.get(item.getName());
List<AppEventMessage.Param> params = item.getParam();
for (AppEventMessage.Param param : params) {
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.UUID,id);
Map<String,Object> fields = new HashMap<>();
fields.put(param.getName(),param.getData());
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
}
}
//事件用户关系入库
list2 = deviceUserList(deviceId,id);
//事件处理日志库
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(deviceId);
csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
csEventLogs.setTag(item.getName());
csEventLogs.setStatus(1);
csEventLogs.setTime(LocalDateTime.now());
csEventLogsService.save(csEventLogs);
}
//事件用户关系入库
list2 = deviceUserList(deviceId,id);
}
//cs_event入库
if (CollectionUtil.isNotEmpty(list1)){
csEventService.saveBatch(list1);
}
//cs_device_user入库
if (CollectionUtil.isNotEmpty(list2)){
csEventUserService.saveBatch(list2);
}
//evt_data入库
if (CollectionUtil.isNotEmpty(records)) {
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
//todo 根据不同事件需要做处理,目前先测试消息通知
List<User> userList = getEventUser(deviceId);
if (CollectionUtil.isNotEmpty(userList)){
List<CsEventSendMsg> csEventSendMsgList = new ArrayList<>();
List<String> devCodeList = userList.stream().map(User::getDevCode).collect(Collectors.toList());
String content = appEventMessage.getId() + "" +eventTime+ "发生暂态事件";
sendEventToUser(devCodeList,"暂态事件",content,1);
//记录推送日志
for (User item : userList) {
CsEventSendMsg csEventSendMsg = new CsEventSendMsg();
csEventSendMsg.setUserId(item.getId());
csEventSendMsg.setEventId(id);
csEventSendMsg.setSendTime(LocalDateTime.now());
if (Objects.isNull(item.getDevCode())){
csEventSendMsg.setStatus(0);
csEventSendMsg.setRemark("用户设备识别码为空");
} else {
csEventSendMsg.setDevCode(item.getDevCode());
csEventSendMsg.setStatus(1);
//cs_event入库
if (CollectionUtil.isNotEmpty(list1)){
csEventService.saveBatch(list1);
}
//cs_device_user入库
if (CollectionUtil.isNotEmpty(list2)){
csEventUserService.saveBatch(list2);
}
//evt_data入库
if (CollectionUtil.isNotEmpty(records)) {
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
//todo 根据不同事件需要做处理,目前先测试消息通知
List<User> userList = getEventUser(deviceId);
if (CollectionUtil.isNotEmpty(userList)){
List<CsEventSendMsg> csEventSendMsgList = new ArrayList<>();
List<String> devCodeList = userList.stream().map(User::getDevCode).collect(Collectors.toList());
String content = appEventMessage.getId() + "" +eventTime+ "发生暂态事件";
sendEventToUser(devCodeList,"暂态事件",content,1);
//记录推送日志
for (User item : userList) {
CsEventSendMsg csEventSendMsg = new CsEventSendMsg();
csEventSendMsg.setUserId(item.getId());
csEventSendMsg.setEventId(id);
csEventSendMsg.setSendTime(LocalDateTime.now());
if (Objects.isNull(item.getDevCode())){
csEventSendMsg.setStatus(0);
csEventSendMsg.setRemark("用户设备识别码为空");
} else {
csEventSendMsg.setDevCode(item.getDevCode());
csEventSendMsg.setStatus(1);
}
csEventSendMsgList.add(csEventSendMsg);
}
csEventSendMsgList.add(csEventSendMsg);
eventLogsFeignClient.addLogs(csEventSendMsgList);
}
eventLogsFeignClient.addLogs(csEventSendMsgList);
} catch (Exception e) {
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(deviceId);
csEventLogs.setStartTime(eventTime);
csEventLogs.setTag(tag);
csEventLogs.setStatus(0);
csEventLogs.setTime(LocalDateTime.now());
csEventLogs.setRemark(e.getMessage());
csEventLogsService.save(csEventLogs);
}
}