#ifndef _ROCKETMQ_H_ #define _ROCKETMQ_H_ #include #include #include #include #include #include #include #include #include #include #include #include #include //////////////////////////////////////////////////////////////////////////////////////////////////////// #include "rocketmq/DefaultMQPushConsumer.h" #include "rocketmq/DefaultMQProducer.h" #include "rocketmq/MQMessageListener.h" #include "rocketmq/MQMessageExt.h" #include "rocketmq/MQMessageQueue.h" #include "rocketmq/MQSelector.h" #include "rocketmq/SendResult.h" #include "rocketmq/SessionCredentials.h" #include "interface.h" //////////////////////////////////////////////////////////////////////////////////////////////////////////// class front; /////////////////////////////////////////////////////////////////////////////////////////////////////////// extern int g_front_seg_index; extern int G_TEST_NUM; extern int G_TEST_TYPE; extern int TEST_PORT; //备用 extern std::string BROKER_LIST; //mq extern std::string G_ROCKETMQ_PRODUCER; extern std::string G_MQPRODUCER_IPPORT; extern std::string G_MQPRODUCER_ACCESSKEY; extern std::string G_MQPRODUCER_SECRETKEY; extern std::string TOPIC_STAT; extern std::string TOPIC_PST; extern std::string TOPIC_PLT; extern std::string TOPIC_EVENT; extern std::string TOPIC_ALARM; extern std::string TOPIC_SNG; extern std::string TOPIC_RTDATA; extern std::string G_ROCKETMQ_TAG; extern std::string G_ROCKETMQ_KEY; extern std::string G_ROCKETMQ_CONSUMER; extern std::string G_MQCONSUMER_IPPORT; extern std::string G_MQCONSUMER_ACCESSKEY; extern std::string G_MQCONSUMER_SECRETKEY; extern std::string G_MQCONSUMER_CHANNEL; extern std::string G_MQCONSUMER_TOPIC_RT; extern std::string G_MQCONSUMER_TAG_RT; extern std::string G_MQCONSUMER_KEY_RT; extern std::string G_MQCONSUMER_TOPIC_UD; extern std::string G_MQCONSUMER_TAG_UD; extern std::string G_MQCONSUMER_KEY_UD; extern std::string G_MQCONSUMER_TOPIC_RC; extern std::string G_MQCONSUMER_TAG_RC; extern std::string G_MQCONSUMER_KEY_RC; extern std::string G_MQCONSUMER_TOPIC_SET; extern std::string G_MQCONSUMER_TAG_SET; extern std::string G_MQCONSUMER_KEY_SET; extern std::string G_MQCONSUMER_TOPIC_LOG; extern std::string G_MQCONSUMER_TAG_LOG; extern std::string G_MQCONSUMER_KEY_LOG; extern std::string G_LOG_TOPIC; extern std::string G_LOG_TAG; extern std::string G_LOG_KEY; extern std::string G_ROCKETMQ_TOPIC_TEST; extern std::string G_ROCKETMQ_TAG_TEST; extern std::string G_ROCKETMQ_KEY_TEST; extern std::string Topic_Reply_Topic; extern std::string Topic_Reply_Tag; extern std::string Topic_Reply_Key; extern std::string Heart_Beat_Topic; extern std::string Heart_Beat_Tag; extern std::string Heart_Beat_Key; extern std::string G_CONNECT_TOPIC; extern std::string G_CONNECT_TAG; extern std::string G_CONNECT_KEY; ///////////////////////////////////////////////////////////////////////////////////////////////////// extern void redirectErrorOutput(bool enable); extern void redirectWarnOutput(bool enable); extern void redirectNormalOutput(bool enable); extern void redirectDebugOutput(bool enable); ////////////////////////////////////////////////////////////////////////////////////////////////////// namespace rocketmq { //----------------------------------------------------------------------------- // RocketMQConsumer 类:基于 DefaultMQPushConsumer 的封装 //----------------------------------------------------------------------------- // 回调类型定义:接收单条消息,返回 ConsumeStatus using MessageCallback = std::function; //消费者类 class RocketMQConsumer { public: // 构造函数:初始化消费者 // consumerGroup: 消费者组名称 // nameServer: NameServer 地址 (格式示例 "127.0.0.1:9876") RocketMQConsumer(const std::string& consumerGroup, const std::string& nameServer); // 禁用拷贝和赋值 RocketMQConsumer(const RocketMQConsumer&) = delete; RocketMQConsumer& operator=(const RocketMQConsumer&) = delete; // 订阅主题和标签,并注册回调 // topic: 要订阅的 Topic 名称 // tag: 要过滤的 Tag (可以是单个标签或多个标签的表达式,如 "TagA || TagB") // callback: 收到消息时的处理函数 void subscribe(const std::string& topic, const std::string& tag, MessageCallback callback); // 启动消费者 void start(); // 关闭消费者 void shutdown(); // 析构函数:关闭并销毁消费者 ~RocketMQConsumer(); private: // 内部消息监听器,实现 MessageListenerConcurrently 接口 class InternalListener : public MessageListenerConcurrently { public: InternalListener(RocketMQConsumer* parent) : parent_(parent) {} virtual ~InternalListener() noexcept {} // 收到消息后的回调,实现逐条处理并调用注册的用户回调 virtual ConsumeStatus consumeMessage(const std::vector& msgs) { for (const auto& msg : msgs) { const std::string& topic = msg.getTopic(); const std::string& tags = msg.getTags(); // 生成用于查找回调的键:topic + ":" + tags std::string key = topic + ":" + tags; std::lock_guard lock(parent_->callbackMutex_); auto it = parent_->callbackMap_.find(key); if (it != parent_->callbackMap_.end()) { // 调用用户注册的回调 ConsumeStatus status = it->second(msg); if (status != CONSUME_SUCCESS) { // 如果回调返回需重试,则直接返回 RECONSUME_LATER return RECONSUME_LATER; } } else { // 未找到对应的回调时,打印默认信息并继续 std::cout << "[RocketMQConsumer] No callback registered for " << "topic=" << topic << ", tags=" << tags << ". " << "消息将视为已消费。" << std::endl; } } return CONSUME_SUCCESS; } private: RocketMQConsumer* parent_; }; private: DefaultMQPushConsumer consumer_; // C++ 接口的推模式消费者 std::mutex callbackMutex_; // 保护回调映射的互斥量 std::map callbackMap_; // 主题:标签 -> 回调映射 InternalListener* listener_; // 内部监听器实例 }; // ----------------------------------------------------------------------------- // 订阅信息结构体 struct Subscription { std::string topic; std::string tag; // 回调函数类型:收到单条消息后返回 ConsumeStatus // 假设 myMessageCallbackxxx 已改为符合此签名: // ConsumeStatus myMessageCallbackxxx(const MQMessageExt&); using CallbackT = std::function; CallbackT callback; Subscription(const std::string& t, const std::string& g, CallbackT cb) : topic(t), tag(g), callback(std::move(cb)) {} }; // ----------------------------------------------------------------------------- // ----------------------------------------------------------------------------- // 内部监听器:依据 topic + ":" + tag 分发到对应的回调 class SubscriberListener : public MessageListenerConcurrently { public: // 构造时传入“topic:tag -> 回调”映射 SubscriberListener(const std::map& cbMap) : callbackMap_(cbMap) {} virtual ~SubscriberListener() noexcept = default; // 当批量消息到达时被调用 virtual ConsumeStatus consumeMessage(const std::vector& msgs) override { for (const auto& msg : msgs) { const std::string& topic = msg.getTopic(); const std::string& tags = msg.getTags(); const std::string key = topic + ":" + tags; auto it = callbackMap_.find(key); if (it != callbackMap_.end()) { // 调用注册的回调 ConsumeStatus status = it->second(msg); if (status != CONSUME_SUCCESS) { // 回调要求稍后重试 return RECONSUME_LATER; } } else { // 没有找到对应的回调时,打印日志并视为已消费 std::cout << "[SubscriberListener] No callback for " << "topic=\"" << topic << "\", tag=\"" << tags << "\". " << "默认丢弃并返回成功。" << std::endl; } } return CONSUME_SUCCESS; } private: // “topic:tag” -> 回调函数 std::map callbackMap_; }; // ----------------------------------------------------------------------------- //----------------------------------------------------------------------------- // RocketMQProducer 类:基于 DefaultMQProducer 的封装 //----------------------------------------------------------------------------- // ----------------------------------------------------------------------------- // 队列选择器:轮询选择队列,用于有序发送 class RoundRobinSelector : public MessageQueueSelector { public: RoundRobinSelector() : currentIndex_(0) {} virtual ~RoundRobinSelector() noexcept {} // 从可用队列列表中轮询选择一个队列 virtual MQMessageQueue select(const std::vector& mqs, const MQMessage& msg, void* arg) { if (mqs.empty()) { throw MQClientException("No available message queues", -1, __FILE__, __LINE__); } int idx = currentIndex_++ % static_cast(mqs.size()); return mqs.at(idx); } private: std::atomic currentIndex_; }; // ----------------------------------------------------------------------------- // 生产者类 class RocketMQProducer { public: // 构造函数:初始化生产者 // groupName: 生产者组名称 // nameServer: NameServer 地址 (格式示例 "127.0.0.1:9876") RocketMQProducer(const std::string& groupName, const std::string& nameServer); // 禁用拷贝和赋值 RocketMQProducer(const RocketMQProducer&) = delete; RocketMQProducer& operator=(const RocketMQProducer&) = delete; // 发送消息 (同步发送) // body: 消息体内容 (字符串格式) // topic: Topic 名称 // tags: Tag 字符串 (可选) // keys: Key 字符串 (可选) void sendMessage(const std::string& body, const std::string& topic, const std::string& tags = "", const std::string& keys = ""); // 发送消息 (顺序发送),使用轮询队列选择器 void sendMessageOrderly(const std::string& body, const std::string& topic, const std::string& tags = "", const std::string& keys = ""); // 关闭并销毁生产者 ~RocketMQProducer(); private: DefaultMQProducer producer_; // C++ 接口的生产者 RoundRobinSelector selector_; // 轮询队列选择器 }; } // namespace rocketmq ////////////////////////////////////////////////////////////////////////////////////////////// void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer); rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& msg); rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& msg); rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& msg); rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg); rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg); void send_heartbeat_to_queue(const std::string& status); void rocketmq_test_300(int mpnum, int front_index, int type,Front* front); void rocketmq_test_rc(Front* front); void rocketmq_test_set(Front* front); void rocketmq_test_ud(Front* front); void rocketmq_test_rt(Front* front); void InitializeProducer(rocketmq::RocketMQProducer*& producer); #endif // _ROCKETMQ_CLIENT_WRAPPER_H_