diff --git a/.gitignore b/.gitignore index 9154f4c..4632c4b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ -# ---> Java # Compiled class file *.class +*.iml +*.idea +target/ +logs/ # Log file *.log @@ -14,13 +17,31 @@ # Package Files # *.jar *.war -*.nar *.ear -*.zip *.tar.gz *.rar # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* -replay_pid* +*velocity.log* + +# Eclipse # +.classpath +.project +.settings/ + +.DS_Store + +_dockerCerts/ + +.factorypath + +node_modules/ +package-lock.json +yarn.lock + +rebel.xml + +!DmJdbcDriver18.jar +!kingbase8-8.6.0.jar \ No newline at end of file diff --git a/README.md b/README.md index 26bdd37..87dd2da 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,3 @@ -# rocket-mq-springboot-starter - -针对官方的sdk的二次开发补充: -1、不支持LocalDate 和 LocalDateTime; -2、RockeMQ环境隔离问题; -3、消息发送成功或者失败要打印消息日志,用于业务排查问题。 -4、如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。 -等等问题 \ No newline at end of file +# rocket-mq-springboot-starter + +RocketMQ官方文档中推荐的最佳实践存在部分问题: \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..d57e67d --- /dev/null +++ b/pom.xml @@ -0,0 +1,99 @@ + + + 4.0.0 + com.njcn + rocket-mq-springboot-starter + 1.0.0 + jar + rocket中间件二次开发sdk + + + nexus-releases + Nexus Release Repository + http://192.168.1.13:8001/nexus/content/repositories/releases/ + + + nexus-snapshots + Nexus Snapshot Repository + http://192.168.1.13:8001/nexus/content/repositories/snapshots/ + + + + + 1.8 + 8 + 8 + 2.3.12.RELEASE + 2.0.0.RELEASE + 5.7.9 + 1.18.18 + + + + + + org.springframework.boot + spring-boot-starter-web + ${springboot.version} + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.2.3 + + + + org.springframework.boot + spring-boot-configuration-processor + true + ${springboot.version} + + + org.springframework.boot + spring-boot-autoconfigure + ${autoconfigure.version} + + + + cn.hutool + hutool-all + ${hutool.version} + + + + org.projectlombok + lombok + ${lombok.version} + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + + + diff --git a/src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java b/src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java new file mode 100644 index 0000000..184744e --- /dev/null +++ b/src/main/java/com/njcn/middle/rocket/autoconfig/EnvironmentIsolationConfig.java @@ -0,0 +1,38 @@ +package com.njcn.middle.rocket.autoconfig; + +import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.util.StringUtils; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2023年08月04日 14:20 + */ +public class EnvironmentIsolationConfig implements BeanPostProcessor { + + private RocketEnhanceProperties rocketEnhanceProperties; + + public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) { + this.rocketEnhanceProperties = rocketEnhanceProperties; + } + + + /** + * 在装载Bean之前实现参数修改 + */ + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + if(bean instanceof DefaultRocketMQListenerContainer){ + + DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; + + if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){ + container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment())); + } + return container; + } + return bean; + } +} diff --git a/src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java b/src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java new file mode 100644 index 0000000..b773cb0 --- /dev/null +++ b/src/main/java/com/njcn/middle/rocket/autoconfig/RocketEnhanceProperties.java @@ -0,0 +1,24 @@ +package com.njcn.middle.rocket.autoconfig; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2023年08月04日 14:21 + */ +@ConfigurationProperties(prefix = "rocketmq.enhance") +@Data +public class RocketEnhanceProperties { + + /*** + * 是否隔离 + */ + private boolean enabledIsolation; + + /*** + * 当前环境,test、dev 或者 hainan beijing hebei + */ + private String environment; +} diff --git a/src/main/java/com/njcn/middle/rocket/autoconfig/RocketMQEnhanceAutoConfiguration.java b/src/main/java/com/njcn/middle/rocket/autoconfig/RocketMQEnhanceAutoConfiguration.java new file mode 100644 index 0000000..79dcfba --- /dev/null +++ b/src/main/java/com/njcn/middle/rocket/autoconfig/RocketMQEnhanceAutoConfiguration.java @@ -0,0 +1,64 @@ +package com.njcn.middle.rocket.autoconfig; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.messaging.converter.MessageConverter; + +import java.util.List; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2023年08月04日 14:19 + */ +@Configuration +@EnableConfigurationProperties(RocketEnhanceProperties.class) +public class RocketMQEnhanceAutoConfiguration { + + /** + * 注入增强的RocketMQEnhanceTemplate + */ + @Bean + public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){ + return new RocketMQEnhanceTemplate(rocketMQTemplate); + } + + /** + * 解决RocketMQ Jackson不支持Java时间类型配置 + */ + @Bean + @Primary + public RocketMQMessageConverter enhanceRocketMQMessageConverter(){ + RocketMQMessageConverter converter = new RocketMQMessageConverter(); + CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter(); + List messageConverterList = compositeMessageConverter.getConverters(); + for (MessageConverter messageConverter : messageConverterList) { + if(messageConverter instanceof MappingJackson2MessageConverter){ + MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter; + ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper(); + objectMapper.registerModules(new JavaTimeModule()); + } + } + return converter; + } + + + /** + * 环境隔离配置 + */ + @Bean + @ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true") + public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){ + return new EnvironmentIsolationConfig(rocketEnhanceProperties); + } +} diff --git a/src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java b/src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java new file mode 100644 index 0000000..e9fb00a --- /dev/null +++ b/src/main/java/com/njcn/middle/rocket/constant/EnhanceMessageConstant.java @@ -0,0 +1,36 @@ +package com.njcn.middle.rocket.constant; + +import lombok.experimental.UtilityClass; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2023年08月04日 14:20 + */ +@UtilityClass +public class EnhanceMessageConstant { + + public final int ONE_SECOND = 1; + public final int FIVE_SECOND = 2; + + public final int TEN_SECOND = 3; + public final int THIRTY_SECOND = 4; + public final int ONE_MINUTE = 5; + public final int TWO_MINUTE = 6; + public final int THREE_MINUTE = 7; + public final int FOUR_MINUTE = 8; + public final int FIVE_MINUTE = 9; + public final int SIX_MINUTE = 10; + public final int SEVEN_MINUTE = 11; + public final int EIGHT_MINUTE = 12; + public final int NINE_MINUTE = 13; + public final int TEN_MINUTE = 14; + public final int TWENTY_MINUTE = 15; + public final int THIRTY_MINUTE = 16; + public final int ONE_HOUR = 17; + public final int TWO_HOUR = 18; + + public final String RETRY_PREFIX = "RETRY_"; + + +} diff --git a/src/main/java/com/njcn/middle/rocket/domain/BaseMessage.java b/src/main/java/com/njcn/middle/rocket/domain/BaseMessage.java new file mode 100644 index 0000000..8d533f9 --- /dev/null +++ b/src/main/java/com/njcn/middle/rocket/domain/BaseMessage.java @@ -0,0 +1,34 @@ +package com.njcn.middle.rocket.domain; + +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2023年08月04日 10:53 + */ +@Data +public abstract class BaseMessage { + + /** + * 业务键,用于RocketMQ控制台查看消费情况 + */ + protected String key; + + /** + * 发送消息来源,用于排查问题 + */ + protected String source = ""; + + /** + * 发送时间 + */ + protected LocalDateTime sendTime = LocalDateTime.now(); + + /** + * 重试次数,用于判断重试次数,超过重试次数发送异常警告 + */ + protected Integer retryTimes = 0; +} diff --git a/src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java b/src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java new file mode 100644 index 0000000..162d8d7 --- /dev/null +++ b/src/main/java/com/njcn/middle/rocket/handler/EnhanceConsumerMessageHandler.java @@ -0,0 +1,166 @@ +package com.njcn.middle.rocket.handler; + +import com.alibaba.fastjson.JSONObject; +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.domain.BaseMessage; +import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; + +import javax.annotation.Resource; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2023年08月04日 13:44 + * 抽象消息监听器,封装了所有公共处理业务,如 + * 1、基础日志记录 + * 2、异常处理 + * 3、消息重试 + * 4、警告通知 + */ +@Slf4j +public abstract class EnhanceConsumerMessageHandler { + + + /** + * 默认重试次数 + */ + private static final int MAX_RETRY_TIMES = 3; + + /** + * 延时等级 + */ + private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND; + + + @Resource + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + + /** + * 消息处理 + * + * @param message 待处理消息 + * @throws Exception 消费异常 + */ + protected abstract void handleMessage(T message) throws Exception; + + /** + * 超过重试次数消息,需要启用isRetry + * + * @param message 待处理消息 + */ + protected abstract void handleMaxRetriesExceeded(T message); + + + /** + * 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理 + * + * @param message 待处理消息 + * @return true: 本次消息被过滤,false:不过滤 + */ + protected boolean filter(T message) { + return false; + } + + /** + * 是否异常时重复发送 + * + * @return true: 消息重试,false:不重试 + */ + protected abstract boolean isRetry(); + + /** + * 消费异常时是否抛出异常 + * 返回true,则由rocketmq机制自动重试 + * false:消费异常(如果没有开启重试则消息会被自动ack) + */ + protected abstract boolean throwException(); + + /** + * 最大重试次数 + * + * @return 最大重试次数,默认5次 + */ + protected int getMaxRetryTimes() { + return MAX_RETRY_TIMES; + } + + /** + * isRetry开启时,重新入队延迟时间 + * + * @return -1:立即入队重试 + */ + protected int getDelayLevel() { + return DELAY_LEVEL; + } + + /** + * 使用模板模式构建消息消费框架,可自由扩展或删减 + */ + public void dispatchMessage(T message) { + // 基础日志记录被父类处理了 + log.info("消费者收到消息[{}]", JSONObject.toJSON(message)); + + if (filter(message)) { + log.info("消息id{}不满足消费条件,已过滤。", message.getKey()); + return; + } + // 超过最大重试次数时调用子类方法处理 + if (message.getRetryTimes() > getMaxRetryTimes()) { + handleMaxRetriesExceeded(message); + return; + } + try { + long now = System.currentTimeMillis(); + handleMessage(message); + long costTime = System.currentTimeMillis() - now; + log.info("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime); + } catch (Exception e) { + log.error("消息{}消费异常", message.getKey(), e); + // 是捕获异常还是抛出,由子类决定 + if (throwException()) { + //抛出异常,由DefaultMessageListenerConcurrently类处理 + throw new RuntimeException(e); + } + //此时如果不开启重试机制,则默认ACK了 + if (isRetry()) { + handleRetry(message); + } + } + } + + protected void handleRetry(T message) { + // 获取子类RocketMQMessageListener注解拿到topic和tag + RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); + if (annotation == null) { + return; + } + //重新构建消息体 + String messageSource = message.getSource(); + if (!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)) { + message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource); + } + message.setRetryTimes(message.getRetryTimes() + 1); + + SendResult sendResult; + + try { + // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息) + sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel()); + } catch (Exception ex) { + // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息 + //由生产者直接发送 + throw new RuntimeException(ex); + } + // 发送失败的处理就是不进行ACK,由RocketMQ重试 + if (sendResult.getSendStatus() != SendStatus.SEND_OK) { + throw new RuntimeException("重试消息发送失败"); + } + + } + + +} diff --git a/src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java b/src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java new file mode 100644 index 0000000..f5f3033 --- /dev/null +++ b/src/main/java/com/njcn/middle/rocket/template/RocketMQEnhanceTemplate.java @@ -0,0 +1,90 @@ +package com.njcn.middle.rocket.template; + +import com.alibaba.fastjson.JSONObject; +import com.njcn.middle.rocket.autoconfig.RocketEnhanceProperties; +import com.njcn.middle.rocket.domain.BaseMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.StringUtils; + +import javax.annotation.Resource; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2023年08月04日 11:06 + */ +@Slf4j +@RequiredArgsConstructor(onConstructor = @__(@Autowired)) +public class RocketMQEnhanceTemplate { + + private final RocketMQTemplate template; + + @Resource + private RocketEnhanceProperties rocketEnhanceProperties; + + public RocketMQTemplate getTemplate() { + return template; + } + + /** + * 根据系统上下文自动构建隔离后的topic + * 构建目的地 + */ + public String buildDestination(String topic, String tag) { + topic = reBuildTopic(topic); + return topic + ":" + tag; + } + + /** + * 根据环境重新隔离topic + * @param topic 原始topic + */ + private String reBuildTopic(String topic) { + if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){ + return topic +"_" + rocketEnhanceProperties.getEnvironment(); + } + return topic; + } + + /** + * 发送同步消息 + */ + public SendResult send(String topic, String tag, T message) { + // 注意分隔符 + return send(buildDestination(topic,tag), message); + } + + + public SendResult send(String destination, T message) { + // 设置业务键,此处根据公共的参数进行处理 + // 更多的其它基础业务处理... + Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); + SendResult sendResult = template.syncSend(destination, sendMessage); + // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 + log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + return sendResult; + } + + /** + * 发送延迟消息 + */ + public SendResult send(String topic, String tag, T message, int delayLevel) { + return send(buildDestination(topic,tag), message, delayLevel); + } + + public SendResult send(String destination, T message, int delayLevel) { + Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); + SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); + log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + return sendResult; + } + + +} diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..cf775c6 --- /dev/null +++ b/src/main/resources/META-INF/spring.factories @@ -0,0 +1,4 @@ +## AutoConfiguration +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + com.njcn.middle.rocket.autoconfig.RocketMQEnhanceAutoConfiguration +