初始化rocketMQ-starter

This commit is contained in:
2023-08-07 10:48:32 +08:00
parent 14a567ad86
commit 8ae08bb3df
11 changed files with 583 additions and 12 deletions

View File

@@ -0,0 +1,38 @@
package com.njcn.middle.rocket.autoconfig;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.util.StringUtils;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月04日 14:20
*/
public class EnvironmentIsolationConfig implements BeanPostProcessor {
private RocketEnhanceProperties rocketEnhanceProperties;
public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) {
this.rocketEnhanceProperties = rocketEnhanceProperties;
}
/**
* 在装载Bean之前实现参数修改
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof DefaultRocketMQListenerContainer){
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment()));
}
return container;
}
return bean;
}
}

View File

@@ -0,0 +1,24 @@
package com.njcn.middle.rocket.autoconfig;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月04日 14:21
*/
@ConfigurationProperties(prefix = "rocketmq.enhance")
@Data
public class RocketEnhanceProperties {
/***
* 是否隔离
*/
private boolean enabledIsolation;
/***
* 当前环境test、dev 或者 hainan beijing hebei
*/
private String environment;
}

View File

@@ -0,0 +1,64 @@
package com.njcn.middle.rocket.autoconfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import java.util.List;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月04日 14:19
*/
@Configuration
@EnableConfigurationProperties(RocketEnhanceProperties.class)
public class RocketMQEnhanceAutoConfiguration {
/**
* 注入增强的RocketMQEnhanceTemplate
*/
@Bean
public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){
return new RocketMQEnhanceTemplate(rocketMQTemplate);
}
/**
* 解决RocketMQ Jackson不支持Java时间类型配置
*/
@Bean
@Primary
public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
RocketMQMessageConverter converter = new RocketMQMessageConverter();
CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
for (MessageConverter messageConverter : messageConverterList) {
if(messageConverter instanceof MappingJackson2MessageConverter){
MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
objectMapper.registerModules(new JavaTimeModule());
}
}
return converter;
}
/**
* 环境隔离配置
*/
@Bean
@ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true")
public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){
return new EnvironmentIsolationConfig(rocketEnhanceProperties);
}
}

View File

@@ -0,0 +1,36 @@
package com.njcn.middle.rocket.constant;
import lombok.experimental.UtilityClass;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月04日 14:20
*/
@UtilityClass
public class EnhanceMessageConstant {
public final int ONE_SECOND = 1;
public final int FIVE_SECOND = 2;
public final int TEN_SECOND = 3;
public final int THIRTY_SECOND = 4;
public final int ONE_MINUTE = 5;
public final int TWO_MINUTE = 6;
public final int THREE_MINUTE = 7;
public final int FOUR_MINUTE = 8;
public final int FIVE_MINUTE = 9;
public final int SIX_MINUTE = 10;
public final int SEVEN_MINUTE = 11;
public final int EIGHT_MINUTE = 12;
public final int NINE_MINUTE = 13;
public final int TEN_MINUTE = 14;
public final int TWENTY_MINUTE = 15;
public final int THIRTY_MINUTE = 16;
public final int ONE_HOUR = 17;
public final int TWO_HOUR = 18;
public final String RETRY_PREFIX = "RETRY_";
}

View File

@@ -0,0 +1,34 @@
package com.njcn.middle.rocket.domain;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月04日 10:53
*/
@Data
public abstract class BaseMessage {
/**
* 业务键用于RocketMQ控制台查看消费情况
*/
protected String key;
/**
* 发送消息来源,用于排查问题
*/
protected String source = "";
/**
* 发送时间
*/
protected LocalDateTime sendTime = LocalDateTime.now();
/**
* 重试次数,用于判断重试次数,超过重试次数发送异常警告
*/
protected Integer retryTimes = 0;
}

View File

@@ -0,0 +1,166 @@
package com.njcn.middle.rocket.handler;
import com.alibaba.fastjson.JSONObject;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import javax.annotation.Resource;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月04日 13:44
* 抽象消息监听器,封装了所有公共处理业务,如
* 1、基础日志记录
* 2、异常处理
* 3、消息重试
* 4、警告通知
*/
@Slf4j
public abstract class EnhanceConsumerMessageHandler<T extends BaseMessage> {
/**
* 默认重试次数
*/
private static final int MAX_RETRY_TIMES = 3;
/**
* 延时等级
*/
private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND;
@Resource
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
/**
* 消息处理
*
* @param message 待处理消息
* @throws Exception 消费异常
*/
protected abstract void handleMessage(T message) throws Exception;
/**
* 超过重试次数消息需要启用isRetry
*
* @param message 待处理消息
*/
protected abstract void handleMaxRetriesExceeded(T message);
/**
* 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理
*
* @param message 待处理消息
* @return true: 本次消息被过滤false不过滤
*/
protected boolean filter(T message) {
return false;
}
/**
* 是否异常时重复发送
*
* @return true: 消息重试false不重试
*/
protected abstract boolean isRetry();
/**
* 消费异常时是否抛出异常
* 返回true则由rocketmq机制自动重试
* false消费异常(如果没有开启重试则消息会被自动ack)
*/
protected abstract boolean throwException();
/**
* 最大重试次数
*
* @return 最大重试次数默认5次
*/
protected int getMaxRetryTimes() {
return MAX_RETRY_TIMES;
}
/**
* isRetry开启时重新入队延迟时间
*
* @return -1立即入队重试
*/
protected int getDelayLevel() {
return DELAY_LEVEL;
}
/**
* 使用模板模式构建消息消费框架,可自由扩展或删减
*/
public void dispatchMessage(T message) {
// 基础日志记录被父类处理了
log.info("消费者收到消息[{}]", JSONObject.toJSON(message));
if (filter(message)) {
log.info("消息id{}不满足消费条件,已过滤。", message.getKey());
return;
}
// 超过最大重试次数时调用子类方法处理
if (message.getRetryTimes() > getMaxRetryTimes()) {
handleMaxRetriesExceeded(message);
return;
}
try {
long now = System.currentTimeMillis();
handleMessage(message);
long costTime = System.currentTimeMillis() - now;
log.info("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime);
} catch (Exception e) {
log.error("消息{}消费异常", message.getKey(), e);
// 是捕获异常还是抛出,由子类决定
if (throwException()) {
//抛出异常由DefaultMessageListenerConcurrently类处理
throw new RuntimeException(e);
}
//此时如果不开启重试机制则默认ACK了
if (isRetry()) {
handleRetry(message);
}
}
}
protected void handleRetry(T message) {
// 获取子类RocketMQMessageListener注解拿到topic和tag
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
if (annotation == null) {
return;
}
//重新构建消息体
String messageSource = message.getSource();
if (!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)) {
message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource);
}
message.setRetryTimes(message.getRetryTimes() + 1);
SendResult sendResult;
try {
// 如果消息发送不成功则再次重新发送如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());
} catch (Exception ex) {
// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
//由生产者直接发送
throw new RuntimeException(ex);
}
// 发送失败的处理就是不进行ACK由RocketMQ重试
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("重试消息发送失败");
}
}
}

View File

@@ -0,0 +1,90 @@
package com.njcn.middle.rocket.template;
import com.alibaba.fastjson.JSONObject;
import com.njcn.middle.rocket.autoconfig.RocketEnhanceProperties;
import com.njcn.middle.rocket.domain.BaseMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月04日 11:06
*/
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class RocketMQEnhanceTemplate {
private final RocketMQTemplate template;
@Resource
private RocketEnhanceProperties rocketEnhanceProperties;
public RocketMQTemplate getTemplate() {
return template;
}
/**
* 根据系统上下文自动构建隔离后的topic
* 构建目的地
*/
public String buildDestination(String topic, String tag) {
topic = reBuildTopic(topic);
return topic + ":" + tag;
}
/**
* 根据环境重新隔离topic
* @param topic 原始topic
*/
private String reBuildTopic(String topic) {
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
return topic +"_" + rocketEnhanceProperties.getEnvironment();
}
return topic;
}
/**
* 发送同步消息
*/
public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {
// 注意分隔符
return send(buildDestination(topic,tag), message);
}
public <T extends BaseMessage> SendResult send(String destination, T message) {
// 设置业务键,此处根据公共的参数进行处理
// 更多的其它基础业务处理...
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage);
// 此处为了方便查看给日志转了json根据选择选择日志记录方式例如ELK采集
log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
return sendResult;
}
/**
* 发送延迟消息
*/
public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
return send(buildDestination(topic,tag), message, delayLevel);
}
public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
return sendResult;
}
}