diff --git a/src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java b/src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java index e9fb00a..3e5faca 100644 --- a/src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java +++ b/src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java @@ -32,5 +32,11 @@ public class EnhanceMessageConstant { public final String RETRY_PREFIX = "RETRY_"; + //单次异常失败 + public final String IDENTITY_SINGLE = "IDENTITY_SINGLE"; + + //多次重试后,异常记录 + public final String IDENTITY_RETRY = "IDENTITY_RETRY"; + } diff --git a/src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java b/src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java index 69bb006..b3bbc38 100644 --- a/src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java +++ b/src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java @@ -52,8 +52,9 @@ public abstract class EnhanceConsumerMessageHandler { * * @param message 待处理消息 */ - protected abstract void handleMaxRetriesExceeded(T message); - + protected void handleMaxRetriesExceeded(T message) { + saveExceptionMsgLog(message,EnhanceMessageConstant.IDENTITY_RETRY); + } /** * 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理 @@ -97,6 +98,14 @@ public abstract class EnhanceConsumerMessageHandler { return DELAY_LEVEL; } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + protected void saveExceptionMsgLog(T message, String identity) { + } + /** * 使用模板模式构建消息消费框架,可自由扩展或删减 */ @@ -118,7 +127,7 @@ public abstract class EnhanceConsumerMessageHandler { long costTime = System.currentTimeMillis() - now; log.info("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime); } catch (Exception e) { - log.error("消息{}消费异常", message.getKey(), e); + saveExceptionMsgLog(message,EnhanceMessageConstant.IDENTITY_SINGLE); // 是捕获异常还是抛出,由子类决定 if (throwException()) { //抛出异常,由DefaultMessageListenerConcurrently类处理