diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java index bfea268..7584a29 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java @@ -1,9 +1,16 @@ package com.njcn.zlevent.api; +import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; import com.njcn.mq.message.AppEventMessage; +import com.njcn.mq.message.CldLogMessage; import com.njcn.zlevent.api.fallback.EventClientFallbackFactory; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -20,4 +27,7 @@ public interface EventFeignClient { @PostMapping("/portableData") HttpResult getPortableData(@RequestBody AppEventMessage appEventMessage); + @PostMapping("/cldEventData") + HttpResult getCldEventData(@RequestBody CldLogMessage cldLogMessage); + } diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java index d2555aa..4b8460e 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java @@ -4,6 +4,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.response.HttpResult; import com.njcn.mq.message.AppEventMessage; +import com.njcn.mq.message.CldLogMessage; import com.njcn.zlevent.api.EventFeignClient; import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; @@ -37,6 +38,12 @@ public class EventClientFallbackFactory implements FallbackFactory getCldEventData(CldLogMessage cldLogMessage) { + log.error("{}异常,降级处理,异常为:{}","云前置事件处理",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; } } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java index 4a82214..fd58978 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java @@ -6,6 +6,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; import com.njcn.mq.message.AppEventMessage; +import com.njcn.mq.message.CldLogMessage; import com.njcn.web.controller.BaseController; import com.njcn.zlevent.service.IEventService; import io.swagger.annotations.Api; @@ -54,4 +55,14 @@ public class EventController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/cldEventData") + @ApiOperation("云前置事件处理") + @ApiImplicitParam(name = "cldLogMessage", value = "数据实体", required = true) + public HttpResult getCldEventData(@RequestBody CldLogMessage cldLogMessage){ + String methodDescribe = getMethodDescribe("getCldEventData"); + eventService.getCldEventData(cldLogMessage); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java index fdb93cc..666b5fc 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java @@ -1,6 +1,7 @@ package com.njcn.zlevent.service; import com.njcn.mq.message.AppEventMessage; +import com.njcn.mq.message.CldLogMessage; /** * @author xy @@ -25,4 +26,11 @@ public interface IEventService { */ void getPortableData(AppEventMessage appEventMessage); + /** + * 云前置设备基础数据 + * 1.装置发起数据记录开始动作,库中新增数据; + * @param cldLogMessage + */ + void getCldEventData( CldLogMessage cldLogMessage); + } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java index 8572115..98ce4ed 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java @@ -17,6 +17,7 @@ import com.njcn.csharmonic.pojo.po.CsEventPO; import com.njcn.influx.pojo.constant.InfluxDBTableConstant; import com.njcn.influx.utils.InfluxDbUtils; import com.njcn.mq.message.AppEventMessage; +import com.njcn.mq.message.CldLogMessage; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.stat.enums.StatResponseEnum; @@ -258,6 +259,36 @@ public class EventServiceImpl implements IEventService { }); } + @Override + public void getCldEventData(CldLogMessage cldLogMessage) { + CsEventPO po = new CsEventPO(); + po.setStartTime(LocalDateTime.parse(cldLogMessage.getTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); + po.setTag(cldLogMessage.getLog()); + po.setClDid(1); + po.setLevel(3); + po.setProcess(4); + po.setCode(cldLogMessage.getCode()); + //前置告警 + if (Objects.equals(cldLogMessage.getLevel(),"process")) { + //这边将前置服务器id当作设备id + po.setDeviceId(cldLogMessage.getNodeId()); + po.setClDid(Integer.valueOf(cldLogMessage.getProcessNo())); + po.setType(4); + } + //设备和监测点告警 + else { + if (Objects.equals(cldLogMessage.getLevel(),"terminal")) { + po.setDeviceId(cldLogMessage.getBusinessId()); + } else { + CsLinePO line = csLineFeignClient.getById(cldLogMessage.getBusinessId()).getData(); + po.setDeviceId(line.getDeviceId()); + po.setLineId(cldLogMessage.getBusinessId()); + } + po.setType(3); + } + csEventService.save(po); + } + /** * 处理电压 * @param vol diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldEventConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldEventConsumer.java new file mode 100644 index 0000000..d7ded6d --- /dev/null +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldEventConsumer.java @@ -0,0 +1,134 @@ +package com.njcn.message.consumer; + +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.mq.constant.BusinessTopic; +import com.njcn.mq.constant.MessageStatus; +import com.njcn.mq.message.CldLogMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import com.njcn.zlevent.api.EventFeignClient; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * 类的介绍: 云前置心跳消息 + * + * @author xuyang + * @version 1.0.0 + * @createTime 2025/9/16 + */ +@Service +@RocketMQMessageListener( + topic = BusinessTopic.LOG_TOPIC, + consumerGroup = BusinessTopic.LOG_TOPIC, + consumeThreadNumber = 10, + enableMsgTrace = true +) +@Slf4j +public class CldEventConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + @Resource + private EventFeignClient eventFeignClient; + + @Override + protected void handleMessage(CldLogMessage cldLogMessage) { + log.info("分发至云前置告警事件处理"); + eventFeignClient.getCldEventData(cldLogMessage); + } + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(CldLogMessage message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L); + return false; + } + return true; + } + + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(CldLogMessage message) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(CldLogMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + /*** + * 调用父类handler处理消息的元信息 + */ + @Override + public void onMessage(CldLogMessage cldLogMessage) { + super.dispatchMessage(cldLogMessage); + } +}