351 lines
12 KiB
C++
351 lines
12 KiB
C++
#ifndef _ROCKETMQ_H_
|
||
#define _ROCKETMQ_H_
|
||
|
||
#include <unistd.h>

#include <atomic>
#include <csignal>
#include <cstddef>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

////////////////////////////////////////////////////////////////////////////////////////////////////////

#include "rocketmq/DefaultMQPushConsumer.h"
#include "rocketmq/DefaultMQProducer.h"
#include "rocketmq/MQMessageListener.h"
#include "rocketmq/MQMessageExt.h"
#include "rocketmq/MQMessageQueue.h"
#include "rocketmq/MQSelector.h"
#include "rocketmq/SendResult.h"
#include "rocketmq/SessionCredentials.h"

#include "interface.h"
|
||
|
||
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
// NOTE(review): this forward-declares `front` (lowercase), while the test-helper
// prototypes near the end of this header take `Front*`. C++ is case-sensitive, so
// this declaration does not introduce `Front`; presumably `Front` is provided by
// "interface.h" -- confirm, and rename or drop this declaration if it is unused.
class front;
|
||
|
||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
// Process-wide settings shared with the rest of the program. They are only
// declared here; the definitions (and the values they hold) live in another
// translation unit. Group headings below are derived from the identifier
// names -- confirm the exact meaning at the definitions.

extern int g_front_seg_index;

// Test parameters.
extern int G_TEST_NUM;
extern int G_TEST_TYPE;
extern int TEST_PORT;

// Spare / reserved.
extern std::string BROKER_LIST;

// ---- MQ: producer side ----
extern std::string G_ROCKETMQ_PRODUCER;
extern std::string G_MQPRODUCER_IPPORT;
extern std::string G_MQPRODUCER_ACCESSKEY;
extern std::string G_MQPRODUCER_SECRETKEY;

// Topic names.
extern std::string TOPIC_STAT;
extern std::string TOPIC_PST;
extern std::string TOPIC_PLT;
extern std::string TOPIC_EVENT;
extern std::string TOPIC_ALARM;
extern std::string TOPIC_SNG;
extern std::string TOPIC_RTDATA;

extern std::string G_ROCKETMQ_TAG;
extern std::string G_ROCKETMQ_KEY;

// ---- MQ: consumer side ----
extern std::string G_ROCKETMQ_CONSUMER;
extern std::string G_MQCONSUMER_IPPORT;
extern std::string G_MQCONSUMER_ACCESSKEY;
extern std::string G_MQCONSUMER_SECRETKEY;
extern std::string G_MQCONSUMER_CHANNEL;

// Consumer topic / tag / key triples, one triple per suffix.
extern std::string G_MQCONSUMER_TOPIC_RT;
extern std::string G_MQCONSUMER_TAG_RT;
extern std::string G_MQCONSUMER_KEY_RT;

extern std::string G_MQCONSUMER_TOPIC_UD;
extern std::string G_MQCONSUMER_TAG_UD;
extern std::string G_MQCONSUMER_KEY_UD;

extern std::string G_MQCONSUMER_TOPIC_RC;
extern std::string G_MQCONSUMER_TAG_RC;
extern std::string G_MQCONSUMER_KEY_RC;

extern std::string G_MQCONSUMER_TOPIC_SET;
extern std::string G_MQCONSUMER_TAG_SET;
extern std::string G_MQCONSUMER_KEY_SET;

extern std::string G_MQCONSUMER_TOPIC_LOG;
extern std::string G_MQCONSUMER_TAG_LOG;
extern std::string G_MQCONSUMER_KEY_LOG;

extern std::string G_MQCONSUMER_TOPIC_CLOUD;
extern std::string G_MQCONSUMER_TAG_CLOUD;
extern std::string G_MQCONSUMER_KEY_CLOUD;

// Further topic / tag / key triples.
extern std::string G_LOG_TOPIC;
extern std::string G_LOG_TAG;
extern std::string G_LOG_KEY;

extern std::string G_ROCKETMQ_TOPIC_TEST;
extern std::string G_ROCKETMQ_TAG_TEST;
extern std::string G_ROCKETMQ_KEY_TEST;

extern std::string Topic_Reply_Topic;
extern std::string Topic_Reply_Tag;
extern std::string Topic_Reply_Key;

extern std::string Heart_Beat_Topic;
extern std::string Heart_Beat_Tag;
extern std::string Heart_Beat_Key;

extern std::string G_CONNECT_TOPIC;
extern std::string G_CONNECT_TAG;
extern std::string G_CONNECT_KEY;
|
||
|
||
/////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
// Output-redirection switches, one per level (error / warn / normal / debug).
// Declared here, defined elsewhere.
// NOTE(review): presumably `enable == true` turns redirection of that level on
// and `false` turns it off -- confirm against the definitions.
void redirectErrorOutput(bool enable);
void redirectWarnOutput(bool enable);
void redirectNormalOutput(bool enable);
void redirectDebugOutput(bool enable);
|
||
|
||
//////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
namespace rocketmq {
|
||
|
||
//-----------------------------------------------------------------------------
|
||
// RocketMQConsumer 类:基于 DefaultMQPushConsumer 的封装
|
||
//-----------------------------------------------------------------------------
|
||
|
||
// 回调类型定义:接收单条消息,返回 ConsumeStatus
|
||
using MessageCallback = std::function<ConsumeStatus(const MQMessageExt&)>;
|
||
|
||
//消费者类
|
||
class RocketMQConsumer {
|
||
public:
|
||
// 构造函数:初始化消费者
|
||
// consumerGroup: 消费者组名称
|
||
// nameServer: NameServer 地址 (格式示例 "127.0.0.1:9876")
|
||
RocketMQConsumer(const std::string& consumerGroup, const std::string& nameServer);
|
||
|
||
// 禁用拷贝和赋值
|
||
RocketMQConsumer(const RocketMQConsumer&) = delete;
|
||
RocketMQConsumer& operator=(const RocketMQConsumer&) = delete;
|
||
|
||
// 订阅主题和标签,并注册回调
|
||
// topic: 要订阅的 Topic 名称
|
||
// tag: 要过滤的 Tag (可以是单个标签或多个标签的表达式,如 "TagA || TagB")
|
||
// callback: 收到消息时的处理函数
|
||
void subscribe(const std::string& topic, const std::string& tag, MessageCallback callback);
|
||
|
||
// 启动消费者
|
||
void start();
|
||
|
||
// 关闭消费者
|
||
void shutdown();
|
||
|
||
// 析构函数:关闭并销毁消费者
|
||
~RocketMQConsumer();
|
||
|
||
private:
|
||
// 内部消息监听器,实现 MessageListenerConcurrently 接口
|
||
class InternalListener : public MessageListenerConcurrently {
|
||
public:
|
||
InternalListener(RocketMQConsumer* parent) : parent_(parent) {}
|
||
virtual ~InternalListener() noexcept {}
|
||
|
||
// 收到消息后的回调,实现逐条处理并调用注册的用户回调
|
||
virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt>& msgs) {
|
||
for (const auto& msg : msgs) {
|
||
const std::string& topic = msg.getTopic();
|
||
const std::string& tags = msg.getTags();
|
||
|
||
// 生成用于查找回调的键:topic + ":" + tags
|
||
std::string key = topic + ":" + tags;
|
||
|
||
std::lock_guard<std::mutex> lock(parent_->callbackMutex_);
|
||
auto it = parent_->callbackMap_.find(key);
|
||
if (it != parent_->callbackMap_.end()) {
|
||
// 调用用户注册的回调
|
||
ConsumeStatus status = it->second(msg);
|
||
if (status != CONSUME_SUCCESS) {
|
||
// 如果回调返回需重试,则直接返回 RECONSUME_LATER
|
||
return RECONSUME_LATER;
|
||
}
|
||
} else {
|
||
// 未找到对应的回调时,打印默认信息并继续
|
||
std::cout << "[RocketMQConsumer] No callback registered for "
|
||
<< "topic=" << topic << ", tags=" << tags << ". "
|
||
<< "消息将视为已消费。" << std::endl;
|
||
}
|
||
}
|
||
return CONSUME_SUCCESS;
|
||
}
|
||
|
||
private:
|
||
RocketMQConsumer* parent_;
|
||
};
|
||
|
||
private:
|
||
DefaultMQPushConsumer consumer_; // C++ 接口的推模式消费者
|
||
std::mutex callbackMutex_; // 保护回调映射的互斥量
|
||
std::map<std::string, MessageCallback> callbackMap_; // 主题:标签 -> 回调映射
|
||
InternalListener* listener_; // 内部监听器实例
|
||
};
|
||
|
||
// -----------------------------------------------------------------------------
|
||
// 订阅信息结构体
|
||
struct Subscription {
|
||
std::string topic;
|
||
std::string tag;
|
||
// 回调函数类型:收到单条消息后返回 ConsumeStatus
|
||
// 假设 myMessageCallbackxxx 已改为符合此签名:
|
||
// ConsumeStatus myMessageCallbackxxx(const MQMessageExt&);
|
||
using CallbackT = std::function<ConsumeStatus(const MQMessageExt&)>;
|
||
CallbackT callback;
|
||
|
||
Subscription(const std::string& t, const std::string& g, CallbackT cb)
|
||
: topic(t), tag(g), callback(std::move(cb)) {}
|
||
};
|
||
// -----------------------------------------------------------------------------
|
||
|
||
// -----------------------------------------------------------------------------
|
||
// 内部监听器:依据 topic + ":" + tag 分发到对应的回调
|
||
class SubscriberListener : public MessageListenerConcurrently {
|
||
public:
|
||
// 构造时传入“topic:tag -> 回调”映射
|
||
SubscriberListener(const std::map<std::string, Subscription::CallbackT>& cbMap)
|
||
: callbackMap_(cbMap) {}
|
||
|
||
virtual ~SubscriberListener() noexcept = default;
|
||
|
||
// 当批量消息到达时被调用
|
||
virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt>& msgs) override {
|
||
for (const auto& msg : msgs) {
|
||
const std::string& topic = msg.getTopic();
|
||
const std::string& tags = msg.getTags();
|
||
const std::string key = topic + ":" + tags;
|
||
|
||
auto it = callbackMap_.find(key);
|
||
if (it != callbackMap_.end()) {
|
||
// 调用注册的回调
|
||
ConsumeStatus status = it->second(msg);
|
||
if (status != CONSUME_SUCCESS) {
|
||
// 回调要求稍后重试
|
||
return RECONSUME_LATER;
|
||
}
|
||
}
|
||
else {
|
||
// 没有找到对应的回调时,打印日志并视为已消费
|
||
std::cout << "[SubscriberListener] No callback for "
|
||
<< "topic=\"" << topic << "\", tag=\"" << tags << "\". "
|
||
<< "默认丢弃并返回成功。" << std::endl;
|
||
}
|
||
}
|
||
return CONSUME_SUCCESS;
|
||
}
|
||
|
||
private:
|
||
// “topic:tag” -> 回调函数
|
||
std::map<std::string, Subscription::CallbackT> callbackMap_;
|
||
};
|
||
// -----------------------------------------------------------------------------
|
||
|
||
//-----------------------------------------------------------------------------
|
||
// RocketMQProducer: wrapper around DefaultMQProducer
|
||
//-----------------------------------------------------------------------------
|
||
|
||
// -----------------------------------------------------------------------------
|
||
// 队列选择器:轮询选择队列,用于有序发送
|
||
class RoundRobinSelector : public MessageQueueSelector {
|
||
public:
|
||
RoundRobinSelector() : currentIndex_(0) {}
|
||
virtual ~RoundRobinSelector() noexcept {}
|
||
|
||
// 从可用队列列表中轮询选择一个队列
|
||
virtual MQMessageQueue select(const std::vector<MQMessageQueue>& mqs,
|
||
const MQMessage& msg, void* arg) {
|
||
if (mqs.empty()) {
|
||
throw MQClientException("No available message queues", -1, __FILE__, __LINE__);
|
||
}
|
||
int idx = currentIndex_++ % static_cast<int>(mqs.size());
|
||
return mqs.at(idx);
|
||
}
|
||
|
||
private:
|
||
std::atomic<int> currentIndex_;
|
||
};
|
||
// -----------------------------------------------------------------------------
|
||
|
||
// 生产者类
|
||
class RocketMQProducer {
|
||
public:
|
||
// 构造函数:初始化生产者
|
||
// groupName: 生产者组名称
|
||
// nameServer: NameServer 地址 (格式示例 "127.0.0.1:9876")
|
||
RocketMQProducer(const std::string& groupName, const std::string& nameServer);
|
||
|
||
// 禁用拷贝和赋值
|
||
RocketMQProducer(const RocketMQProducer&) = delete;
|
||
RocketMQProducer& operator=(const RocketMQProducer&) = delete;
|
||
|
||
// 发送消息 (同步发送)
|
||
// body: 消息体内容 (字符串格式)
|
||
// topic: Topic 名称
|
||
// tags: Tag 字符串 (可选)
|
||
// keys: Key 字符串 (可选)
|
||
void sendMessage(const std::string& body,
|
||
const std::string& topic,
|
||
const std::string& tags = "",
|
||
const std::string& keys = "");
|
||
|
||
// 发送消息 (顺序发送),使用轮询队列选择器
|
||
void sendMessageOrderly(const std::string& body,
|
||
const std::string& topic,
|
||
const std::string& tags = "",
|
||
const std::string& keys = "");
|
||
|
||
// 关闭并销毁生产者
|
||
~RocketMQProducer();
|
||
|
||
private:
|
||
DefaultMQProducer producer_; // C++ 接口的生产者
|
||
RoundRobinSelector selector_; // 轮询队列选择器
|
||
};
|
||
|
||
} // namespace rocketmq
|
||
|
||
//////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer);
|
||
|
||
rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& msg);
|
||
rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& msg);
|
||
rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& msg);
|
||
rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg);
|
||
rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg);
|
||
|
||
void send_heartbeat_to_queue(const std::string& status);
|
||
|
||
void rocketmq_test_300(int mpnum, int front_index, int type,Front* front);
|
||
void rocketmq_test_rc(Front* front);
|
||
void rocketmq_test_set(Front* front);
|
||
void rocketmq_test_ud(Front* front);
|
||
void rocketmq_test_rt(Front* front);
|
||
void rocketmq_test_getdir(Front* front);
|
||
void InitializeProducer(rocketmq::RocketMQProducer*& producer);
|
||
|
||
#endif // _ROCKETMQ_H_
|
||
|
||
|
||
|
||
|