初始化
This commit is contained in:
232
rdms-framework/rdms-spring-boot-starter-mq/MQ_REFACTOR_PLAN.md
Normal file
232
rdms-framework/rdms-spring-boot-starter-mq/MQ_REFACTOR_PLAN.md
Normal file
@@ -0,0 +1,232 @@
|
||||
# MQ 改造方案(评审稿)
|
||||
|
||||
## 1. 背景与目标
|
||||
你当前的业务策略是:
|
||||
1. 单体部署优先使用 Redis 作为 MQ。
|
||||
2. 微服务部署优先使用 RocketMQ 作为 MQ。
|
||||
3. RabbitMQ、Kafka 暂时不作为主路径,仅保留包结构隔离,不参与核心链路。
|
||||
|
||||
本方案目标是:
|
||||
1. 在不破坏现有系统的前提下,建立可切换的 MQ 入口。
|
||||
2. 把“切换成本”收敛到配置层,而不是业务代码层。
|
||||
3. 采用分阶段改造,先低风险,后统一抽象。
|
||||
|
||||
## 1.1 本期非目标(避免范围蔓延)
|
||||
本期改造明确不包含:
|
||||
1. 严格顺序语义保障(全链路有序消费)。
|
||||
2. 统一死信队列(DLQ)治理体系。
|
||||
3. 事务消息与最终一致性框架化封装。
|
||||
4. 多机房容灾级别的 MQ 治理。
|
||||
|
||||
说明:
|
||||
1. 本期先完成“可切换、可回滚、可观测”的基础能力。
|
||||
2. 复杂语义能力后续在 RocketMQ 路线下单独立项。
|
||||
|
||||
## 2. 当前实现现状(简版)
|
||||
当前 `rdms-spring-boot-starter-mq` 的真实情况:
|
||||
1. Redis 能力最完整,包含模板、监听器抽象、Stream 补偿与清理任务。
|
||||
2. RabbitMQ 只有 `MessageConverter` 级别自动配置,未形成统一收发抽象。
|
||||
3. RocketMQ、Kafka 在该 starter 中没有对应自动配置主链路,实际接入主要在 WebSocket 模块。
|
||||
|
||||
当前 `rdms-system` 默认配置:
|
||||
1. `rdms.websocket.sender-type: local`,即默认并未走 MQ 分发链路。
|
||||
|
||||
## 3. 改造原则
|
||||
1. 先配置统一,再接口统一,最后再清理非主路径。
|
||||
2. 任何阶段都必须可回滚,且回滚只改配置不改代码。
|
||||
3. 保持 Rabbit/Kafka 包路径存在,避免一次性大删导致历史分支合并困难。
|
||||
4. 先让 WebSocket 场景打通可切换,再考虑扩展到业务 MQ。
|
||||
|
||||
## 4. 目标架构(落地后)
|
||||
统一引入配置:
|
||||
1. `rdms.mq.type=redis|rocketmq`
|
||||
|
||||
统一行为:
|
||||
1. 单体环境配 `redis`。
|
||||
2. 微服务环境配 `rocketmq`。
|
||||
3. WebSocket `sender-type` 跟随 `rdms.mq.type`,避免双配置。
|
||||
|
||||
建议方式:
|
||||
1. `rdms.websocket.sender-type: ${rdms.mq.type}`
|
||||
|
||||
## 4.1 配置矩阵(最小可运行)
|
||||
|
||||
### 单体(Redis)
|
||||
```yaml
|
||||
rdms:
|
||||
mq:
|
||||
type: redis
|
||||
websocket:
|
||||
sender-type: ${rdms.mq.type}
|
||||
|
||||
spring:
|
||||
data:
|
||||
redis:
|
||||
host: 127.0.0.1
|
||||
port: 6379
|
||||
```
|
||||
|
||||
### 微服务(RocketMQ)
|
||||
```yaml
|
||||
rdms:
|
||||
mq:
|
||||
type: rocketmq
|
||||
websocket:
|
||||
sender-type: ${rdms.mq.type}
|
||||
|
||||
rocketmq:
|
||||
name-server: 127.0.0.1:9876
|
||||
producer:
|
||||
group: rdms-producer-group
|
||||
```
|
||||
|
||||
备注:
|
||||
1. 微服务切 Rocket 前,必须先补齐 topic、consumer-group 等业务配置。
|
||||
2. 建议在 dev/local 保留 Redis 配置,便于快速回滚。
|
||||
|
||||
## 4.2 命名规范(Topic / Channel / StreamKey)
|
||||
建议统一命名规则,避免后续混乱:
|
||||
1. Redis Channel / StreamKey:`{env}:{domain}:{event}`
|
||||
2. Rocket Topic:`{env}_{domain}_{event}`
|
||||
3. ConsumerGroup:`{app}-{domain}-cg`
|
||||
|
||||
示例:
|
||||
1. `dev:system:notify`
|
||||
2. `prod_system_notify`
|
||||
3. `rdms-system-notify-cg`
|
||||
|
||||
## 5. 分阶段实施计划
|
||||
|
||||
## 阶段 A(低风险,建议先做)
|
||||
目标:只做配置层统一,不动大规模代码。
|
||||
|
||||
实施项:
|
||||
1. 新增配置项 `rdms.mq.type`,默认值 `redis`。
|
||||
2. 修改环境配置,使 `rdms.websocket.sender-type` 引用 `rdms.mq.type`。
|
||||
3. 本阶段不删除 Rabbit/Kafka 代码,不变更包结构。
|
||||
|
||||
收益:
|
||||
1. 切换 Redis/Rocket 只改配置。
|
||||
2. 不触碰核心业务逻辑,风险最低。
|
||||
|
||||
回滚:
|
||||
1. 把 `rdms.websocket.sender-type` 改回固定值即可。
|
||||
|
||||
## 阶段 B(中风险,接口统一)
|
||||
目标:抽象统一 MQ 发送接口,减少业务对具体中间件的耦合。
|
||||
|
||||
实施项:
|
||||
1. 新增统一接口,例如 `UnifiedMqSender`。
|
||||
2. 提供 `RedisMqSender` 与 `RocketMqSender` 两个实现。
|
||||
3. 用 `@ConditionalOnProperty` 根据 `rdms.mq.type` 注入唯一实现。
|
||||
4. 在 WebSocket 发送链路优先替换为统一接口调用。
|
||||
5. 统一消息契约,至少包含标准消息头:
|
||||
`msgId`、`bizKey`、`timestamp`、`producer`、`traceId`、`version`。
|
||||
|
||||
收益:
|
||||
1. 业务层不再直接依赖 `RedisMQTemplate` 或 `RocketMQTemplate`。
|
||||
2. 后续切换中间件时改动面可控。
|
||||
3. 消息协议可演进,减少多团队并行开发冲突。
|
||||
|
||||
回滚:
|
||||
1. 切回原 sender Bean 注入方案,保留统一接口代码但不启用。
|
||||
|
||||
## 阶段 C(可选,结构收敛)
|
||||
目标:让非主路径代码“隔离可见但不激活”。
|
||||
|
||||
实施项:
|
||||
1. Rabbit/Kafka 相关自动配置入口默认关闭。
|
||||
2. 保留目录与类,增加注释标识 `reserved` 或 `deprecated`。
|
||||
3. 文档明确“当前生产主路径仅 Redis/Rocket”。
|
||||
|
||||
收益:
|
||||
1. 新成员不会误判项目“全量支持四种 MQ”。
|
||||
2. 后续若要恢复 Rabbit/Kafka,可低成本重新开启。
|
||||
|
||||
回滚:
|
||||
1. 打开对应配置开关即可恢复。
|
||||
|
||||
## 6. 影响面评估
|
||||
主要影响模块:
|
||||
1. `rdms-framework/rdms-spring-boot-starter-mq`
|
||||
2. `rdms-framework/rdms-spring-boot-starter-websocket`
|
||||
3. `rdms-system/rdms-system-boot` 配置文件
|
||||
|
||||
主要风险点:
|
||||
1. Redis 与 Rocket 的消费语义不完全一致,幂等要在业务侧兜底。
|
||||
2. 广播模型下,多实例重复消费属于预期,业务处理要避免副作用。
|
||||
3. 配置切换后,缺失 Rocket 连接配置会导致启动失败。
|
||||
4. 现有代码中 Kafka 消费类注解存在明显可疑项,改造时应单独复核。
|
||||
5. 缺少统一幂等策略时,Rocket 与 Redis 切换后可能放大重复消费副作用。
|
||||
6. 配置缺失时若做了自动降级,可能导致“误以为切到 Rocket,实际走本地/Redis”。
|
||||
|
||||
## 6.1 幂等策略(建议作为阶段 B 配套)
|
||||
最小策略建议:
|
||||
1. 每条消息必须带 `msgId` 与 `bizKey`。
|
||||
2. 以 `bizKey` 作为业务幂等键(例如订单号、任务号)。
|
||||
3. 消费前先做幂等检查,消费成功后写入幂等记录。
|
||||
4. 幂等记录建议放 Redis,设置合理 TTL(如 3~7 天,按业务回放窗口)。
|
||||
|
||||
说明:
|
||||
1. `msgId` 解决“传输级去重定位”,`bizKey` 解决“业务级幂等”。
|
||||
2. 幂等是切换 MQ 前置条件,不应后补。
|
||||
|
||||
## 6.2 故障切换策略(明确 fail-fast)
|
||||
建议默认策略:
|
||||
1. 目标 MQ 不可用时 `fail-fast`,启动或发送直接失败并告警。
|
||||
2. 禁止隐式降级到 local,避免行为不透明。
|
||||
|
||||
可选策略:
|
||||
1. 在开发环境允许显式降级(需开关控制,并打印告警日志)。
|
||||
|
||||
## 7. 验收清单(每阶段都要过)
|
||||
1. 单体环境 `rdms.mq.type=redis` 可启动、可发送、可消费。
|
||||
2. 微服务环境 `rdms.mq.type=rocketmq` 可启动、可发送、可消费。
|
||||
3. WebSocket 在两种模式下都能完成跨实例消息投递。
|
||||
4. 关键链路日志可定位发送端与消费端。
|
||||
5. 切换只改配置,不改代码。
|
||||
6. 回滚路径已验证。
|
||||
7. 幂等校验通过(重复消息不会造成业务副作用)。
|
||||
8. 关键指标可观测并有告警(发送失败、消费失败、积压、重试)。
|
||||
|
||||
## 7.1 观测与告警建议
|
||||
建议统一接入以下指标:
|
||||
1. `mq_send_total` / `mq_send_fail_total`
|
||||
2. `mq_consume_total` / `mq_consume_fail_total`
|
||||
3. `mq_backlog_size`
|
||||
4. `mq_retry_total`
|
||||
|
||||
建议阈值(示例):
|
||||
1. 5 分钟发送失败率 > 1% 告警。
|
||||
2. 连续 10 分钟消费失败率 > 0.5% 告警。
|
||||
3. 积压超过基线 3 倍且持续 10 分钟告警。
|
||||
|
||||
## 8. 推荐实施顺序与工时
|
||||
推荐顺序:
|
||||
1. 先做阶段 A。
|
||||
2. 阶段 A 稳定后再做阶段 B。
|
||||
3. 阶段 C 按团队节奏处理。
|
||||
|
||||
粗略工时(含联调):
|
||||
1. 阶段 A:0.5 天。
|
||||
2. 阶段 B:1 天到 1.5 天。
|
||||
3. 阶段 C:0.5 天。
|
||||
|
||||
## 8.1 上线与回滚步骤(建议)
|
||||
上线顺序:
|
||||
1. 开发环境完成阶段 A,验证双配置切换。
|
||||
2. 预发环境灰度 1 个实例切换目标 MQ。
|
||||
3. 小流量观察 1 天,确认无异常后全量。
|
||||
|
||||
回滚触发条件(任一满足即回滚):
|
||||
1. 消费失败率持续超过阈值。
|
||||
2. 积压持续增长且无法在观察窗口内回落。
|
||||
3. 关键业务出现重复消费副作用。
|
||||
|
||||
回滚动作:
|
||||
1. 只回滚 `rdms.mq.type` 与 `rdms.websocket.sender-type` 配置。
|
||||
2. 不回滚代码,确保恢复路径最短。
|
||||
|
||||
## 9. 当前建议
|
||||
你现在对项目结构还在熟悉期,建议先做阶段 A 的评审与验证,不直接进入阶段 B。
|
||||
这样能先拿到“可切换能力”,同时把改造风险控制在最低范围。
|
||||
256
rdms-framework/rdms-spring-boot-starter-mq/README.md
Normal file
256
rdms-framework/rdms-spring-boot-starter-mq/README.md
Normal file
@@ -0,0 +1,256 @@
|
||||
# rdms-spring-boot-starter-mq
|
||||
|
||||
## 模块定位
|
||||
`rdms-spring-boot-starter-mq` 是项目里的消息队列基础封装,目标是:
|
||||
1. 对 Redis 消息模型做统一抽象,降低业务接入复杂度
|
||||
2. 通过自动配置按需启用消费者容器,不强制业务使用全部能力
|
||||
3. 为后续多 MQ 方案(Redis / RabbitMQ / RocketMQ / Kafka)提供统一入口
|
||||
|
||||
当前实现里,**核心能力在 Redis**,RabbitMQ 是轻量补充(消息转换器)。
|
||||
|
||||
## 设计思路
|
||||
|
||||
### 1. Redis 统一模型
|
||||
Redis 相关能力分成两类:
|
||||
1. `Pub/Sub` 广播模型
|
||||
2. `Stream` 分组消费模型
|
||||
|
||||
两者的消息基类分别是:
|
||||
1. `AbstractRedisChannelMessage`
|
||||
2. `AbstractRedisStreamMessage`
|
||||
|
||||
共同父类是 `AbstractRedisMessage`,内置 `headers`,方便扩展链路信息。
|
||||
|
||||
### 2. 统一发送模板
|
||||
发送端统一走 `RedisMQTemplate`:
|
||||
1. `send(AbstractRedisChannelMessage)` -> Redis `convertAndSend`
|
||||
2. `send(AbstractRedisStreamMessage)` -> Redis Stream `XADD`
|
||||
|
||||
这样业务代码不需要关心底层命令细节。
|
||||
|
||||
### 3. 监听器抽象 + 惰性自动装配
|
||||
消费者由两个抽象监听器承接:
|
||||
1. `AbstractRedisChannelMessageListener<T>`
|
||||
2. `AbstractRedisStreamMessageListener<T>`
|
||||
|
||||
自动配置采用 `@ConditionalOnBean(...)`:
|
||||
1. 只有你定义了对应监听器 Bean,容器才会注册消费者
|
||||
2. 没有监听器时不会额外启动 MQ 消费组件
|
||||
|
||||
### 4. Stream 运维补偿
|
||||
针对 Redis Stream,框架额外提供两个定时任务:
|
||||
1. `RedisPendingMessageResendJob`
|
||||
扫描 pending 超时消息并重投(默认超时 5 分钟)
|
||||
2. `RedisStreamMessageCleanupJob`
|
||||
定时 `XTRIM`,默认仅保留最近 10000 条,防止内存膨胀
|
||||
|
||||
两者都使用 Redisson 分布式锁,避免多实例重复执行。
|
||||
|
||||
### 5. RabbitMQ 轻封装
|
||||
`RdmsRabbitMQAutoConfiguration` 只提供 `Jackson2JsonMessageConverter`,让 RabbitTemplate / @RabbitListener 默认按 JSON 处理消息。
|
||||
|
||||
### 6. 多 MQ 的真实边界
|
||||
虽然模块描述写了支持 Redis / RocketMQ / RabbitMQ / Kafka,
|
||||
但本 starter 的自动配置主要是 Redis + Rabbit。
|
||||
RocketMQ / Kafka 的业务化接入在 `rdms-spring-boot-starter-websocket` 里有更完整示例。
|
||||
|
||||
## 自动配置入口
|
||||
`META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports`:
|
||||
1. `RdmsRedisMQProducerAutoConfiguration`
|
||||
2. `RdmsRedisMQConsumerAutoConfiguration`
|
||||
3. `RdmsRabbitMQAutoConfiguration`
|
||||
|
||||
## 如何使用
|
||||
|
||||
## 1. 引入依赖
|
||||
一般由业务模块直接依赖:
|
||||
|
||||
```xml
|
||||
<dependency>
|
||||
<groupId>com.njcn</groupId>
|
||||
<artifactId>rdms-spring-boot-starter-mq</artifactId>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
## 1.1 项目现有 Redis 作为 MQ 的常规用法(推荐)
|
||||
推荐按这个顺序接入:
|
||||
1. 配好 `spring.data.redis.*`。
|
||||
2. 先按第 2 节使用 `Pub/Sub`(这是项目里最常见路径)。
|
||||
3. 需要可恢复消费时,再按第 3 节接入 `Stream`。
|
||||
|
||||
WebSocket 场景建议:
|
||||
1. 多实例广播: `rdms.websocket.sender-type: redis`
|
||||
2. 单机部署: 保持 `rdms.websocket.sender-type: local`
|
||||
|
||||
说明:
|
||||
1. 只要存在 `AbstractRedisChannelMessageListener` Bean,框架会自动注册 Redis 监听容器。
|
||||
|
||||
## 2. 使用 Redis Pub/Sub(广播)
|
||||
适用场景: 通知广播、在线会话广播、对可靠性要求不高但强调实时性。
|
||||
|
||||
### 2.1 定义消息
|
||||
```java
|
||||
import com.njcn.rdms.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DemoNotifyMessage extends AbstractRedisChannelMessage {
|
||||
|
||||
private Long userId;
|
||||
private String content;
|
||||
|
||||
// 可选: 自定义 channel;不重写时默认是类名
|
||||
@Override
|
||||
public String getChannel() {
|
||||
return "demo:notify";
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 2.2 定义消费者
|
||||
```java
|
||||
import com.njcn.rdms.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class DemoNotifyListener extends AbstractRedisChannelMessageListener<DemoNotifyMessage> {
|
||||
|
||||
@Override
|
||||
public void onMessage(DemoNotifyMessage message) {
|
||||
log.info("收到广播消息 userId={}, content={}", message.getUserId(), message.getContent());
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 2.3 发送消息
|
||||
```java
|
||||
import com.njcn.rdms.framework.mq.redis.core.RedisMQTemplate;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class DemoNotifyProducer {
|
||||
|
||||
private final RedisMQTemplate redisMQTemplate;
|
||||
|
||||
public void send(Long userId, String content) {
|
||||
redisMQTemplate.send(new DemoNotifyMessage()
|
||||
.setUserId(userId)
|
||||
.setContent(content));
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 3. 使用 Redis Stream(分组消费 + ACK)
|
||||
适用场景: 异步任务、可恢复消费、希望具备重投和清理机制。
|
||||
|
||||
### 3.1 定义消息
|
||||
```java
|
||||
import com.njcn.rdms.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DemoTaskMessage extends AbstractRedisStreamMessage {
|
||||
|
||||
private Long taskId;
|
||||
private String bizType;
|
||||
|
||||
// 可选: 自定义 stream key;不重写时默认是类名
|
||||
@Override
|
||||
public String getStreamKey() {
|
||||
return "demo:task:stream";
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3.2 定义消费者
|
||||
```java
|
||||
import com.njcn.rdms.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class DemoTaskListener extends AbstractRedisStreamMessageListener<DemoTaskMessage> {
|
||||
|
||||
@Override
|
||||
public void onMessage(DemoTaskMessage message) {
|
||||
log.info("消费任务 taskId={}, bizType={}", message.getTaskId(), message.getBizType());
|
||||
// 这里抛异常会导致本次消费失败,不会执行 ACK
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3.3 发送消息
|
||||
```java
|
||||
import com.njcn.rdms.framework.mq.redis.core.RedisMQTemplate;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class DemoTaskProducer {
|
||||
|
||||
private final RedisMQTemplate redisMQTemplate;
|
||||
|
||||
public RecordId send(Long taskId, String bizType) {
|
||||
return redisMQTemplate.send(new DemoTaskMessage()
|
||||
.setTaskId(taskId)
|
||||
.setBizType(bizType));
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 4. 消息拦截器扩展(可选)
|
||||
可通过实现 `RedisMessageInterceptor`,对发送/消费前后做统一处理(比如租户透传、审计埋点)。
|
||||
|
||||
```java
|
||||
import com.njcn.rdms.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import com.njcn.rdms.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class DemoRedisMessageInterceptor implements RedisMessageInterceptor {
|
||||
|
||||
@Override
|
||||
public void sendMessageBefore(AbstractRedisMessage message) {
|
||||
message.addHeader("source", "rdms-system");
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 配置说明
|
||||
本模块本身几乎没有 `rdms.mq.*` 配置项,主要依赖:
|
||||
1. `spring.data.redis.*`(Redis 连接)
|
||||
2. `spring.application.name`(Stream consumer group 默认值)
|
||||
|
||||
另外:
|
||||
1. Redis Stream 要求 Redis 版本 >= 5.0
|
||||
2. 只有存在 Stream 监听器时,重投/清理定时任务才会生效
|
||||
|
||||
## 与 WebSocket 的关系
|
||||
`rdms-spring-boot-starter-websocket` 直接复用本模块能力:
|
||||
1. `sender-type=redis` 时使用 `RedisMQTemplate` + `AbstractRedisChannelMessageListener`
|
||||
2. `sender-type=rabbitmq/rocketmq/kafka` 时切换到对应中间件实现
|
||||
|
||||
你可以参考 websocket 模块作为本模块的落地样板。
|
||||
|
||||
## 当前已知注意点
|
||||
1. `AbstractRedisStreamMessageListener` 有一个 `(streamKey, group)` 构造器,内部把 `messageType` 置空;而消费反序列化仍依赖 `messageType`。
|
||||
建议优先使用无参构造路径(即泛型推断消息类型的默认方式)。
|
||||
2. Stream 的重投机制本质是“至少一次投递”语义,业务侧应自行保证幂等。
|
||||
|
||||
## 适用与不适用
|
||||
|
||||
适用:
|
||||
1. 需要快速接入 Redis 广播/异步任务队列
|
||||
2. 希望保留基础的失败补偿和消息清理能力
|
||||
3. 希望通过抽象基类统一消息定义风格
|
||||
|
||||
不适用:
|
||||
1. 对事务一致性、延迟、吞吐有强约束且需要完整 MQ 运维体系
|
||||
2. 需要复杂顺序语义、死信重试拓扑、跨机房高可用治理
|
||||
44
rdms-framework/rdms-spring-boot-starter-mq/pom.xml
Normal file
44
rdms-framework/rdms-spring-boot-starter-mq/pom.xml
Normal file
@@ -0,0 +1,44 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.njcn</groupId>
|
||||
<artifactId>rdms-framework</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>rdms-spring-boot-starter-mq</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>${project.artifactId}</name>
|
||||
<description>消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种</description>
|
||||
|
||||
|
||||
<dependencies>
|
||||
<!-- DB 相关 -->
|
||||
<dependency>
|
||||
<groupId>com.njcn</groupId>
|
||||
<artifactId>rdms-spring-boot-starter-redis</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 消息队列相关 -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.amqp</groupId>
|
||||
<artifactId>spring-rabbit</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.njcn.rdms.framework.mq.rabbitmq.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
/**
|
||||
* RabbitMQ 消息队列配置类
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
@AutoConfiguration
|
||||
@Slf4j
|
||||
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
|
||||
public class RdmsRabbitMQAutoConfiguration {
|
||||
|
||||
/**
|
||||
* Jackson2JsonMessageConverter Bean:使用 jackson 序列化消息
|
||||
*/
|
||||
@Bean
|
||||
public MessageConverter createMessageConverter() {
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* 占位符,无特殊逻辑
|
||||
*/
|
||||
package com.njcn.rdms.framework.mq.rabbitmq.core;
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* 消息队列,基于 RabbitMQ 提供
|
||||
*/
|
||||
package com.njcn.rdms.framework.mq.rabbitmq;
|
||||
@@ -0,0 +1,162 @@
|
||||
package com.njcn.rdms.framework.mq.redis.config;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.system.SystemUtil;
|
||||
import com.njcn.rdms.framework.common.enums.DocumentEnum;
|
||||
import com.njcn.rdms.framework.mq.redis.core.RedisMQTemplate;
|
||||
import com.njcn.rdms.framework.mq.redis.core.job.RedisPendingMessageResendJob;
|
||||
import com.njcn.rdms.framework.mq.redis.core.job.RedisStreamMessageCleanupJob;
|
||||
import com.njcn.rdms.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
|
||||
import com.njcn.rdms.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
||||
import com.njcn.rdms.framework.redis.config.RdmsRedisAutoConfiguration;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.data.redis.connection.RedisServerCommands;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
||||
import org.springframework.data.redis.core.RedisCallback;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.listener.ChannelTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Redis 消息队列 Consumer 配置类
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
@Slf4j
|
||||
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
|
||||
@AutoConfiguration(after = RdmsRedisAutoConfiguration.class)
|
||||
public class RdmsRedisMQConsumerAutoConfiguration {
|
||||
|
||||
/**
|
||||
* 创建 Redis Pub/Sub 广播消费的容器
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
||||
public RedisMessageListenerContainer redisMessageListenerContainer(
|
||||
RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
|
||||
// 创建 RedisMessageListenerContainer 对象
|
||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||
// 设置 RedisConnection 工厂。
|
||||
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
|
||||
// 添加监听器
|
||||
listeners.forEach(listener -> {
|
||||
listener.setRedisMQTemplate(redisMQTemplate);
|
||||
container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
|
||||
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
|
||||
listener.getChannel(), listener.getClass().getName());
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 重新消费的任务
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
||||
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
|
||||
RedisMQTemplate redisTemplate,
|
||||
RedissonClient redissonClient) {
|
||||
return new RedisPendingMessageResendJob(listeners, redisTemplate, redissonClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 消息清理任务
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnBean(AbstractRedisStreamMessageListener.class)
|
||||
public RedisStreamMessageCleanupJob redisStreamMessageCleanupJob(List<AbstractRedisStreamMessageListener<?>> listeners,
|
||||
RedisMQTemplate redisTemplate,
|
||||
RedissonClient redissonClient) {
|
||||
return new RedisStreamMessageCleanupJob(listeners, redisTemplate, redissonClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 集群消费的容器
|
||||
*
|
||||
* 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>
|
||||
*/
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
||||
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
||||
RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
|
||||
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
|
||||
checkRedisVersion(redisTemplate);
|
||||
// 第一步,创建 StreamMessageListenerContainer 容器
|
||||
// 创建 options 配置
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
|
||||
.batchSize(10) // 一次性最多拉取多少条消息
|
||||
.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
|
||||
.build();
|
||||
// 创建 container 对象
|
||||
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
|
||||
StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
|
||||
|
||||
// 第二步,注册监听器,消费对应的 Stream 主题
|
||||
String consumerName = buildConsumerName();
|
||||
listeners.parallelStream().forEach(listener -> {
|
||||
log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",
|
||||
listener.getStreamKey(), listener.getClass().getName());
|
||||
// 创建 listener 对应的消费者分组
|
||||
try {
|
||||
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
// 设置 listener 对应的 redisTemplate
|
||||
listener.setRedisMQTemplate(redisMQTemplate);
|
||||
// 创建 Consumer 对象
|
||||
Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
|
||||
// 设置 Consumer 消费进度,以最小消费进度为准
|
||||
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
|
||||
// 设置 Consumer 监听
|
||||
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
|
||||
.builder(streamOffset).consumer(consumer)
|
||||
.autoAcknowledge(false) // 不自动 ack
|
||||
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
|
||||
container.register(builder.build(), listener);
|
||||
log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",
|
||||
listener.getStreamKey(), listener.getClass().getName());
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建消费者名字,使用本地 IP + 进程编号的方式。
|
||||
* 参考自 RocketMQ clientId 的实现
|
||||
*
|
||||
* @return 消费者名字
|
||||
*/
|
||||
public static String buildConsumerName() {
|
||||
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验 Redis 版本号,是否满足最低的版本号要求!
|
||||
*/
|
||||
public static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
|
||||
// 获得 Redis 版本
|
||||
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
|
||||
String version = MapUtil.getStr(info, "redis_version");
|
||||
// 校验最低版本必须大于等于 5.0.0
|
||||
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
|
||||
if (majorVersion < 5) {
|
||||
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
|
||||
"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.njcn.rdms.framework.mq.redis.config;
|
||||
|
||||
import com.njcn.rdms.framework.mq.redis.core.RedisMQTemplate;
|
||||
import com.njcn.rdms.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import com.njcn.rdms.framework.redis.config.RdmsRedisAutoConfiguration;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis 消息队列 Producer 配置类
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
@Slf4j
|
||||
@AutoConfiguration(after = RdmsRedisAutoConfiguration.class)
|
||||
public class RdmsRedisMQProducerAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
|
||||
List<RedisMessageInterceptor> interceptors) {
|
||||
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
|
||||
// 添加拦截器
|
||||
interceptors.forEach(redisMQTemplate::addInterceptor);
|
||||
return redisMQTemplate;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
package com.njcn.rdms.framework.mq.redis.core;
|
||||
|
||||
import com.njcn.rdms.framework.common.util.json.JsonUtils;
|
||||
import com.njcn.rdms.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import com.njcn.rdms.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import com.njcn.rdms.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
|
||||
import com.njcn.rdms.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis MQ 操作模板类
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class RedisMQTemplate {
|
||||
|
||||
@Getter
|
||||
private final RedisTemplate<String, ?> redisTemplate;
|
||||
/**
|
||||
* 拦截器数组
|
||||
*/
|
||||
@Getter
|
||||
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis pub/sub 实现
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public <T extends AbstractRedisChannelMessage> void send(T message) {
|
||||
try {
|
||||
sendMessageBefore(message);
|
||||
// 发送消息
|
||||
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
|
||||
} finally {
|
||||
sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis Stream 实现
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 消息记录的编号对象
|
||||
*/
|
||||
public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
|
||||
try {
|
||||
sendMessageBefore(message);
|
||||
// 发送消息
|
||||
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
|
||||
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
|
||||
.withStreamKey(message.getStreamKey())); // 设置 stream key
|
||||
} finally {
|
||||
sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加拦截器
|
||||
*
|
||||
* @param interceptor 拦截器
|
||||
*/
|
||||
public void addInterceptor(RedisMessageInterceptor interceptor) {
|
||||
interceptors.add(interceptor);
|
||||
}
|
||||
|
||||
private void sendMessageBefore(AbstractRedisMessage message) {
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
|
||||
}
|
||||
|
||||
private void sendMessageAfter(AbstractRedisMessage message) {
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.njcn.rdms.framework.mq.redis.core.interceptor;
|
||||
|
||||
import com.njcn.rdms.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
|
||||
/**
|
||||
* {@link AbstractRedisMessage} 消息拦截器
|
||||
* 通过拦截器,作为插件机制,实现拓展。
|
||||
* 例如说,多租户场景下的 MQ 消息处理
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
public interface RedisMessageInterceptor {
|
||||
|
||||
default void sendMessageBefore(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void sendMessageAfter(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
package com.njcn.rdms.framework.mq.redis.core.job;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.njcn.rdms.framework.mq.redis.core.RedisMQTemplate;
|
||||
import com.njcn.rdms.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.data.domain.Range;
|
||||
import org.springframework.data.redis.connection.stream.*;
|
||||
import org.springframework.data.redis.core.StreamOperations;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 这个任务用于处理,crash 之后的消费者未消费完的消息
|
||||
*/
|
||||
@Slf4j
|
||||
@AllArgsConstructor
|
||||
public class RedisPendingMessageResendJob {
|
||||
|
||||
private static final String LOCK_KEY = "redis:stream:pending-message-resend:lock";
|
||||
|
||||
/**
|
||||
* 消息超时时间,默认 5 分钟
|
||||
*
|
||||
* 1. 超时的消息才会被重新投递
|
||||
* 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息 5 分钟过期后,再等 1 分钟才会被扫瞄到
|
||||
*/
|
||||
private static final int EXPIRE_TIME = 5 * 60;
|
||||
|
||||
private final List<AbstractRedisStreamMessageListener<?>> listeners;
|
||||
private final RedisMQTemplate redisTemplate;
|
||||
private final RedissonClient redissonClient;
|
||||
|
||||
/**
|
||||
* 一分钟执行一次,这里选择每分钟的 35 秒执行,是为了避免整点任务过多的问题
|
||||
*/
|
||||
@Scheduled(cron = "35 * * * * ?")
|
||||
public void messageResend() {
|
||||
RLock lock = redissonClient.getLock(LOCK_KEY);
|
||||
// 尝试加锁
|
||||
if (lock.tryLock()) {
|
||||
try {
|
||||
execute();
|
||||
} catch (Exception ex) {
|
||||
log.error("[messageResend][执行异常]", ex);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行清理逻辑
|
||||
*
|
||||
* @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files">讨论</a>
|
||||
*/
|
||||
private void execute() {
|
||||
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
|
||||
listeners.forEach(listener -> {
|
||||
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), listener.getGroup()));
|
||||
// 每个消费者的 pending 队列消息数量
|
||||
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
|
||||
pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
|
||||
log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
|
||||
// 每个消费者的 pending消息的详情信息
|
||||
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(listener.getGroup(), consumerName), Range.unbounded(), pendingMessageCount);
|
||||
if (pendingMessages.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
pendingMessages.forEach(pendingMessage -> {
|
||||
// 获取消息上一次传递到 consumer 的时间,
|
||||
long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
|
||||
if (lastDelivery < EXPIRE_TIME){
|
||||
return;
|
||||
}
|
||||
// 获取指定 id 的消息体
|
||||
List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),
|
||||
Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
|
||||
if (CollUtil.isEmpty(records)) {
|
||||
return;
|
||||
}
|
||||
// 重新投递消息
|
||||
redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
|
||||
.ofObject(records.get(0).getValue()) // 设置内容
|
||||
.withStreamKey(listener.getStreamKey()));
|
||||
// ack 消息消费完成
|
||||
redisTemplate.getRedisTemplate().opsForStream().acknowledge(listener.getGroup(), records.get(0));
|
||||
log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
package com.njcn.rdms.framework.mq.redis.core.job;
|
||||
|
||||
import com.njcn.rdms.framework.mq.redis.core.RedisMQTemplate;
|
||||
import com.njcn.rdms.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.data.redis.core.StreamOperations;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis Stream 消息清理任务
|
||||
* 用于定期清理已消费的消息,防止内存占用过大
|
||||
*
|
||||
* @see <a href="https://www.cnblogs.com/nanxiang/p/16179519.html">记一次 redis stream 数据类型内存不释放问题</a>
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
@Slf4j
|
||||
@AllArgsConstructor
|
||||
public class RedisStreamMessageCleanupJob {
|
||||
|
||||
private static final String LOCK_KEY = "redis:stream:message-cleanup:lock";
|
||||
|
||||
/**
|
||||
* 保留的消息数量,默认保留最近 10000 条消息
|
||||
*/
|
||||
private static final long MAX_COUNT = 10000;
|
||||
|
||||
private final List<AbstractRedisStreamMessageListener<?>> listeners;
|
||||
private final RedisMQTemplate redisTemplate;
|
||||
private final RedissonClient redissonClient;
|
||||
|
||||
/**
|
||||
* 每小时执行一次清理任务
|
||||
*/
|
||||
@Scheduled(cron = "0 0 * * * ?")
|
||||
public void cleanup() {
|
||||
RLock lock = redissonClient.getLock(LOCK_KEY);
|
||||
// 尝试加锁
|
||||
if (lock.tryLock()) {
|
||||
try {
|
||||
execute();
|
||||
} catch (Exception ex) {
|
||||
log.error("[cleanup][执行异常]", ex);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行清理逻辑
|
||||
*/
|
||||
private void execute() {
|
||||
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
|
||||
listeners.forEach(listener -> {
|
||||
try {
|
||||
// 使用 XTRIM 命令清理消息,只保留最近的 MAX_LEN 条消息
|
||||
Long trimCount = ops.trim(listener.getStreamKey(), MAX_COUNT, true);
|
||||
if (trimCount != null && trimCount > 0) {
|
||||
log.info("[execute][Stream({}) 清理消息数量({})]", listener.getStreamKey(), trimCount);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
log.error("[execute][Stream({}) 清理异常]", listener.getStreamKey(), ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.njcn.rdms.framework.mq.redis.core.message;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Redis 消息抽象基类
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
@Data
|
||||
public abstract class AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 头
|
||||
*/
|
||||
private Map<String, String> headers = new HashMap<>();
|
||||
|
||||
public String getHeader(String key) {
|
||||
return headers.get(key);
|
||||
}
|
||||
|
||||
public void addHeader(String key, String value) {
|
||||
headers.put(key, value);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.njcn.rdms.framework.mq.redis.core.pubsub;
|
||||
|
||||
import com.njcn.rdms.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Channel Message 抽象类
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Channel,默认使用类名
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
|
||||
public String getChannel() {
|
||||
return getClass().getSimpleName();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,103 @@
|
||||
package com.njcn.rdms.framework.mq.redis.core.pubsub;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import com.njcn.rdms.framework.common.util.json.JsonUtils;
|
||||
import com.njcn.rdms.framework.mq.redis.core.RedisMQTemplate;
|
||||
import com.njcn.rdms.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import com.njcn.rdms.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis Pub/Sub 监听器抽象类,用于实现广播消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
private final String channel;
|
||||
/**
|
||||
* RedisMQTemplate
|
||||
*/
|
||||
@Setter
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractRedisChannelMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获得 Sub 订阅的 Redis Channel 通道
|
||||
*
|
||||
* @return channel
|
||||
*/
|
||||
public final String getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onMessage(Message message, byte[] bytes) {
|
||||
T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
|
||||
try {
|
||||
consumeMessageBefore(messageObj);
|
||||
// 消费消息
|
||||
this.onMessage(messageObj);
|
||||
} finally {
|
||||
consumeMessageAfter(messageObj);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
private void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
|
||||
}
|
||||
|
||||
private void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).consumeMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.njcn.rdms.framework.mq.redis.core.stream;
|
||||
|
||||
import com.njcn.rdms.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Stream Message 抽象类
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
public abstract class AbstractRedisStreamMessage extends AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Stream Key,默认使用类名
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化
|
||||
public String getStreamKey() {
|
||||
return getClass().getSimpleName();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
package com.njcn.rdms.framework.mq.redis.core.stream;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import com.njcn.rdms.framework.common.util.json.JsonUtils;
|
||||
import com.njcn.rdms.framework.mq.redis.core.RedisMQTemplate;
|
||||
import com.njcn.rdms.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import com.njcn.rdms.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.stream.StreamListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis Stream 监听器抽象类,用于实现集群消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author hongawen
|
||||
*/
|
||||
public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedisStreamMessage>
|
||||
implements StreamListener<String, ObjectRecord<String, String>> {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
@Getter
|
||||
private final String streamKey;
|
||||
|
||||
/**
|
||||
* Redis 消费者分组,默认使用 spring.application.name 名字
|
||||
*/
|
||||
@Value("${spring.application.name}")
|
||||
@Getter
|
||||
private String group;
|
||||
/**
|
||||
* RedisMQTemplate
|
||||
*/
|
||||
@Setter
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractRedisStreamMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
|
||||
}
|
||||
|
||||
protected AbstractRedisStreamMessageListener(String streamKey, String group) {
|
||||
this.messageType = null;
|
||||
this.streamKey = streamKey;
|
||||
this.group = group;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ObjectRecord<String, String> message) {
|
||||
// 消费消息
|
||||
T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
|
||||
try {
|
||||
consumeMessageBefore(messageObj);
|
||||
// 消费消息
|
||||
this.onMessage(messageObj);
|
||||
// ack 消息消费完成
|
||||
redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
|
||||
// TODO 需要额外考虑以下几个点:
|
||||
// 1. 处理异常的情况
|
||||
// 2. 发送日志;以及事务的结合
|
||||
// 3. 消费日志;以及通用的幂等性
|
||||
// 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
|
||||
} finally {
|
||||
consumeMessageAfter(messageObj);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
private void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
|
||||
}
|
||||
|
||||
private void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).consumeMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
/**
|
||||
* 消息队列,基于 Redis 提供:
|
||||
* 1. 基于 Pub/Sub 实现广播消费
|
||||
* 2. 基于 Stream 实现集群消费
|
||||
*/
|
||||
package com.njcn.rdms.framework.mq.redis;
|
||||
@@ -0,0 +1,3 @@
|
||||
com.njcn.rdms.framework.mq.redis.config.RdmsRedisMQProducerAutoConfiguration
|
||||
com.njcn.rdms.framework.mq.redis.config.RdmsRedisMQConsumerAutoConfiguration
|
||||
com.njcn.rdms.framework.mq.rabbitmq.config.RdmsRabbitMQAutoConfiguration
|
||||
Reference in New Issue
Block a user