Compare commits
11 Commits
8ae08bb3df
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4cfed98139 | ||
|
|
c12d0b3107 | ||
| e073403f8a | |||
| 955a11858e | |||
| 717ab75837 | |||
| 9332b68f3f | |||
| f9a916dacb | |||
| 9cb42dd9d9 | |||
| a3283dd7b3 | |||
| 70130114de | |||
| fe9dc4df1d |
70
README.md
70
README.md
@@ -1,3 +1,69 @@
|
||||
# rocket-mq-springboot-starter
|
||||
#### 快速使用
|
||||
|
||||
RocketMQ官方文档中推荐的最佳实践存在部分问题:
|
||||
##### pom引入:
|
||||
|
||||
```
|
||||
<dependency>
|
||||
<groupId>com.njcn</groupId>
|
||||
<artifactId>rocket-mq-springboot-starter</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
##### 使用问题:
|
||||
|
||||
* WARN No appenders could be found for logger:启动项目时,会在日志中看到如下告警
|
||||
|
||||
```
|
||||
RocketMQLog:WARN No appenders could be found for logger(io.netty.util.internal.InternalThreadLocalMap).
|
||||
RocketMQLog:WARN Please initialize the logger system properly.
|
||||
```
|
||||
|
||||
此时我们只需要在启动类中设置环境变量 [rocketmq.client.logUseSlf4j](http://rocketmq.client.loguseslf4j/) 为 true 明确指定RocketMQ的日志框架
|
||||
|
||||
```java
|
||||
@SpringBootApplication
|
||||
public class RocketDemoApplication {
|
||||
public static void main(String[] args) {
|
||||
/*
|
||||
* 指定使用的日志框架,否则将会告警
|
||||
* RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
|
||||
* RocketMQLog:WARN Please initialize the logger system properly.
|
||||
*/
|
||||
System.setProperty("rocketmq.client.logUseSlf4j", "true");
|
||||
SpringApplication.run(RocketDemoApplication.class, args);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
同时还得在配置文件中调整日志级别,不然在控制台会一直看到broker的日志信息
|
||||
|
||||
```yml
|
||||
logging:
|
||||
level:
|
||||
RocketmqClient: ERROR
|
||||
io:
|
||||
netty: ERROR
|
||||
```
|
||||
|
||||
#### 1、RocketMQ官方sdk中存在的问题:
|
||||
|
||||
##### 1.1、不支持LocalDate 和 LocalDateTime
|
||||
|
||||
原因:RocketMQ内置使用的转换器是**RocketMQMessageConverter**,转换Json时使用的是MappingJackson2MessageConverter,但是这个转换器不支持时间类型。
|
||||
|
||||
解决办法:需要自定义消息转换器,将MappingJackson2MessageConverter进行替换,并添加支持时间模块。
|
||||
|
||||
##### 1.2、RockeMQ环境隔离
|
||||
|
||||
原因:在使用RocketMQ时,通常会在代码中直接指定消息主题(topic),而且开发环境和测试环境可能共用一个RocketMQ环境。如果没有进行处理,在开发环境发送的消息就可能被测试环境的消费者消费,测试环境发送的消息也可能被开发环境的消费者消费,从而导致数据混乱的问题。
|
||||
|
||||
解决方法:我们可以根据不同的环境实现自动隔离。通过简单配置一个选项,如dev、test、prod等不同环境,所有的消息都会被自动隔离。例如,当发送的消息主题为consumer_topic时,可以自动在topic后面加上环境后缀,如consumer_topic_dev。
|
||||
|
||||
详细查看EnvironmentIsolationConfig 配置。
|
||||
|
||||
##### 1.3、避免大量的重复代码
|
||||
|
||||
一个完整的消息传递链路:从生产者到消费者应包括 准备消息、发送消息、记录消息日志、发送失败处理、记录接受消息日志、处理业务逻辑、异常处理和异常重试等步骤。
|
||||
|
||||
虽然原生的rocketMQ可以完成这些动作,但是每个生产者和消费者都需要编写大量重复的代码来完成相同的任务。最终的目的是让开发人员只需要准备好消息实体并用封装后的工具类发送,而消费者只需处理核心业务逻辑,其他公共逻辑会得到统一处理。
|
||||
9
pom.xml
9
pom.xml
@@ -12,12 +12,12 @@
|
||||
<repository>
|
||||
<id>nexus-releases</id>
|
||||
<name>Nexus Release Repository</name>
|
||||
<url>http://192.168.1.13:8001/nexus/content/repositories/releases/</url>
|
||||
<url>http://192.168.1.22:8001/nexus/content/repositories/releases/</url>
|
||||
</repository>
|
||||
<snapshotRepository>
|
||||
<id>nexus-snapshots</id>
|
||||
<name>Nexus Snapshot Repository</name>
|
||||
<url>http://192.168.1.13:8001/nexus/content/repositories/snapshots/</url>
|
||||
<url>http://192.168.1.22:8001/nexus/content/repositories/snapshots/</url>
|
||||
</snapshotRepository>
|
||||
</distributionManagement>
|
||||
|
||||
@@ -43,6 +43,11 @@
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<version>2.2.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.njcn.platform</groupId>
|
||||
<artifactId>message-api</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
<!--避免idea后端配置类报红-->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
|
||||
@@ -27,10 +27,23 @@ public class EnvironmentIsolationConfig implements BeanPostProcessor {
|
||||
if(bean instanceof DefaultRocketMQListenerContainer){
|
||||
|
||||
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
|
||||
// 修改 ConsumerGroup(增加前缀,例如 dev_原始Group)
|
||||
|
||||
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
|
||||
container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment()));
|
||||
|
||||
}
|
||||
//兼容jb阿里云top格式都是实例id%Topic,consumer格式GID_consumer
|
||||
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getTopicPrefix())){
|
||||
container.setTopic(String.join("%",rocketEnhanceProperties.getTopicPrefix(), container.getTopic()));
|
||||
|
||||
}
|
||||
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getConsumerGroupPrefix())){
|
||||
String originalGroup = container.getConsumerGroup();
|
||||
String isolatedGroup = String.join("_", rocketEnhanceProperties.getConsumerGroupPrefix(),originalGroup);
|
||||
container.setConsumerGroup(isolatedGroup);
|
||||
}
|
||||
|
||||
return container;
|
||||
}
|
||||
return bean;
|
||||
|
||||
@@ -21,4 +21,9 @@ public class RocketEnhanceProperties {
|
||||
* 当前环境,test、dev 或者 hainan beijing hebei
|
||||
*/
|
||||
private String environment;
|
||||
|
||||
private String consumerGroupPrefix;
|
||||
|
||||
private String topicPrefix;
|
||||
|
||||
}
|
||||
|
||||
@@ -61,4 +61,12 @@ public class RocketMQEnhanceAutoConfiguration {
|
||||
public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){
|
||||
return new EnvironmentIsolationConfig(rocketEnhanceProperties);
|
||||
}
|
||||
|
||||
/***
|
||||
* rocketmq日志输出
|
||||
*/
|
||||
@Bean
|
||||
public RocketMQLogEnhance rocketMQLogEnhance(){
|
||||
return new RocketMQLogEnhance();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.njcn.middle.rocket.autoconfig;
|
||||
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
|
||||
/**
|
||||
* @author hongawen
|
||||
* @version 1.0.0
|
||||
* @date 2023年08月10日 10:38
|
||||
*/
|
||||
public class RocketMQLogEnhance implements InitializingBean {
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
System.setProperty("rocketmq.client.logUseSlf4j", "true");
|
||||
}
|
||||
}
|
||||
@@ -32,5 +32,11 @@ public class EnhanceMessageConstant {
|
||||
|
||||
public final String RETRY_PREFIX = "RETRY_";
|
||||
|
||||
//单次异常失败
|
||||
public final String IDENTITY_SINGLE = "IDENTITY_SINGLE";
|
||||
|
||||
//多次重试后,异常记录
|
||||
public final String IDENTITY_RETRY = "IDENTITY_RETRY";
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
package com.njcn.middle.rocket.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* @author hongawen
|
||||
* @version 1.0.0
|
||||
* @date 2023年08月04日 10:53
|
||||
*/
|
||||
@Data
|
||||
public abstract class BaseMessage {
|
||||
|
||||
/**
|
||||
* 业务键,用于RocketMQ控制台查看消费情况
|
||||
*/
|
||||
protected String key;
|
||||
|
||||
/**
|
||||
* 发送消息来源,用于排查问题
|
||||
*/
|
||||
protected String source = "";
|
||||
|
||||
/**
|
||||
* 发送时间
|
||||
*/
|
||||
protected LocalDateTime sendTime = LocalDateTime.now();
|
||||
|
||||
/**
|
||||
* 重试次数,用于判断重试次数,超过重试次数发送异常警告
|
||||
*/
|
||||
protected Integer retryTimes = 0;
|
||||
}
|
||||
@@ -52,8 +52,9 @@ public abstract class EnhanceConsumerMessageHandler<T extends BaseMessage> {
|
||||
*
|
||||
* @param message 待处理消息
|
||||
*/
|
||||
protected abstract void handleMaxRetriesExceeded(T message);
|
||||
|
||||
protected void handleMaxRetriesExceeded(T message) {
|
||||
saveExceptionMsgLog(message,EnhanceMessageConstant.IDENTITY_RETRY,null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理
|
||||
@@ -97,13 +98,27 @@ public abstract class EnhanceConsumerMessageHandler<T extends BaseMessage> {
|
||||
return DELAY_LEVEL;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 发生异常时,进行错误信息入库保存
|
||||
* 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存
|
||||
*/
|
||||
protected void saveExceptionMsgLog(T message, String identity,Exception exception) {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 消费成功
|
||||
*/
|
||||
protected void consumeSuccess(T message) {}
|
||||
|
||||
|
||||
/**
|
||||
* 使用模板模式构建消息消费框架,可自由扩展或删减
|
||||
*/
|
||||
public void dispatchMessage(T message) {
|
||||
// 基础日志记录被父类处理了
|
||||
log.info("消费者收到消息[{}]", JSONObject.toJSON(message));
|
||||
|
||||
if (filter(message)) {
|
||||
log.info("消息id{}不满足消费条件,已过滤。", message.getKey());
|
||||
return;
|
||||
@@ -116,10 +131,11 @@ public abstract class EnhanceConsumerMessageHandler<T extends BaseMessage> {
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
handleMessage(message);
|
||||
consumeSuccess(message);
|
||||
long costTime = System.currentTimeMillis() - now;
|
||||
log.info("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime);
|
||||
} catch (Exception e) {
|
||||
log.error("消息{}消费异常", message.getKey(), e);
|
||||
saveExceptionMsgLog(message,EnhanceMessageConstant.IDENTITY_SINGLE,e);
|
||||
// 是捕获异常还是抛出,由子类决定
|
||||
if (throwException()) {
|
||||
//抛出异常,由DefaultMessageListenerConcurrently类处理
|
||||
|
||||
@@ -47,10 +47,16 @@ public class RocketMQEnhanceTemplate {
|
||||
* @param topic 原始topic
|
||||
*/
|
||||
private String reBuildTopic(String topic) {
|
||||
String result =topic;
|
||||
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
|
||||
return topic +"_" + rocketEnhanceProperties.getEnvironment();
|
||||
result = result +"_" + rocketEnhanceProperties.getEnvironment();
|
||||
}
|
||||
return topic;
|
||||
//兼容jb阿里云top格式都是实例id%Topi
|
||||
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getTopicPrefix())){
|
||||
result = String.join("%",rocketEnhanceProperties.getTopicPrefix(), result);
|
||||
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -68,7 +74,7 @@ public class RocketMQEnhanceTemplate {
|
||||
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
|
||||
SendResult sendResult = template.syncSend(destination, sendMessage);
|
||||
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
|
||||
log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
|
||||
//log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
|
||||
return sendResult;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user