From 8ae08bb3df608537a53f5da494a83a191b2ab885 Mon Sep 17 00:00:00 2001
From: hongawen <83944980@qq.com>
Date: Mon, 7 Aug 2023 10:48:32 +0800
Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96rocketMQ-starter?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.gitignore | 29 ++-
README.md | 11 +-
pom.xml | 99 +++++++++++
.../EnvironmentIsolationConfig.java | 38 ++++
.../autoconfig/RocketEnhanceProperties.java | 24 +++
.../RocketMQEnhanceAutoConfiguration.java | 64 +++++++
.../constant/EnhanceMessageConstant.java | 36 ++++
.../middle/rocket/domain/BaseMessage.java | 34 ++++
.../EnhanceConsumerMessageHandler.java | 166 ++++++++++++++++++
.../template/RocketMQEnhanceTemplate.java | 90 ++++++++++
src/main/resources/META-INF/spring.factories | 4 +
11 files changed, 583 insertions(+), 12 deletions(-)
create mode 100644 pom.xml
create mode 100644 src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java
create mode 100644 src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java
create mode 100644 src/main/java/com/njcn/middle/rocket/autoconfig/RocketMQEnhanceAutoConfiguration.java
create mode 100644 src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java
create mode 100644 src/main/java/com/njcn/middle/rocket/domain/BaseMessage.java
create mode 100644 src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java
create mode 100644 src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java
create mode 100644 src/main/resources/META-INF/spring.factories
diff --git a/.gitignore b/.gitignore
index 9154f4c..4632c4b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,9 @@
-# ---> Java
# Compiled class file
*.class
+*.iml
+*.idea
+target/
+logs/
# Log file
*.log
@@ -14,13 +17,31 @@
# Package Files #
*.jar
*.war
-*.nar
*.ear
-*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
-replay_pid*
+*velocity.log*
+
+# Eclipse #
+.classpath
+.project
+.settings/
+
+.DS_Store
+
+_dockerCerts/
+
+.factorypath
+
+node_modules/
+package-lock.json
+yarn.lock
+
+rebel.xml
+
+!DmJdbcDriver18.jar
+!kingbase8-8.6.0.jar
\ No newline at end of file
diff --git a/README.md b/README.md
index 26bdd37..87dd2da 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,3 @@
-# rocket-mq-springboot-starter
-
-针对官方的sdk的二次开发补充:
-1、不支持LocalDate 和 LocalDateTime;
-2、RockeMQ环境隔离问题;
-3、消息发送成功或者失败要打印消息日志,用于业务排查问题。
-4、如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。
-等等问题
\ No newline at end of file
+# rocket-mq-springboot-starter
+
+RocketMQ官方文档中推荐的最佳实践存在部分问题:
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..d57e67d
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,99 @@
+
+
+ 4.0.0
+ com.njcn
+ rocket-mq-springboot-starter
+ 1.0.0
+ jar
+ rocket中间件二次开发sdk
+
+
+ nexus-releases
+ Nexus Release Repository
+ http://192.168.1.13:8001/nexus/content/repositories/releases/
+
+
+ nexus-snapshots
+ Nexus Snapshot Repository
+ http://192.168.1.13:8001/nexus/content/repositories/snapshots/
+
+
+
+
+ 1.8
+ 8
+ 8
+ 2.3.12.RELEASE
+ 2.0.0.RELEASE
+ 5.7.9
+ 1.18.18
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ ${springboot.version}
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ 2.2.3
+
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ true
+ ${springboot.version}
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure
+ ${autoconfigure.version}
+
+
+
+ cn.hutool
+ hutool-all
+ ${hutool.version}
+
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ 1.8
+ 1.8
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 2.2.1
+
+
+ attach-sources
+
+ jar-no-fork
+
+
+
+
+
+
+
+
diff --git a/src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java b/src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java
new file mode 100644
index 0000000..184744e
--- /dev/null
+++ b/src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java
@@ -0,0 +1,38 @@
+package com.njcn.middle.rocket.autoconfig;
+
+import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.util.StringUtils;
+
+/**
+ * @author hongawen
+ * @version 1.0.0
+ * @date 2023年08月04日 14:20
+ */
+public class EnvironmentIsolationConfig implements BeanPostProcessor {
+
+ private RocketEnhanceProperties rocketEnhanceProperties;
+
+ public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) {
+ this.rocketEnhanceProperties = rocketEnhanceProperties;
+ }
+
+
+ /**
+ * 在装载Bean之前实现参数修改
+ */
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+ if(bean instanceof DefaultRocketMQListenerContainer){
+
+ DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
+
+ if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
+ container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment()));
+ }
+ return container;
+ }
+ return bean;
+ }
+}
diff --git a/src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java b/src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java
new file mode 100644
index 0000000..b773cb0
--- /dev/null
+++ b/src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java
@@ -0,0 +1,24 @@
+package com.njcn.middle.rocket.autoconfig;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * @author hongawen
+ * @version 1.0.0
+ * @date 2023年08月04日 14:21
+ */
+@ConfigurationProperties(prefix = "rocketmq.enhance")
+@Data
+public class RocketEnhanceProperties {
+
+ /***
+ * 是否隔离
+ */
+ private boolean enabledIsolation;
+
+ /***
+ * 当前环境,test、dev 或者 hainan beijing hebei
+ */
+ private String environment;
+}
diff --git a/src/main/java/com/njcn/middle/rocket/autoconfig/RocketMQEnhanceAutoConfiguration.java b/src/main/java/com/njcn/middle/rocket/autoconfig/RocketMQEnhanceAutoConfiguration.java
new file mode 100644
index 0000000..79dcfba
--- /dev/null
+++ b/src/main/java/com/njcn/middle/rocket/autoconfig/RocketMQEnhanceAutoConfiguration.java
@@ -0,0 +1,64 @@
+package com.njcn.middle.rocket.autoconfig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
+
+import java.util.List;
+
+/**
+ * @author hongawen
+ * @version 1.0.0
+ * @date 2023年08月04日 14:19
+ */
+@Configuration
+@EnableConfigurationProperties(RocketEnhanceProperties.class)
+public class RocketMQEnhanceAutoConfiguration {
+
+ /**
+ * 注入增强的RocketMQEnhanceTemplate
+ */
+ @Bean
+ public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){
+ return new RocketMQEnhanceTemplate(rocketMQTemplate);
+ }
+
+ /**
+ * 解决RocketMQ Jackson不支持Java时间类型配置
+ */
+ @Bean
+ @Primary
+ public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
+ RocketMQMessageConverter converter = new RocketMQMessageConverter();
+ CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
+ List messageConverterList = compositeMessageConverter.getConverters();
+ for (MessageConverter messageConverter : messageConverterList) {
+ if(messageConverter instanceof MappingJackson2MessageConverter){
+ MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
+ ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
+ objectMapper.registerModules(new JavaTimeModule());
+ }
+ }
+ return converter;
+ }
+
+
+ /**
+ * 环境隔离配置
+ */
+ @Bean
+ @ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true")
+ public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){
+ return new EnvironmentIsolationConfig(rocketEnhanceProperties);
+ }
+}
diff --git a/src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java b/src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java
new file mode 100644
index 0000000..e9fb00a
--- /dev/null
+++ b/src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java
@@ -0,0 +1,36 @@
+package com.njcn.middle.rocket.constant;
+
+import lombok.experimental.UtilityClass;
+
+/**
+ * @author hongawen
+ * @version 1.0.0
+ * @date 2023年08月04日 14:20
+ */
+@UtilityClass
+public class EnhanceMessageConstant {
+
+ public final int ONE_SECOND = 1;
+ public final int FIVE_SECOND = 2;
+
+ public final int TEN_SECOND = 3;
+ public final int THIRTY_SECOND = 4;
+ public final int ONE_MINUTE = 5;
+ public final int TWO_MINUTE = 6;
+ public final int THREE_MINUTE = 7;
+ public final int FOUR_MINUTE = 8;
+ public final int FIVE_MINUTE = 9;
+ public final int SIX_MINUTE = 10;
+ public final int SEVEN_MINUTE = 11;
+ public final int EIGHT_MINUTE = 12;
+ public final int NINE_MINUTE = 13;
+ public final int TEN_MINUTE = 14;
+ public final int TWENTY_MINUTE = 15;
+ public final int THIRTY_MINUTE = 16;
+ public final int ONE_HOUR = 17;
+ public final int TWO_HOUR = 18;
+
+ public final String RETRY_PREFIX = "RETRY_";
+
+
+}
diff --git a/src/main/java/com/njcn/middle/rocket/domain/BaseMessage.java b/src/main/java/com/njcn/middle/rocket/domain/BaseMessage.java
new file mode 100644
index 0000000..8d533f9
--- /dev/null
+++ b/src/main/java/com/njcn/middle/rocket/domain/BaseMessage.java
@@ -0,0 +1,34 @@
+package com.njcn.middle.rocket.domain;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author hongawen
+ * @version 1.0.0
+ * @date 2023年08月04日 10:53
+ */
+@Data
+public abstract class BaseMessage {
+
+ /**
+ * 业务键,用于RocketMQ控制台查看消费情况
+ */
+ protected String key;
+
+ /**
+ * 发送消息来源,用于排查问题
+ */
+ protected String source = "";
+
+ /**
+ * 发送时间
+ */
+ protected LocalDateTime sendTime = LocalDateTime.now();
+
+ /**
+ * 重试次数,用于判断重试次数,超过重试次数发送异常警告
+ */
+ protected Integer retryTimes = 0;
+}
diff --git a/src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java b/src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java
new file mode 100644
index 0000000..162d8d7
--- /dev/null
+++ b/src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java
@@ -0,0 +1,166 @@
+package com.njcn.middle.rocket.handler;
+
+import com.alibaba.fastjson.JSONObject;
+import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
+import com.njcn.middle.rocket.domain.BaseMessage;
+import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+
+import javax.annotation.Resource;
+
+/**
+ * @author hongawen
+ * @version 1.0.0
+ * @date 2023年08月04日 13:44
+ * 抽象消息监听器,封装了所有公共处理业务,如
+ * 1、基础日志记录
+ * 2、异常处理
+ * 3、消息重试
+ * 4、警告通知
+ */
+@Slf4j
+public abstract class EnhanceConsumerMessageHandler {
+
+
+ /**
+ * 默认重试次数
+ */
+ private static final int MAX_RETRY_TIMES = 3;
+
+ /**
+ * 延时等级
+ */
+ private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND;
+
+
+ @Resource
+ private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
+
+ /**
+ * 消息处理
+ *
+ * @param message 待处理消息
+ * @throws Exception 消费异常
+ */
+ protected abstract void handleMessage(T message) throws Exception;
+
+ /**
+ * 超过重试次数消息,需要启用isRetry
+ *
+ * @param message 待处理消息
+ */
+ protected abstract void handleMaxRetriesExceeded(T message);
+
+
+ /**
+ * 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理
+ *
+ * @param message 待处理消息
+ * @return true: 本次消息被过滤,false:不过滤
+ */
+ protected boolean filter(T message) {
+ return false;
+ }
+
+ /**
+ * 是否异常时重复发送
+ *
+ * @return true: 消息重试,false:不重试
+ */
+ protected abstract boolean isRetry();
+
+ /**
+ * 消费异常时是否抛出异常
+ * 返回true,则由rocketmq机制自动重试
+ * false:消费异常(如果没有开启重试则消息会被自动ack)
+ */
+ protected abstract boolean throwException();
+
+ /**
+ * 最大重试次数
+ *
+ * @return 最大重试次数,默认5次
+ */
+ protected int getMaxRetryTimes() {
+ return MAX_RETRY_TIMES;
+ }
+
+ /**
+ * isRetry开启时,重新入队延迟时间
+ *
+ * @return -1:立即入队重试
+ */
+ protected int getDelayLevel() {
+ return DELAY_LEVEL;
+ }
+
+ /**
+ * 使用模板模式构建消息消费框架,可自由扩展或删减
+ */
+ public void dispatchMessage(T message) {
+ // 基础日志记录被父类处理了
+ log.info("消费者收到消息[{}]", JSONObject.toJSON(message));
+
+ if (filter(message)) {
+ log.info("消息id{}不满足消费条件,已过滤。", message.getKey());
+ return;
+ }
+ // 超过最大重试次数时调用子类方法处理
+ if (message.getRetryTimes() > getMaxRetryTimes()) {
+ handleMaxRetriesExceeded(message);
+ return;
+ }
+ try {
+ long now = System.currentTimeMillis();
+ handleMessage(message);
+ long costTime = System.currentTimeMillis() - now;
+ log.info("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime);
+ } catch (Exception e) {
+ log.error("消息{}消费异常", message.getKey(), e);
+ // 是捕获异常还是抛出,由子类决定
+ if (throwException()) {
+ //抛出异常,由DefaultMessageListenerConcurrently类处理
+ throw new RuntimeException(e);
+ }
+ //此时如果不开启重试机制,则默认ACK了
+ if (isRetry()) {
+ handleRetry(message);
+ }
+ }
+ }
+
+ protected void handleRetry(T message) {
+ // 获取子类RocketMQMessageListener注解拿到topic和tag
+ RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
+ if (annotation == null) {
+ return;
+ }
+ //重新构建消息体
+ String messageSource = message.getSource();
+ if (!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)) {
+ message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource);
+ }
+ message.setRetryTimes(message.getRetryTimes() + 1);
+
+ SendResult sendResult;
+
+ try {
+ // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
+ sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());
+ } catch (Exception ex) {
+ // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
+ //由生产者直接发送
+ throw new RuntimeException(ex);
+ }
+ // 发送失败的处理就是不进行ACK,由RocketMQ重试
+ if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
+ throw new RuntimeException("重试消息发送失败");
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java b/src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java
new file mode 100644
index 0000000..f5f3033
--- /dev/null
+++ b/src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java
@@ -0,0 +1,90 @@
+package com.njcn.middle.rocket.template;
+
+import com.alibaba.fastjson.JSONObject;
+import com.njcn.middle.rocket.autoconfig.RocketEnhanceProperties;
+import com.njcn.middle.rocket.domain.BaseMessage;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQHeaders;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.StringUtils;
+
+import javax.annotation.Resource;
+
+/**
+ * @author hongawen
+ * @version 1.0.0
+ * @date 2023年08月04日 11:06
+ */
+@Slf4j
+@RequiredArgsConstructor(onConstructor = @__(@Autowired))
+public class RocketMQEnhanceTemplate {
+
+ private final RocketMQTemplate template;
+
+ @Resource
+ private RocketEnhanceProperties rocketEnhanceProperties;
+
+ public RocketMQTemplate getTemplate() {
+ return template;
+ }
+
+ /**
+ * 根据系统上下文自动构建隔离后的topic
+ * 构建目的地
+ */
+ public String buildDestination(String topic, String tag) {
+ topic = reBuildTopic(topic);
+ return topic + ":" + tag;
+ }
+
+ /**
+ * 根据环境重新隔离topic
+ * @param topic 原始topic
+ */
+ private String reBuildTopic(String topic) {
+ if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
+ return topic +"_" + rocketEnhanceProperties.getEnvironment();
+ }
+ return topic;
+ }
+
+ /**
+ * 发送同步消息
+ */
+ public SendResult send(String topic, String tag, T message) {
+ // 注意分隔符
+ return send(buildDestination(topic,tag), message);
+ }
+
+
+ public SendResult send(String destination, T message) {
+ // 设置业务键,此处根据公共的参数进行处理
+ // 更多的其它基础业务处理...
+ Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
+ SendResult sendResult = template.syncSend(destination, sendMessage);
+ // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
+ log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
+ return sendResult;
+ }
+
+ /**
+ * 发送延迟消息
+ */
+ public SendResult send(String topic, String tag, T message, int delayLevel) {
+ return send(buildDestination(topic,tag), message, delayLevel);
+ }
+
+ public SendResult send(String destination, T message, int delayLevel) {
+ Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
+ SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
+ log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
+ return sendResult;
+ }
+
+
+}
diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..cf775c6
--- /dev/null
+++ b/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,4 @@
+## AutoConfiguration
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+ com.njcn.middle.rocket.autoconfig.RocketMQEnhanceAutoConfiguration
+