模板化消费者监听代码块
This commit is contained in:
@@ -1,15 +1,23 @@
|
|||||||
package com.njcn.message.consumer;
|
package com.njcn.message.consumer;
|
||||||
|
|
||||||
|
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||||
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||||
import com.njcn.mq.constant.BusinessTopic;
|
import com.njcn.mq.constant.BusinessTopic;
|
||||||
|
import com.njcn.mq.constant.MessageStatus;
|
||||||
import com.njcn.mq.message.AppAutoDataMessage;
|
import com.njcn.mq.message.AppAutoDataMessage;
|
||||||
|
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||||
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
import com.njcn.stat.api.StatFeignClient;
|
import com.njcn.stat.api.StatFeignClient;
|
||||||
|
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||||
|
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.poi.ss.formula.functions.T;
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 类的介绍:
|
* 类的介绍:
|
||||||
@@ -21,8 +29,8 @@ import javax.annotation.Resource;
|
|||||||
@Service
|
@Service
|
||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
topic = BusinessTopic.NJCN_APP_AUTO_DATA_TOPIC,
|
topic = BusinessTopic.NJCN_APP_AUTO_DATA_TOPIC,
|
||||||
selectorExpression = BusinessTopic.AppDataTag.STAT_TAG,
|
|
||||||
consumerGroup = BusinessTopic.NJCN_APP_AUTO_DATA_TOPIC,
|
consumerGroup = BusinessTopic.NJCN_APP_AUTO_DATA_TOPIC,
|
||||||
|
selectorExpression = BusinessTopic.AppDataTag.STAT_TAG,
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
@@ -32,6 +40,12 @@ public class AppAutoDataConsumer extends EnhanceConsumerMessageHandler<AppAutoDa
|
|||||||
@Resource
|
@Resource
|
||||||
private StatFeignClient statFeignClient;
|
private StatFeignClient statFeignClient;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RedisUtil redisUtil;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RocketMqLogFeignClient rocketMqLogFeignClient;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void handleMessage(AppAutoDataMessage appAutoDataMessage) {
|
protected void handleMessage(AppAutoDataMessage appAutoDataMessage) {
|
||||||
Integer dataAttr = appAutoDataMessage.getMsg().getDataAttr();
|
Integer dataAttr = appAutoDataMessage.getMsg().getDataAttr();
|
||||||
@@ -48,21 +62,81 @@ public class AppAutoDataConsumer extends EnhanceConsumerMessageHandler<AppAutoDa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void handleMaxRetriesExceeded(AppAutoDataMessage appAutoDataMessage) {
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 通过redis分布式锁判断当前消息所处状态
|
||||||
|
* 1、null 查不到该key的数据,属于第一次消费,放行
|
||||||
|
* 2、fail 上次消息消费时发生异常,放行
|
||||||
|
* 3、being processed 正在处理,打回去
|
||||||
|
* 4、success 最近72小时消费成功,避免重复消费,打回去
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean filter(AppAutoDataMessage message) {
|
||||||
|
String keyStatus = redisUtil.getStringByKey(message.getKey());
|
||||||
|
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||||
|
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.BEING_PROCESSED, 60L);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消费成功,缓存到redis72小时,避免重复消费
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void consumeSuccess(AppAutoDataMessage message) {
|
||||||
|
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发生异常时,进行错误信息入库保存
|
||||||
|
* 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void saveExceptionMsgLog(AppAutoDataMessage message, String identity, Exception exception) {
|
||||||
|
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||||
|
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
|
||||||
|
rocketmqMsgErrorLog.setMsgKey(message.getKey());
|
||||||
|
rocketmqMsgErrorLog.setResource(message.getSource());
|
||||||
|
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
|
||||||
|
//数据库字段配置长度200,避免插入失败,大致分析异常原因
|
||||||
|
String exceptionMsg = exception.getMessage();
|
||||||
|
if(exceptionMsg.length() > 200){
|
||||||
|
exceptionMsg = exceptionMsg.substring(0,180);
|
||||||
|
}
|
||||||
|
rocketmqMsgErrorLog.setRecord(exceptionMsg);
|
||||||
|
//单次消费异常
|
||||||
|
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||||
|
} else {
|
||||||
|
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
|
||||||
|
//重试N次后,依然消费异常
|
||||||
|
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 处理失败后,是否重试
|
||||||
|
* 一般开启
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected boolean isRetry() {
|
protected boolean isRetry() {
|
||||||
return false;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 消费失败是否抛出异常,抛出异常后就不再消费了
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected boolean throwException() {
|
protected boolean throwException() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 调用父类handler处理消息的元信息
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(AppAutoDataMessage appAutoDataMessage) {
|
public void onMessage(AppAutoDataMessage appAutoDataMessage) {
|
||||||
super.dispatchMessage(appAutoDataMessage);
|
super.dispatchMessage(appAutoDataMessage);
|
||||||
|
|||||||
Reference in New Issue
Block a user