From 4cfed98139abb66a343ace86dd4d165cf6939685 Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Mon, 11 May 2026 09:03:18 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=BC=E5=AE=B9jb=E9=98=BF=E9=87=8C=E4=BA=91?= =?UTF-8?q?top=E6=A0=BC=E5=BC=8F=E9=83=BD=E6=98=AF=E5=AE=9E=E4=BE=8Bid%Top?= =?UTF-8?q?ic=EF=BC=8Cconsumer=E6=A0=BC=E5=BC=8FGID=5Fconsumer=E7=BB=9F?= =?UTF-8?q?=E4=B8=80=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../autoconfig/EnvironmentIsolationConfig.java | 13 +++++++++++++ .../rocket/autoconfig/RocketEnhanceProperties.java | 5 +++++ .../rocket/template/RocketMQEnhanceTemplate.java | 10 ++++++++-- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java b/src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java index 184744e..290059a 100644 --- a/src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java +++ b/src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java @@ -27,10 +27,23 @@ public class EnvironmentIsolationConfig implements BeanPostProcessor { if(bean instanceof DefaultRocketMQListenerContainer){ DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; + // 修改 ConsumerGroup(增加前缀,例如 dev_原始Group) if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){ container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment())); + } + //兼容jb阿里云top格式都是实例id%Topic,consumer格式GID_consumer + if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getTopicPrefix())){ + container.setTopic(String.join("%",rocketEnhanceProperties.getTopicPrefix(), container.getTopic())); + + } + if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getConsumerGroupPrefix())){ + String originalGroup = container.getConsumerGroup(); + String isolatedGroup = String.join("_", rocketEnhanceProperties.getConsumerGroupPrefix(),originalGroup); + container.setConsumerGroup(isolatedGroup); + } + return container; } return bean; diff --git a/src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java b/src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java index b773cb0..cb8257d 100644 --- a/src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java +++ b/src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java @@ -21,4 +21,9 @@ public class RocketEnhanceProperties { * 当前环境,test、dev 或者 hainan beijing hebei */ private String environment; + + private String consumerGroupPrefix; + + private String topicPrefix; + } diff --git a/src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java b/src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java index 1363118..7ac7c54 100644 --- a/src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java +++ b/src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java @@ -47,10 +47,16 @@ public class RocketMQEnhanceTemplate { * @param topic 原始topic */ private String reBuildTopic(String topic) { + String result =topic; if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){ - return topic +"_" + rocketEnhanceProperties.getEnvironment(); + result = result +"_" + rocketEnhanceProperties.getEnvironment(); } - return topic; + //兼容jb阿里云top格式都是实例id%Topi + if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getTopicPrefix())){ + result = String.join("%",rocketEnhanceProperties.getTopicPrefix(), result); + + } + return result; } /**