From 1bfa7309da5d1b257c5531bbbcc7a2f324cc2402 Mon Sep 17 00:00:00 2001
From: xuyang <748613696@qq.com>
Date: Mon, 18 Sep 2023 15:18:09 +0800
Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BA=8B=E4=BB=B6=E8=A7=A3?=
=?UTF-8?q?=E6=9E=90=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../com/njcn/zlevent/pojo/po/CsEventLogs.java | 67 +++++++
.../zlevent/mapper/CsEventLogsMapper.java | 16 ++
.../zlevent/service/ICsEventLogsService.java | 16 ++
.../service/impl/CsEventLogsServiceImpl.java | 20 +++
.../service/impl/EventServiceImpl.java | 165 ++++++++++--------
5 files changed, 214 insertions(+), 70 deletions(-)
create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java
create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java
create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java
create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java
diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java
new file mode 100644
index 0000000..3b94f64
--- /dev/null
+++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java
@@ -0,0 +1,67 @@
+package com.njcn.zlevent.pojo.po;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.njcn.db.bo.BaseEntity;
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ *
+ * 事件解析日志表
+ *
+ *
+ * @author xuyang
+ * @since 2023-09-18
+ */
+@Data
+@TableName("cs_event_logs")
+public class CsEventLogs {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * id
+ */
+ private String id;
+
+ /**
+ * 监测点id
+ */
+ private String lineId;
+
+ /**
+ * 设备id
+ */
+ private String deviceId;
+
+ /**
+ * 事件发生时间
+ */
+ private LocalDateTime startTime;
+
+ /**
+ * 事件名称
+ */
+ private String tag;
+
+ /**
+ * 事件解析状态
+ */
+ private Integer status;
+
+ /**
+ * 解析时间
+ */
+ private LocalDateTime time;
+
+ /**
+ * 备注
+ */
+ private String remark;
+
+
+}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java
new file mode 100644
index 0000000..407d06a
--- /dev/null
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java
@@ -0,0 +1,16 @@
+package com.njcn.zlevent.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.njcn.zlevent.pojo.po.CsEventLogs;
+
+/**
+ *
+ * 事件解析日志表 Mapper 接口
+ *
+ *
+ * @author xuyang
+ * @since 2023-09-18
+ */
+public interface CsEventLogsMapper extends BaseMapper {
+
+}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java
new file mode 100644
index 0000000..d490c16
--- /dev/null
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java
@@ -0,0 +1,16 @@
+package com.njcn.zlevent.service;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.njcn.zlevent.pojo.po.CsEventLogs;
+
+/**
+ *
+ * 事件解析日志表 服务类
+ *
+ *
+ * @author xuyang
+ * @since 2023-09-18
+ */
+public interface ICsEventLogsService extends IService {
+
+}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java
new file mode 100644
index 0000000..c9e1b92
--- /dev/null
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java
@@ -0,0 +1,20 @@
+package com.njcn.zlevent.service.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.njcn.zlevent.mapper.CsEventLogsMapper;
+import com.njcn.zlevent.pojo.po.CsEventLogs;
+import com.njcn.zlevent.service.ICsEventLogsService;
+import org.springframework.stereotype.Service;
+
+/**
+ *
+ * 事件解析日志表 服务实现类
+ *
+ *
+ * @author xuyang
+ * @since 2023-09-18
+ */
+@Service
+public class CsEventLogsServiceImpl extends ServiceImpl implements ICsEventLogsService {
+
+}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
index a7d93bb..c96cda0 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
@@ -28,6 +28,8 @@ import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
import com.njcn.user.pojo.po.app.AppInfoSet;
import com.njcn.zlevent.pojo.dto.NoticeUserDto;
+import com.njcn.zlevent.pojo.po.CsEventLogs;
+import com.njcn.zlevent.service.ICsEventLogsService;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.service.ICsEventUserService;
import com.njcn.zlevent.service.IEventService;
@@ -90,6 +92,8 @@ public class EventServiceImpl implements IEventService {
private final EventLogsFeignClient eventLogsFeignClient;
+ private final ICsEventLogsService csEventLogsService;
+
@Override
@Transactional(rollbackFor = Exception.class)
public void analysis(AppEventMessage appEventMessage) {
@@ -98,9 +102,8 @@ public class EventServiceImpl implements IEventService {
List records = new ArrayList();
List list2 = new ArrayList<>();
//获取监测点id
- String lineId = null;
+ String lineId = null,id = null,tag = null;;
LocalDateTime eventTime = null;
- String id = null;
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
//判断字典数据是否存在
if (Objects.isNull(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD))){
@@ -116,78 +119,100 @@ public class EventServiceImpl implements IEventService {
}
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
- //处理事件数据
- List dataArray = appEventMessage.getMsg().getDataArray();
- for (AppEventMessage.DataArray item : dataArray) {
- id = IdUtil.fastSimpleUUID();
- //事件入库
- CsEventPO csEvent = new CsEventPO();
- csEvent.setId(id);
- csEvent.setLineId(lineId);
- csEvent.setDeviceId(deviceId);
- eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
- csEvent.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
- csEvent.setTag(item.getName());
- csEvent.setType(0);
- csEvent.setClDid(appEventMessage.getMsg().getClDid());
- //todo 默认等级先设为1.后期根据事件再做调整
- csEvent.setLevel(1);
- list1.add(csEvent);
- //参数入库
- Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
- if (!Objects.isNull(item.getParam())){
- String tableName = map.get(item.getName());
- List params = item.getParam();
- for (AppEventMessage.Param param : params) {
- Map tags = new HashMap<>();
- tags.put(InfluxDBTableConstant.UUID,id);
- Map fields = new HashMap<>();
- fields.put(param.getName(),param.getData());
- //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
- Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
- BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
- batchPoints.point(point);
- records.add(batchPoints.lineProtocol());
+ try {
+ //处理事件数据
+ List dataArray = appEventMessage.getMsg().getDataArray();
+ for (AppEventMessage.DataArray item : dataArray) {
+ id = IdUtil.fastSimpleUUID();
+ //事件入库
+ CsEventPO csEvent = new CsEventPO();
+ csEvent.setId(id);
+ csEvent.setLineId(lineId);
+ csEvent.setDeviceId(deviceId);
+ eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
+ csEvent.setStartTime(eventTime);
+ tag = item.getName();
+ csEvent.setTag(tag);
+ csEvent.setType(0);
+ csEvent.setClDid(appEventMessage.getMsg().getClDid());
+ //todo 默认等级先设为1.后期根据事件再做调整
+ csEvent.setLevel(1);
+ list1.add(csEvent);
+ //参数入库
+ Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
+ if (!Objects.isNull(item.getParam())){
+ String tableName = map.get(item.getName());
+ List params = item.getParam();
+ for (AppEventMessage.Param param : params) {
+ Map tags = new HashMap<>();
+ tags.put(InfluxDBTableConstant.UUID,id);
+ Map fields = new HashMap<>();
+ fields.put(param.getName(),param.getData());
+ //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
+ Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
+ BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
+ batchPoints.point(point);
+ records.add(batchPoints.lineProtocol());
+ }
}
+ //事件用户关系入库
+ list2 = deviceUserList(deviceId,id);
+ //事件处理日志库
+ CsEventLogs csEventLogs = new CsEventLogs();
+ csEventLogs.setLineId(lineId);
+ csEventLogs.setDeviceId(deviceId);
+ csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
+ csEventLogs.setTag(item.getName());
+ csEventLogs.setStatus(1);
+ csEventLogs.setTime(LocalDateTime.now());
+ csEventLogsService.save(csEventLogs);
}
- //事件用户关系入库
- list2 = deviceUserList(deviceId,id);
- }
- //cs_event入库
- if (CollectionUtil.isNotEmpty(list1)){
- csEventService.saveBatch(list1);
- }
- //cs_device_user入库
- if (CollectionUtil.isNotEmpty(list2)){
- csEventUserService.saveBatch(list2);
- }
- //evt_data入库
- if (CollectionUtil.isNotEmpty(records)) {
- influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
- }
- //todo 根据不同事件需要做处理,目前先测试消息通知
- List userList = getEventUser(deviceId);
- if (CollectionUtil.isNotEmpty(userList)){
- List csEventSendMsgList = new ArrayList<>();
- List devCodeList = userList.stream().map(User::getDevCode).collect(Collectors.toList());
- String content = appEventMessage.getId() + "于" +eventTime+ "发生暂态事件";
- sendEventToUser(devCodeList,"暂态事件",content,1);
- //记录推送日志
- for (User item : userList) {
- CsEventSendMsg csEventSendMsg = new CsEventSendMsg();
- csEventSendMsg.setUserId(item.getId());
- csEventSendMsg.setEventId(id);
- csEventSendMsg.setSendTime(LocalDateTime.now());
- if (Objects.isNull(item.getDevCode())){
- csEventSendMsg.setStatus(0);
- csEventSendMsg.setRemark("用户设备识别码为空");
- } else {
- csEventSendMsg.setDevCode(item.getDevCode());
- csEventSendMsg.setStatus(1);
+ //cs_event入库
+ if (CollectionUtil.isNotEmpty(list1)){
+ csEventService.saveBatch(list1);
+ }
+ //cs_device_user入库
+ if (CollectionUtil.isNotEmpty(list2)){
+ csEventUserService.saveBatch(list2);
+ }
+ //evt_data入库
+ if (CollectionUtil.isNotEmpty(records)) {
+ influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
+ }
+ //todo 根据不同事件需要做处理,目前先测试消息通知
+ List userList = getEventUser(deviceId);
+ if (CollectionUtil.isNotEmpty(userList)){
+ List csEventSendMsgList = new ArrayList<>();
+ List devCodeList = userList.stream().map(User::getDevCode).collect(Collectors.toList());
+ String content = appEventMessage.getId() + "于" +eventTime+ "发生暂态事件";
+ sendEventToUser(devCodeList,"暂态事件",content,1);
+ //记录推送日志
+ for (User item : userList) {
+ CsEventSendMsg csEventSendMsg = new CsEventSendMsg();
+ csEventSendMsg.setUserId(item.getId());
+ csEventSendMsg.setEventId(id);
+ csEventSendMsg.setSendTime(LocalDateTime.now());
+ if (Objects.isNull(item.getDevCode())){
+ csEventSendMsg.setStatus(0);
+ csEventSendMsg.setRemark("用户设备识别码为空");
+ } else {
+ csEventSendMsg.setDevCode(item.getDevCode());
+ csEventSendMsg.setStatus(1);
+ }
+ csEventSendMsgList.add(csEventSendMsg);
}
- csEventSendMsgList.add(csEventSendMsg);
+ eventLogsFeignClient.addLogs(csEventSendMsgList);
}
- eventLogsFeignClient.addLogs(csEventSendMsgList);
+ } catch (Exception e) {
+ CsEventLogs csEventLogs = new CsEventLogs();
+ csEventLogs.setLineId(lineId);
+ csEventLogs.setDeviceId(deviceId);
+ csEventLogs.setStartTime(eventTime);
+ csEventLogs.setTag(tag);
+ csEventLogs.setStatus(0);
+ csEventLogs.setTime(LocalDateTime.now());
+ csEventLogs.setRemark(e.getMessage());
+ csEventLogsService.save(csEventLogs);
}
}