diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java new file mode 100644 index 0000000..3b94f64 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java @@ -0,0 +1,67 @@ +package com.njcn.zlevent.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.njcn.db.bo.BaseEntity; +import java.io.Serializable; +import java.time.LocalDateTime; + +import lombok.Data; +import lombok.Getter; +import lombok.Setter; + +/** + *

+ * 事件解析日志表 + *

+ * + * @author xuyang + * @since 2023-09-18 + */ +@Data +@TableName("cs_event_logs") +public class CsEventLogs { + + private static final long serialVersionUID = 1L; + + /** + * id + */ + private String id; + + /** + * 监测点id + */ + private String lineId; + + /** + * 设备id + */ + private String deviceId; + + /** + * 事件发生时间 + */ + private LocalDateTime startTime; + + /** + * 事件名称 + */ + private String tag; + + /** + * 事件解析状态 + */ + private Integer status; + + /** + * 解析时间 + */ + private LocalDateTime time; + + /** + * 备注 + */ + private String remark; + + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java new file mode 100644 index 0000000..407d06a --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java @@ -0,0 +1,16 @@ +package com.njcn.zlevent.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.zlevent.pojo.po.CsEventLogs; + +/** + *

+ * 事件解析日志表 Mapper 接口 + *

+ * + * @author xuyang + * @since 2023-09-18 + */ +public interface CsEventLogsMapper extends BaseMapper { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java new file mode 100644 index 0000000..d490c16 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java @@ -0,0 +1,16 @@ +package com.njcn.zlevent.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.zlevent.pojo.po.CsEventLogs; + +/** + *

+ * 事件解析日志表 服务类 + *

+ * + * @author xuyang + * @since 2023-09-18 + */ +public interface ICsEventLogsService extends IService { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java new file mode 100644 index 0000000..c9e1b92 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java @@ -0,0 +1,20 @@ +package com.njcn.zlevent.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.zlevent.mapper.CsEventLogsMapper; +import com.njcn.zlevent.pojo.po.CsEventLogs; +import com.njcn.zlevent.service.ICsEventLogsService; +import org.springframework.stereotype.Service; + +/** + *

+ * 事件解析日志表 服务实现类 + *

+ * + * @author xuyang + * @since 2023-09-18 + */ +@Service +public class CsEventLogsServiceImpl extends ServiceImpl implements ICsEventLogsService { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java index a7d93bb..c96cda0 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java @@ -28,6 +28,8 @@ import com.njcn.user.api.UserFeignClient; import com.njcn.user.pojo.po.User; import com.njcn.user.pojo.po.app.AppInfoSet; import com.njcn.zlevent.pojo.dto.NoticeUserDto; +import com.njcn.zlevent.pojo.po.CsEventLogs; +import com.njcn.zlevent.service.ICsEventLogsService; import com.njcn.zlevent.service.ICsEventService; import com.njcn.zlevent.service.ICsEventUserService; import com.njcn.zlevent.service.IEventService; @@ -90,6 +92,8 @@ public class EventServiceImpl implements IEventService { private final EventLogsFeignClient eventLogsFeignClient; + private final ICsEventLogsService csEventLogsService; + @Override @Transactional(rollbackFor = Exception.class) public void analysis(AppEventMessage appEventMessage) { @@ -98,9 +102,8 @@ public class EventServiceImpl implements IEventService { List records = new ArrayList(); List list2 = new ArrayList<>(); //获取监测点id - String lineId = null; + String lineId = null,id = null,tag = null;; LocalDateTime eventTime = null; - String id = null; Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId()); //判断字典数据是否存在 if (Objects.isNull(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD))){ @@ -116,78 +119,100 @@ public class EventServiceImpl implements IEventService { } //获取装置id String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId(); - //处理事件数据 - List dataArray = appEventMessage.getMsg().getDataArray(); - for (AppEventMessage.DataArray item : dataArray) { - id = IdUtil.fastSimpleUUID(); - //事件入库 - CsEventPO csEvent = new CsEventPO(); - csEvent.setId(id); - csEvent.setLineId(lineId); - csEvent.setDeviceId(deviceId); - eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); - csEvent.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec())); - csEvent.setTag(item.getName()); - csEvent.setType(0); - csEvent.setClDid(appEventMessage.getMsg().getClDid()); - //todo 默认等级先设为1.后期根据事件再做调整 - csEvent.setLevel(1); - list1.add(csEvent); - //参数入库 - Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); - if (!Objects.isNull(item.getParam())){ - String tableName = map.get(item.getName()); - List params = item.getParam(); - for (AppEventMessage.Param param : params) { - Map tags = new HashMap<>(); - tags.put(InfluxDBTableConstant.UUID,id); - Map fields = new HashMap<>(); - fields.put(param.getName(),param.getData()); - //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。 - Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); - batchPoints.point(point); - records.add(batchPoints.lineProtocol()); + try { + //处理事件数据 + List dataArray = appEventMessage.getMsg().getDataArray(); + for (AppEventMessage.DataArray item : dataArray) { + id = IdUtil.fastSimpleUUID(); + //事件入库 + CsEventPO csEvent = new CsEventPO(); + csEvent.setId(id); + csEvent.setLineId(lineId); + csEvent.setDeviceId(deviceId); + eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); + csEvent.setStartTime(eventTime); + tag = item.getName(); + csEvent.setTag(tag); + csEvent.setType(0); + csEvent.setClDid(appEventMessage.getMsg().getClDid()); + //todo 默认等级先设为1.后期根据事件再做调整 + csEvent.setLevel(1); + list1.add(csEvent); + //参数入库 + Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); + if (!Objects.isNull(item.getParam())){ + String tableName = map.get(item.getName()); + List params = item.getParam(); + for (AppEventMessage.Param param : params) { + Map tags = new HashMap<>(); + tags.put(InfluxDBTableConstant.UUID,id); + Map fields = new HashMap<>(); + fields.put(param.getName(),param.getData()); + //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。 + Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); + BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + batchPoints.point(point); + records.add(batchPoints.lineProtocol()); + } } + //事件用户关系入库 + list2 = deviceUserList(deviceId,id); + //事件处理日志库 + CsEventLogs csEventLogs = new CsEventLogs(); + csEventLogs.setLineId(lineId); + csEventLogs.setDeviceId(deviceId); + csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec())); + csEventLogs.setTag(item.getName()); + csEventLogs.setStatus(1); + csEventLogs.setTime(LocalDateTime.now()); + csEventLogsService.save(csEventLogs); } - //事件用户关系入库 - list2 = deviceUserList(deviceId,id); - } - //cs_event入库 - if (CollectionUtil.isNotEmpty(list1)){ - csEventService.saveBatch(list1); - } - //cs_device_user入库 - if (CollectionUtil.isNotEmpty(list2)){ - csEventUserService.saveBatch(list2); - } - //evt_data入库 - if (CollectionUtil.isNotEmpty(records)) { - influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records); - } - //todo 根据不同事件需要做处理,目前先测试消息通知 - List userList = getEventUser(deviceId); - if (CollectionUtil.isNotEmpty(userList)){ - List csEventSendMsgList = new ArrayList<>(); - List devCodeList = userList.stream().map(User::getDevCode).collect(Collectors.toList()); - String content = appEventMessage.getId() + "于" +eventTime+ "发生暂态事件"; - sendEventToUser(devCodeList,"暂态事件",content,1); - //记录推送日志 - for (User item : userList) { - CsEventSendMsg csEventSendMsg = new CsEventSendMsg(); - csEventSendMsg.setUserId(item.getId()); - csEventSendMsg.setEventId(id); - csEventSendMsg.setSendTime(LocalDateTime.now()); - if (Objects.isNull(item.getDevCode())){ - csEventSendMsg.setStatus(0); - csEventSendMsg.setRemark("用户设备识别码为空"); - } else { - csEventSendMsg.setDevCode(item.getDevCode()); - csEventSendMsg.setStatus(1); + //cs_event入库 + if (CollectionUtil.isNotEmpty(list1)){ + csEventService.saveBatch(list1); + } + //cs_device_user入库 + if (CollectionUtil.isNotEmpty(list2)){ + csEventUserService.saveBatch(list2); + } + //evt_data入库 + if (CollectionUtil.isNotEmpty(records)) { + influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records); + } + //todo 根据不同事件需要做处理,目前先测试消息通知 + List userList = getEventUser(deviceId); + if (CollectionUtil.isNotEmpty(userList)){ + List csEventSendMsgList = new ArrayList<>(); + List devCodeList = userList.stream().map(User::getDevCode).collect(Collectors.toList()); + String content = appEventMessage.getId() + "于" +eventTime+ "发生暂态事件"; + sendEventToUser(devCodeList,"暂态事件",content,1); + //记录推送日志 + for (User item : userList) { + CsEventSendMsg csEventSendMsg = new CsEventSendMsg(); + csEventSendMsg.setUserId(item.getId()); + csEventSendMsg.setEventId(id); + csEventSendMsg.setSendTime(LocalDateTime.now()); + if (Objects.isNull(item.getDevCode())){ + csEventSendMsg.setStatus(0); + csEventSendMsg.setRemark("用户设备识别码为空"); + } else { + csEventSendMsg.setDevCode(item.getDevCode()); + csEventSendMsg.setStatus(1); + } + csEventSendMsgList.add(csEventSendMsg); } - csEventSendMsgList.add(csEventSendMsg); + eventLogsFeignClient.addLogs(csEventSendMsgList); } - eventLogsFeignClient.addLogs(csEventSendMsgList); + } catch (Exception e) { + CsEventLogs csEventLogs = new CsEventLogs(); + csEventLogs.setLineId(lineId); + csEventLogs.setDeviceId(deviceId); + csEventLogs.setStartTime(eventTime); + csEventLogs.setTag(tag); + csEventLogs.setStatus(0); + csEventLogs.setTime(LocalDateTime.now()); + csEventLogs.setRemark(e.getMessage()); + csEventLogsService.save(csEventLogs); } }