Compare commits

...

11 Commits

Author SHA1 Message Date
hzj
4cfed98139 兼容jb阿里云top格式都是实例id%Topic,consumer格式GID_consumer统一修改 2026-05-11 09:03:18 +08:00
hzj
c12d0b3107 切换位置BaseMessage 2025-02-13 09:06:33 +08:00
xy
e073403f8a 屏蔽日志,数据量太大 2024-10-24 20:26:45 +08:00
955a11858e 添加异常日志记录方法 2023-08-18 13:41:06 +08:00
717ab75837 添加异常日志记录方法 2023-08-18 11:26:14 +08:00
9332b68f3f 微调 2023-08-17 14:17:10 +08:00
f9a916dacb 添加默认key 2023-08-17 09:43:35 +08:00
9cb42dd9d9 微调 2023-08-10 19:20:56 +08:00
a3283dd7b3 微调 2023-08-10 10:41:01 +08:00
70130114de 初始化rocketMQ-starter 2023-08-07 10:50:53 +08:00
fe9dc4df1d 初始化rocketMQ-starter 2023-08-07 10:49:45 +08:00
10 changed files with 152 additions and 45 deletions

View File

@@ -1,3 +1,69 @@
# rocket-mq-springboot-starter
#### 快速使用
RocketMQ官方文档中推荐的最佳实践存在部分问题
##### pom引入
```
<dependency>
<groupId>com.njcn</groupId>
<artifactId>rocket-mq-springboot-starter</artifactId>
<version>1.0.0</version>
</dependency>
```
##### 使用问题:
* WARN No appenders could be found for logger启动项目时会在日志中看到如下告警
```
RocketMQLog:WARN No appenders could be found for logger(io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
```
此时我们只需要在启动类中设置环境变量 [rocketmq.client.logUseSlf4j](http://rocketmq.client.loguseslf4j/) 为 true 明确指定RocketMQ的日志框架
```java
@SpringBootApplication
public class RocketDemoApplication {
public static void main(String[] args) {
/*
* 指定使用的日志框架,否则将会告警
* RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
* RocketMQLog:WARN Please initialize the logger system properly.
*/
System.setProperty("rocketmq.client.logUseSlf4j", "true");
SpringApplication.run(RocketDemoApplication.class, args);
}
}
```
同时还得在配置文件中调整日志级别不然在控制台会一直看到broker的日志信息
```yml
logging:
level:
RocketmqClient: ERROR
io:
netty: ERROR
```
#### 1、RocketMQ官方sdk中存在的问题
##### 1.1、不支持LocalDate 和 LocalDateTime
原因RocketMQ内置使用的转换器是**RocketMQMessageConverter**转换Json时使用的是MappingJackson2MessageConverter但是这个转换器不支持时间类型。
解决办法需要自定义消息转换器将MappingJackson2MessageConverter进行替换并添加支持时间模块。
##### 1.2、RockeMQ环境隔离
原因在使用RocketMQ时通常会在代码中直接指定消息主题(topic)而且开发环境和测试环境可能共用一个RocketMQ环境。如果没有进行处理在开发环境发送的消息就可能被测试环境的消费者消费测试环境发送的消息也可能被开发环境的消费者消费从而导致数据混乱的问题。
解决方法我们可以根据不同的环境实现自动隔离。通过简单配置一个选项如dev、test、prod等不同环境所有的消息都会被自动隔离。例如当发送的消息主题为consumer_topic时可以自动在topic后面加上环境后缀如consumer_topic_dev。
详细查看EnvironmentIsolationConfig 配置。
##### 1.3、避免大量的重复代码
一个完整的消息传递链路:从生产者到消费者应包括 准备消息、发送消息、记录消息日志、发送失败处理、记录接受消息日志、处理业务逻辑、异常处理和异常重试等步骤。
虽然原生的rocketMQ可以完成这些动作但是每个生产者和消费者都需要编写大量重复的代码来完成相同的任务。最终的目的是让开发人员只需要准备好消息实体并用封装后的工具类发送而消费者只需处理核心业务逻辑其他公共逻辑会得到统一处理。

View File

@@ -12,12 +12,12 @@
<repository>
<id>nexus-releases</id>
<name>Nexus Release Repository</name>
<url>http://192.168.1.13:8001/nexus/content/repositories/releases/</url>
<url>http://192.168.1.22:8001/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>nexus-snapshots</id>
<name>Nexus Snapshot Repository</name>
<url>http://192.168.1.13:8001/nexus/content/repositories/snapshots/</url>
<url>http://192.168.1.22:8001/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
@@ -43,6 +43,11 @@
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.njcn.platform</groupId>
<artifactId>message-api</artifactId>
<version>1.0.0</version>
</dependency>
<!--避免idea后端配置类报红-->
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -27,10 +27,23 @@ public class EnvironmentIsolationConfig implements BeanPostProcessor {
if(bean instanceof DefaultRocketMQListenerContainer){
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
// 修改 ConsumerGroup增加前缀例如 dev_原始Group
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment()));
}
//兼容jb阿里云top格式都是实例id%Topicconsumer格式GID_consumer
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getTopicPrefix())){
container.setTopic(String.join("%",rocketEnhanceProperties.getTopicPrefix(), container.getTopic()));
}
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getConsumerGroupPrefix())){
String originalGroup = container.getConsumerGroup();
String isolatedGroup = String.join("_", rocketEnhanceProperties.getConsumerGroupPrefix(),originalGroup);
container.setConsumerGroup(isolatedGroup);
}
return container;
}
return bean;

View File

@@ -21,4 +21,9 @@ public class RocketEnhanceProperties {
* 当前环境test、dev 或者 hainan beijing hebei
*/
private String environment;
private String consumerGroupPrefix;
private String topicPrefix;
}

View File

@@ -61,4 +61,12 @@ public class RocketMQEnhanceAutoConfiguration {
public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){
return new EnvironmentIsolationConfig(rocketEnhanceProperties);
}
/***
* rocketmq日志输出
*/
@Bean
public RocketMQLogEnhance rocketMQLogEnhance(){
return new RocketMQLogEnhance();
}
}

View File

@@ -0,0 +1,16 @@
package com.njcn.middle.rocket.autoconfig;
import org.springframework.beans.factory.InitializingBean;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月10日 10:38
*/
public class RocketMQLogEnhance implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
System.setProperty("rocketmq.client.logUseSlf4j", "true");
}
}

View File

@@ -32,5 +32,11 @@ public class EnhanceMessageConstant {
public final String RETRY_PREFIX = "RETRY_";
//单次异常失败
public final String IDENTITY_SINGLE = "IDENTITY_SINGLE";
//多次重试后,异常记录
public final String IDENTITY_RETRY = "IDENTITY_RETRY";
}

View File

@@ -1,34 +0,0 @@
package com.njcn.middle.rocket.domain;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月04日 10:53
*/
@Data
public abstract class BaseMessage {
/**
* 业务键用于RocketMQ控制台查看消费情况
*/
protected String key;
/**
* 发送消息来源,用于排查问题
*/
protected String source = "";
/**
* 发送时间
*/
protected LocalDateTime sendTime = LocalDateTime.now();
/**
* 重试次数,用于判断重试次数,超过重试次数发送异常警告
*/
protected Integer retryTimes = 0;
}

View File

@@ -52,8 +52,9 @@ public abstract class EnhanceConsumerMessageHandler<T extends BaseMessage> {
*
* @param message 待处理消息
*/
protected abstract void handleMaxRetriesExceeded(T message);
protected void handleMaxRetriesExceeded(T message) {
saveExceptionMsgLog(message,EnhanceMessageConstant.IDENTITY_RETRY,null);
}
/**
* 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理
@@ -97,13 +98,27 @@ public abstract class EnhanceConsumerMessageHandler<T extends BaseMessage> {
return DELAY_LEVEL;
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
protected void saveExceptionMsgLog(T message, String identity,Exception exception) {
}
/**
* 消费成功
*/
protected void consumeSuccess(T message) {}
/**
* 使用模板模式构建消息消费框架,可自由扩展或删减
*/
public void dispatchMessage(T message) {
// 基础日志记录被父类处理了
log.info("消费者收到消息[{}]", JSONObject.toJSON(message));
if (filter(message)) {
log.info("消息id{}不满足消费条件,已过滤。", message.getKey());
return;
@@ -116,10 +131,11 @@ public abstract class EnhanceConsumerMessageHandler<T extends BaseMessage> {
try {
long now = System.currentTimeMillis();
handleMessage(message);
consumeSuccess(message);
long costTime = System.currentTimeMillis() - now;
log.info("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime);
} catch (Exception e) {
log.error("消息{}消费异常", message.getKey(), e);
saveExceptionMsgLog(message,EnhanceMessageConstant.IDENTITY_SINGLE,e);
// 是捕获异常还是抛出,由子类决定
if (throwException()) {
//抛出异常由DefaultMessageListenerConcurrently类处理

View File

@@ -47,10 +47,16 @@ public class RocketMQEnhanceTemplate {
* @param topic 原始topic
*/
private String reBuildTopic(String topic) {
String result =topic;
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
return topic +"_" + rocketEnhanceProperties.getEnvironment();
result = result +"_" + rocketEnhanceProperties.getEnvironment();
}
return topic;
//兼容jb阿里云top格式都是实例id%Topi
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getTopicPrefix())){
result = String.join("%",rocketEnhanceProperties.getTopicPrefix(), result);
}
return result;
}
/**
@@ -68,7 +74,7 @@ public class RocketMQEnhanceTemplate {
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage);
// 此处为了方便查看给日志转了json根据选择选择日志记录方式例如ELK采集
log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
//log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
return sendResult;
}