Files
2025-06-20 16:20:59 +08:00

347 lines
12 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#ifndef _ROCKETMQ_H_
#define _ROCKETMQ_H_
#include <unistd.h>

#include <atomic>
#include <csignal>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <vector>
////////////////////////////////////////////////////////////////////////////////////////////////////////
#include "rocketmq/DefaultMQPushConsumer.h"
#include "rocketmq/DefaultMQProducer.h"
#include "rocketmq/MQMessageListener.h"
#include "rocketmq/MQMessageExt.h"
#include "rocketmq/MQMessageQueue.h"
#include "rocketmq/MQSelector.h"
#include "rocketmq/SendResult.h"
#include "rocketmq/SessionCredentials.h"
#include "interface.h"
////////////////////////////////////////////////////////////////////////////////////////////////////////////
class front;
///////////////////////////////////////////////////////////////////////////////////////////////////////////
// Global settings shared across translation units. They are only declared
// here; the definitions (and whatever assigns their values) are not visible in
// this header. The group notes below are inferred from the identifiers alone —
// TODO confirm against the definitions.
extern int g_front_seg_index;
extern int G_TEST_NUM;
extern int G_TEST_TYPE;
extern int TEST_PORT;
// Spare / reserved (original author's note).
extern std::string BROKER_LIST;
// RocketMQ settings.
// Producer side: presumably group name, name-server address and credentials.
extern std::string G_ROCKETMQ_PRODUCER;
extern std::string G_MQPRODUCER_IPPORT;
extern std::string G_MQPRODUCER_ACCESSKEY;
extern std::string G_MQPRODUCER_SECRETKEY;
// Presumably the topic names messages are published to.
extern std::string TOPIC_STAT;
extern std::string TOPIC_PST;
extern std::string TOPIC_PLT;
extern std::string TOPIC_EVENT;
extern std::string TOPIC_ALARM;
extern std::string TOPIC_SNG;
extern std::string TOPIC_RTDATA;
extern std::string G_ROCKETMQ_TAG;
extern std::string G_ROCKETMQ_KEY;
// Consumer side: presumably group name, name-server address and credentials.
extern std::string G_ROCKETMQ_CONSUMER;
extern std::string G_MQCONSUMER_IPPORT;
extern std::string G_MQCONSUMER_ACCESSKEY;
extern std::string G_MQCONSUMER_SECRETKEY;
extern std::string G_MQCONSUMER_CHANNEL;
// Topic / tag / key triples for the subscriptions suffixed RT, UD, RC, SET and
// LOG. NOTE(review): these suffixes look like they pair with the
// myMessageCallback* functions declared at the end of this header — confirm.
extern std::string G_MQCONSUMER_TOPIC_RT;
extern std::string G_MQCONSUMER_TAG_RT;
extern std::string G_MQCONSUMER_KEY_RT;
extern std::string G_MQCONSUMER_TOPIC_UD;
extern std::string G_MQCONSUMER_TAG_UD;
extern std::string G_MQCONSUMER_KEY_UD;
extern std::string G_MQCONSUMER_TOPIC_RC;
extern std::string G_MQCONSUMER_TAG_RC;
extern std::string G_MQCONSUMER_KEY_RC;
extern std::string G_MQCONSUMER_TOPIC_SET;
extern std::string G_MQCONSUMER_TAG_SET;
extern std::string G_MQCONSUMER_KEY_SET;
extern std::string G_MQCONSUMER_TOPIC_LOG;
extern std::string G_MQCONSUMER_TAG_LOG;
extern std::string G_MQCONSUMER_KEY_LOG;
// Further topic / tag / key triples (log, test, reply, heartbeat, connect).
extern std::string G_LOG_TOPIC;
extern std::string G_LOG_TAG;
extern std::string G_LOG_KEY;
extern std::string G_ROCKETMQ_TOPIC_TEST;
extern std::string G_ROCKETMQ_TAG_TEST;
extern std::string G_ROCKETMQ_KEY_TEST;
extern std::string Topic_Reply_Topic;
extern std::string Topic_Reply_Tag;
extern std::string Topic_Reply_Key;
extern std::string Heart_Beat_Topic;
extern std::string Heart_Beat_Tag;
extern std::string Heart_Beat_Key;
extern std::string G_CONNECT_TOPIC;
extern std::string G_CONNECT_TAG;
extern std::string G_CONNECT_KEY;
/////////////////////////////////////////////////////////////////////////////////////////////////////
// Output-redirection switches, defined in another translation unit.
// NOTE(review): the names suggest one switch per log level (error / warn /
// normal / debug); what `enable` actually redirects, and where to, is not
// visible from this header — confirm against the definitions.
extern void redirectErrorOutput(bool enable);
extern void redirectWarnOutput(bool enable);
extern void redirectNormalOutput(bool enable);
extern void redirectDebugOutput(bool enable);
//////////////////////////////////////////////////////////////////////////////////////////////////////
namespace rocketmq {
//-----------------------------------------------------------------------------
// RocketMQConsumer class: a wrapper built on DefaultMQPushConsumer
//-----------------------------------------------------------------------------
// Callback type: receives a single message and returns a ConsumeStatus.
using MessageCallback = std::function<ConsumeStatus(const MQMessageExt&)>;
// Consumer class.
//
// Owns a DefaultMQPushConsumer plus a "topic:tag" -> callback table and an
// internal listener that dispatches each received message to the matching
// callback. Member functions declared without a body are defined in another
// translation unit.
class RocketMQConsumer {
public:
    // Constructor: initializes the consumer.
    //   consumerGroup: consumer group name
    //   nameServer:    NameServer address (format example: "127.0.0.1:9876")
    RocketMQConsumer(const std::string& consumerGroup, const std::string& nameServer);

    // Copying and assignment are disabled.
    RocketMQConsumer(const RocketMQConsumer&) = delete;
    RocketMQConsumer& operator=(const RocketMQConsumer&) = delete;

    // Subscribes to a topic/tag and registers a callback.
    //   topic:    name of the topic to subscribe to
    //   tag:      tag filter (a single tag or an expression such as "TagA || TagB")
    //   callback: handler invoked when a message is received
    // NOTE(review): dispatch looks callbacks up by "<message topic>:<message tags>"
    // (see InternalListener below). How subscribe() builds the stored key is
    // not visible here; if it stores the raw tag expression, messages carrying
    // a single tag will not match an expression key — confirm.
    void subscribe(const std::string& topic, const std::string& tag, MessageCallback callback);

    // Starts the consumer.
    void start();

    // Shuts the consumer down.
    void shutdown();

    // Destructor: shuts down and destroys the consumer (per the original note;
    // the definition is not in this header).
    ~RocketMQConsumer();

private:
    // Internal message listener implementing MessageListenerConcurrently.
    class InternalListener : public MessageListenerConcurrently {
    public:
        InternalListener(RocketMQConsumer* parent) : parent_(parent) {}
        virtual ~InternalListener() noexcept {}

        // Handles a batch of messages one by one, forwarding each to the
        // callback registered under "topic:tags".
        //
        // Returns RECONSUME_LATER as soon as any callback returns something
        // other than CONSUME_SUCCESS (the rest of the batch is not processed);
        // otherwise CONSUME_SUCCESS. Messages without a registered callback are
        // logged to stdout and treated as consumed.
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt>& msgs) override {
            for (const auto& msg : msgs) {
                const std::string& topic = msg.getTopic();
                const std::string& tags = msg.getTags();
                // Lookup key: topic + ":" + tags.
                const std::string key = topic + ":" + tags;

                // Copy the callback out while holding the mutex and release the
                // mutex BEFORE invoking it. Calling user code under
                // callbackMutex_ (a non-recursive std::mutex) would deadlock if
                // the callback re-entered code that takes the same mutex, and
                // would serialize every consume thread behind this one lock.
                // Callbacks that relied on that implicit serialization must do
                // their own locking.
                MessageCallback handler;
                bool registered = false;
                {
                    std::lock_guard<std::mutex> lock(parent_->callbackMutex_);
                    const auto it = parent_->callbackMap_.find(key);
                    if (it != parent_->callbackMap_.end()) {
                        handler = it->second;
                        registered = true;
                    }
                }

                if (!registered) {
                    // No matching callback: print a default notice and continue.
                    std::cout << "[RocketMQConsumer] No callback registered for "
                              << "topic=" << topic << ", tags=" << tags << ". "
                              << "消息将视为已消费。" << std::endl;
                    continue;
                }

                // Invoke the user-registered callback.
                const ConsumeStatus status = handler(msg);
                if (status != CONSUME_SUCCESS) {
                    // The callback asked for a retry: report RECONSUME_LATER.
                    return RECONSUME_LATER;
                }
            }
            return CONSUME_SUCCESS;
        }

    private:
        RocketMQConsumer* parent_;  // Non-owning back-pointer to the enclosing consumer.
    };

private:
    DefaultMQPushConsumer consumer_;                      // Push-mode consumer from the C++ client API.
    std::mutex callbackMutex_;                            // Guards callbackMap_.
    std::map<std::string, MessageCallback> callbackMap_;  // "topic:tag" -> callback.
    InternalListener* listener_;                          // Internal listener instance.
};
// -----------------------------------------------------------------------------
// Subscription record: one topic/tag pair and the handler bound to it.
struct Subscription {
    // Handler signature: takes a single received message and returns a
    // ConsumeStatus. The myMessageCallbackxxx functions are assumed to have
    // been changed to this form:
    //   ConsumeStatus myMessageCallbackxxx(const MQMessageExt&);
    using CallbackT = std::function<ConsumeStatus(const MQMessageExt&)>;

    std::string topic;
    std::string tag;
    CallbackT callback;

    // Copies the topic and tag; takes the handler by value and moves it in.
    Subscription(const std::string& topicName, const std::string& tagName, CallbackT handler)
        : topic{topicName},
          tag{tagName},
          callback{std::move(handler)} {}
};
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// Internal listener: routes each message to the handler stored under
// topic + ":" + tag.
class SubscriberListener : public MessageListenerConcurrently {
public:
    // Takes a copy of the "topic:tag" -> handler table.
    SubscriberListener(const std::map<std::string, Subscription::CallbackT>& cbMap)
        : callbackMap_(cbMap) {}

    virtual ~SubscriberListener() noexcept = default;

    // Called when a batch of messages arrives. Stops at the first handler that
    // does not return CONSUME_SUCCESS and reports RECONSUME_LATER; otherwise
    // reports CONSUME_SUCCESS once the whole batch has been walked.
    virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt>& msgs) override {
        for (const auto& message : msgs) {
            const std::string& msgTopic = message.getTopic();
            const std::string& msgTag = message.getTags();
            const auto entry = callbackMap_.find(msgTopic + ":" + msgTag);

            if (entry == callbackMap_.end()) {
                // Nothing registered for this key: log it and treat the
                // message as consumed.
                std::cout << "[SubscriberListener] No callback for "
                          << "topic=\"" << msgTopic << "\", tag=\"" << msgTag << "\". "
                          << "默认丢弃并返回成功。" << std::endl;
                continue;
            }

            // Run the registered handler; anything but success means the
            // handler wants the batch retried later.
            if (entry->second(message) != CONSUME_SUCCESS) {
                return RECONSUME_LATER;
            }
        }
        return CONSUME_SUCCESS;
    }

private:
    // "topic:tag" -> handler
    std::map<std::string, Subscription::CallbackT> callbackMap_;
};
// -----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
// RocketMQProducer class: a wrapper built on DefaultMQProducer
//-----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// Queue selector: picks queues in round-robin order; used for "orderly" sends.
// NOTE(review): rotating across queues spreads consecutive messages over
// different queues, so it does not by itself keep related messages on one
// queue — confirm this is the ordering behaviour callers expect.
class RoundRobinSelector : public MessageQueueSelector {
public:
    RoundRobinSelector() : currentIndex_(0) {}
    virtual ~RoundRobinSelector() noexcept {}

    // Returns the next queue from `mqs` in rotation.
    //   mqs: candidate queues
    //   msg: message being sent (unused)
    //   arg: caller-supplied argument (unused)
    // Throws MQClientException when `mqs` is empty.
    MQMessageQueue select(const std::vector<MQMessageQueue>& mqs,
                          const MQMessage& /*msg*/, void* /*arg*/) override {
        if (mqs.empty()) {
            throw MQClientException("No available message queues", -1, __FILE__, __LINE__);
        }
        // The counter is unsigned so the modulo stays non-negative after the
        // counter wraps. With a signed counter the remainder turned negative
        // once the value passed INT_MAX, and the resulting index made
        // mqs.at() throw std::out_of_range.
        const unsigned int ticket = currentIndex_.fetch_add(1);
        return mqs[ticket % mqs.size()];
    }

private:
    std::atomic<unsigned int> currentIndex_;  // Monotonic send counter; wraps modulo 2^N.
};
// -----------------------------------------------------------------------------
// Producer class.
// All member functions are only declared here; their definitions live in
// another translation unit, so the behaviour notes below are translations of
// the original author's comments rather than something this header shows.
class RocketMQProducer {
public:
// Constructor: initializes the producer.
// groupName: producer group name
// nameServer: NameServer address (format example: "127.0.0.1:9876")
RocketMQProducer(const std::string& groupName, const std::string& nameServer);
// Copying and assignment are disabled.
RocketMQProducer(const RocketMQProducer&) = delete;
RocketMQProducer& operator=(const RocketMQProducer&) = delete;
// Sends a message (synchronous send).
// body: message body (as a string)
// topic: topic name
// tags: tag string (optional, defaults to "")
// keys: key string (optional, defaults to "")
void sendMessage(const std::string& body,
const std::string& topic,
const std::string& tags = "",
const std::string& keys = "");
// Sends a message (orderly send) using the round-robin queue selector.
// Parameters are the same as for sendMessage.
void sendMessageOrderly(const std::string& body,
const std::string& topic,
const std::string& tags = "",
const std::string& keys = "");
// Shuts down and destroys the producer.
~RocketMQProducer();
private:
DefaultMQProducer producer_; // Producer from the C++ client API.
RoundRobinSelector selector_; // Round-robin queue selector.
};
} // namespace rocketmq
//////////////////////////////////////////////////////////////////////////////////////////////
// Free functions implemented in other translation units; only their
// signatures are visible in this header.
// NOTE(review): `queue_data_t` and `Front` are not declared in this header
// (the only forward declaration earlier in the file is `class front;`, with a
// lowercase name) — presumably both come from "interface.h"; confirm.

// Takes one queued data item and a producer. NOTE(review): presumably
// publishes `data` through `producer` — confirm against the definition.
void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer);
// Message handlers. Each has the signature required by
// rocketmq::MessageCallback / rocketmq::Subscription::CallbackT: it takes one
// received message and returns a ConsumeStatus.
rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& msg);
rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& msg);
rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& msg);
rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg);
rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg);
// NOTE(review): presumably enqueues a heartbeat message carrying `status` — confirm.
void send_heartbeat_to_queue(const std::string& status);
// Test helpers. NOTE(review): the rc / set / ud / rt suffixes look like they
// match the G_MQCONSUMER_*_RC / _SET / _UD / _RT settings — confirm.
void rocketmq_test_300(int mpnum, int front_index, int type,Front* front);
void rocketmq_test_rc(Front* front);
void rocketmq_test_set(Front* front);
void rocketmq_test_ud(Front* front);
void rocketmq_test_rt(Front* front);
// Takes the producer pointer by reference. NOTE(review): presumably creates a
// producer and stores it in `producer`; who owns and deletes it is not visible
// here — confirm.
void InitializeProducer(rocketmq::RocketMQProducer*& producer);
#endif // _ROCKETMQ_H_