package com.njcn.middle.rocket.template; import com.alibaba.fastjson.JSONObject; import com.njcn.middle.rocket.autoconfig.RocketEnhanceProperties; import com.njcn.middle.rocket.domain.BaseMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StringUtils; import javax.annotation.Resource; /** * @author hongawen * @version 1.0.0 * @date 2023年08月04日 11:06 */ @Slf4j @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class RocketMQEnhanceTemplate { private final RocketMQTemplate template; @Resource private RocketEnhanceProperties rocketEnhanceProperties; public RocketMQTemplate getTemplate() { return template; } /** * 根据系统上下文自动构建隔离后的topic * 构建目的地 */ public String buildDestination(String topic, String tag) { topic = reBuildTopic(topic); return topic + ":" + tag; } /** * 根据环境重新隔离topic * @param topic 原始topic */ private String reBuildTopic(String topic) { if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){ return topic +"_" + rocketEnhanceProperties.getEnvironment(); } return topic; } /** * 发送同步消息 */ public SendResult send(String topic, String tag, T message) { // 注意分隔符 return send(buildDestination(topic,tag), message); } public SendResult send(String destination, T message) { // 设置业务键,此处根据公共的参数进行处理 // 更多的其它基础业务处理... Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage); // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } /** * 发送延迟消息 */ public SendResult send(String topic, String tag, T message, int delayLevel) { return send(buildDestination(topic,tag), message, delayLevel); } public SendResult send(String destination, T message, int delayLevel) { Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } }