This commit is contained in:
2026-03-12 20:08:58 +08:00
parent 8cef3227f3
commit e352488298
38 changed files with 12 additions and 4173 deletions

View File

@@ -1,217 +0,0 @@
# MQ 改造方案(评审稿)
## 1. 背景与目标
你当前的业务策略是:
1. 单体部署优先使用 Redis 作为 MQ。
2. 微服务部署优先使用 RocketMQ 作为 MQ。
3. RabbitMQ、Kafka 暂时不作为主路径,仅保留包结构隔离,不参与核心链路。
本方案目标是:
1. 在不破坏现有系统的前提下,建立可切换的 MQ 入口。
2. 把“切换成本”收敛到配置层,而不是业务代码层。
3. 采用分阶段改造,先低风险,后统一抽象。
## 1.1 本期非目标(避免范围蔓延)
本期改造明确不包含:
1. 严格顺序语义保障(全链路有序消费)。
2. 统一死信队列DLQ治理体系。
3. 事务消息与最终一致性框架化封装。
4. 多机房容灾级别的 MQ 治理。
说明:
1. 本期先完成“可切换、可回滚、可观测”的基础能力。
2. 复杂语义能力后续在 RocketMQ 路线下单独立项。
## 2. 当前实现现状(简版)
当前 `rdms-spring-boot-starter-mq` 的真实情况:
1. Redis 能力最完整包含模板、监听器抽象、Stream 补偿与清理任务。
2. RabbitMQ 只有 `MessageConverter` 级别自动配置,未形成统一收发抽象。
当前 `rdms-system` 默认配置:
## 3. 改造原则
1. 先配置统一,再接口统一,最后再清理非主路径。
2. 任何阶段都必须可回滚,且回滚只改配置不改代码。
3. 保持 Rabbit/Kafka 包路径存在,避免一次性大删导致历史分支合并困难。
## 4. 目标架构(落地后)
统一引入配置:
1. `rdms.mq.type=redis|rocketmq`
统一行为:
1. 单体环境配 `redis`
2. 微服务环境配 `rocketmq`
建议方式:
## 4.1 配置矩阵(最小可运行)
### 单体Redis
```yaml
rdms:
mq:
type: redis
spring:
data:
redis:
host: 127.0.0.1
port: 6379
```
### 微服务RocketMQ
```yaml
rdms:
mq:
type: rocketmq
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: rdms-producer-group
```
备注:
1. 微服务切 Rocket 前,必须先补齐 topic、consumer-group 等业务配置。
2. 建议在 dev/local 保留 Redis 配置,便于快速回滚。
## 4.2 命名规范Topic / Channel / StreamKey
建议统一命名规则,避免后续混乱:
1. Redis Channel / StreamKey`{env}:{domain}:{event}`
2. Rocket Topic`{env}_{domain}_{event}`
3. ConsumerGroup`{app}-{domain}-cg`
示例:
1. `dev:system:notify`
2. `prod_system_notify`
3. `rdms-system-notify-cg`
## 5. 分阶段实施计划
## 阶段 A低风险建议先做
目标:只做配置层统一,不动大规模代码。
实施项:
1. 新增配置项 `rdms.mq.type`,默认值 `redis`
3. 本阶段不删除 Rabbit/Kafka 代码,不变更包结构。
收益:
1. 切换 Redis/Rocket 只改配置。
2. 不触碰核心业务逻辑,风险最低。
回滚:
## 阶段 B中风险接口统一
目标:抽象统一 MQ 发送接口,减少业务对具体中间件的耦合。
实施项:
1. 新增统一接口,例如 `UnifiedMqSender`
2. 提供 `RedisMqSender``RocketMqSender` 两个实现。
3.`@ConditionalOnProperty` 根据 `rdms.mq.type` 注入唯一实现。
5. 统一消息契约,至少包含标准消息头:
`msgId``bizKey``timestamp``producer``traceId``version`
收益:
1. 业务层不再直接依赖 `RedisMQTemplate``RocketMQTemplate`
2. 后续切换中间件时改动面可控。
3. 消息协议可演进,减少多团队并行开发冲突。
回滚:
1. 切回原 sender Bean 注入方案,保留统一接口代码但不启用。
## 阶段 C可选结构收敛
目标:让非主路径代码“隔离可见但不激活”。
实施项:
1. Rabbit/Kafka 相关自动配置入口默认关闭。
2. 保留目录与类,增加注释标识 `reserved``deprecated`
3. 文档明确“当前生产主路径仅 Redis/Rocket”。
收益:
1. 新成员不会误判项目“全量支持四种 MQ”。
2. 后续若要恢复 Rabbit/Kafka可低成本重新开启。
回滚:
1. 打开对应配置开关即可恢复。
## 6. 影响面评估
主要影响模块:
1. `rdms-framework/rdms-spring-boot-starter-mq`
3. `rdms-system/rdms-system-boot` 配置文件
主要风险点:
1. Redis 与 Rocket 的消费语义不完全一致,幂等要在业务侧兜底。
2. 广播模型下,多实例重复消费属于预期,业务处理要避免副作用。
3. 配置切换后,缺失 Rocket 连接配置会导致启动失败。
4. 现有代码中 Kafka 消费类注解存在明显可疑项,改造时应单独复核。
5. 缺少统一幂等策略时Rocket 与 Redis 切换后可能放大重复消费副作用。
6. 配置缺失时若做了自动降级,可能导致“误以为切到 Rocket实际走本地/Redis”。
## 6.1 幂等策略(建议作为阶段 B 配套)
最小策略建议:
1. 每条消息必须带 `msgId``bizKey`
2.`bizKey` 作为业务幂等键(例如订单号、任务号)。
3. 消费前先做幂等检查,消费成功后写入幂等记录。
4. 幂等记录建议放 Redis设置合理 TTL如 3~7 天,按业务回放窗口)。
说明:
1. `msgId` 解决“传输级去重定位”,`bizKey` 解决“业务级幂等”。
2. 幂等是切换 MQ 前置条件,不应后补。
## 6.2 故障切换策略(明确 fail-fast
建议默认策略:
1. 目标 MQ 不可用时 `fail-fast`,启动或发送直接失败并告警。
2. 禁止隐式降级到 local避免行为不透明。
可选策略:
1. 在开发环境允许显式降级(需开关控制,并打印告警日志)。
## 7. 验收清单(每阶段都要过)
1. 单体环境 `rdms.mq.type=redis` 可启动、可发送、可消费。
2. 微服务环境 `rdms.mq.type=rocketmq` 可启动、可发送、可消费。
4. 关键链路日志可定位发送端与消费端。
5. 切换只改配置,不改代码。
6. 回滚路径已验证。
7. 幂等校验通过(重复消息不会造成业务副作用)。
8. 关键指标可观测并有告警(发送失败、消费失败、积压、重试)。
## 7.1 观测与告警建议
建议统一接入以下指标:
1. `mq_send_total` / `mq_send_fail_total`
2. `mq_consume_total` / `mq_consume_fail_total`
3. `mq_backlog_size`
4. `mq_retry_total`
建议阈值(示例):
1. 5 分钟发送失败率 > 1% 告警。
2. 连续 10 分钟消费失败率 > 0.5% 告警。
3. 积压超过基线 3 倍且持续 10 分钟告警。
## 8. 推荐实施顺序与工时
推荐顺序:
1. 先做阶段 A。
2. 阶段 A 稳定后再做阶段 B。
3. 阶段 C 按团队节奏处理。
粗略工时(含联调):
1. 阶段 A0.5 天。
2. 阶段 B1 天到 1.5 天。
3. 阶段 C0.5 天。
## 8.1 上线与回滚步骤(建议)
上线顺序:
1. 开发环境完成阶段 A验证双配置切换。
2. 预发环境灰度 1 个实例切换目标 MQ。
3. 小流量观察 1 天,确认无异常后全量。
回滚触发条件(任一满足即回滚):
1. 消费失败率持续超过阈值。
2. 积压持续增长且无法在观察窗口内回落。
3. 关键业务出现重复消费副作用。
回滚动作:
2. 不回滚代码,确保恢复路径最短。
## 9. 当前建议
你现在对项目结构还在熟悉期,建议先做阶段 A 的评审与验证,不直接进入阶段 B。
这样能先拿到“可切换能力”,同时把改造风险控制在最低范围。

View File

@@ -1,241 +0,0 @@
# rdms-spring-boot-starter-mq
## 模块定位
`rdms-spring-boot-starter-mq` 是项目里的消息队列基础封装,目标是:
1. 对 Redis 消息模型做统一抽象,降低业务接入复杂度
2. 通过自动配置按需启用消费者容器,不强制业务使用全部能力
3. 为后续多 MQ 方案Redis / RabbitMQ / RocketMQ / Kafka提供统一入口
当前实现里,**核心能力在 Redis**RabbitMQ 是轻量补充(消息转换器)。
## 设计思路
### 1. Redis 统一模型
Redis 相关能力分成两类:
1. `Pub/Sub` 广播模型
2. `Stream` 分组消费模型
两者的消息基类分别是:
1. `AbstractRedisChannelMessage`
2. `AbstractRedisStreamMessage`
共同父类是 `AbstractRedisMessage`,内置 `headers`,方便扩展链路信息。
### 2. 统一发送模板
发送端统一走 `RedisMQTemplate`:
1. `send(AbstractRedisChannelMessage)` -> Redis `convertAndSend`
2. `send(AbstractRedisStreamMessage)` -> Redis Stream `XADD`
这样业务代码不需要关心底层命令细节。
### 3. 监听器抽象 + 惰性自动装配
消费者由两个抽象监听器承接:
1. `AbstractRedisChannelMessageListener<T>`
2. `AbstractRedisStreamMessageListener<T>`
自动配置采用 `@ConditionalOnBean(...)`:
1. 只有你定义了对应监听器 Bean容器才会注册消费者
2. 没有监听器时不会额外启动 MQ 消费组件
### 4. Stream 运维补偿
针对 Redis Stream框架额外提供两个定时任务:
1. `RedisPendingMessageResendJob`
扫描 pending 超时消息并重投(默认超时 5 分钟)
2. `RedisStreamMessageCleanupJob`
定时 `XTRIM`,默认仅保留最近 10000 条,防止内存膨胀
两者都使用 Redisson 分布式锁,避免多实例重复执行。
### 5. RabbitMQ 轻封装
`RdmsRabbitMQAutoConfiguration` 只提供 `Jackson2JsonMessageConverter`,让 RabbitTemplate / @RabbitListener 默认按 JSON 处理消息。
### 6. 多 MQ 的真实边界
虽然模块描述写了支持 Redis / RocketMQ / RabbitMQ / Kafka
但本 starter 的自动配置主要是 Redis + Rabbit。
## 自动配置入口
`META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports`:
1. `RdmsRedisMQProducerAutoConfiguration`
2. `RdmsRedisMQConsumerAutoConfiguration`
3. `RdmsRabbitMQAutoConfiguration`
## 如何使用
## 1. 引入依赖
一般由业务模块直接依赖:
```xml
<dependency>
<groupId>com.njcn</groupId>
<artifactId>rdms-spring-boot-starter-mq</artifactId>
</dependency>
```
## 1.1 项目现有 Redis 作为 MQ 的常规用法(推荐)
推荐按这个顺序接入:
1. 配好 `spring.data.redis.*`
2. 先按第 2 节使用 `Pub/Sub`(这是项目里最常见路径)。
3. 需要可恢复消费时,再按第 3 节接入 `Stream`
## 2. 使用 Redis Pub/Sub广播
适用场景: 通知广播、在线会话广播、对可靠性要求不高但强调实时性。
### 2.1 定义消息
```java
import com.njcn.rdms.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import lombok.Data;
@Data
public class DemoNotifyMessage extends AbstractRedisChannelMessage {
private Long userId;
private String content;
// 可选: 自定义 channel不重写时默认是类名
@Override
public String getChannel() {
return "demo:notify";
}
}
```
### 2.2 定义消费者
```java
import com.njcn.rdms.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DemoNotifyListener extends AbstractRedisChannelMessageListener<DemoNotifyMessage> {
@Override
public void onMessage(DemoNotifyMessage message) {
log.info("收到广播消息 userId={}, content={}", message.getUserId(), message.getContent());
}
}
```
### 2.3 发送消息
```java
import com.njcn.rdms.framework.mq.redis.core.RedisMQTemplate;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class DemoNotifyProducer {
private final RedisMQTemplate redisMQTemplate;
public void send(Long userId, String content) {
redisMQTemplate.send(new DemoNotifyMessage()
.setUserId(userId)
.setContent(content));
}
}
```
## 3. 使用 Redis Stream分组消费 + ACK
适用场景: 异步任务、可恢复消费、希望具备重投和清理机制。
### 3.1 定义消息
```java
import com.njcn.rdms.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.Data;
@Data
public class DemoTaskMessage extends AbstractRedisStreamMessage {
private Long taskId;
private String bizType;
// 可选: 自定义 stream key不重写时默认是类名
@Override
public String getStreamKey() {
return "demo:task:stream";
}
}
```
### 3.2 定义消费者
```java
import com.njcn.rdms.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DemoTaskListener extends AbstractRedisStreamMessageListener<DemoTaskMessage> {
@Override
public void onMessage(DemoTaskMessage message) {
log.info("消费任务 taskId={}, bizType={}", message.getTaskId(), message.getBizType());
// 这里抛异常会导致本次消费失败,不会执行 ACK
}
}
```
### 3.3 发送消息
```java
import com.njcn.rdms.framework.mq.redis.core.RedisMQTemplate;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class DemoTaskProducer {
private final RedisMQTemplate redisMQTemplate;
public RecordId send(Long taskId, String bizType) {
return redisMQTemplate.send(new DemoTaskMessage()
.setTaskId(taskId)
.setBizType(bizType));
}
}
```
## 4. 消息拦截器扩展(可选)
可通过实现 `RedisMessageInterceptor`,对发送/消费前后做统一处理(比如租户透传、审计埋点)。
```java
import com.njcn.rdms.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import com.njcn.rdms.framework.mq.redis.core.message.AbstractRedisMessage;
import org.springframework.stereotype.Component;
@Component
public class DemoRedisMessageInterceptor implements RedisMessageInterceptor {
@Override
public void sendMessageBefore(AbstractRedisMessage message) {
message.addHeader("source", "rdms-system");
}
}
```
## 配置说明
本模块本身几乎没有 `rdms.mq.*` 配置项,主要依赖:
1. `spring.data.redis.*`Redis 连接)
2. `spring.application.name`Stream consumer group 默认值)
另外:
1. Redis Stream 要求 Redis 版本 >= 5.0
2. 只有存在 Stream 监听器时,重投/清理定时任务才会生效
## 当前已知注意点
1. `AbstractRedisStreamMessageListener` 有一个 `(streamKey, group)` 构造器,内部把 `messageType` 置空;而消费反序列化仍依赖 `messageType`
建议优先使用无参构造路径(即泛型推断消息类型的默认方式)。
2. Stream 的重投机制本质是“至少一次投递”语义,业务侧应自行保证幂等。
## 适用与不适用
适用:
1. 需要快速接入 Redis 广播/异步任务队列
2. 希望保留基础的失败补偿和消息清理能力
3. 希望通过抽象基类统一消息定义风格
不适用:
1. 对事务一致性、延迟、吞吐有强约束且需要完整 MQ 运维体系
2. 需要复杂顺序语义、死信重试拓扑、跨机房高可用治理