add front demo in this project

This commit is contained in:
lnk
2025-06-20 16:20:59 +08:00
parent 768eebbc2b
commit e14e3f9678
208 changed files with 54655 additions and 114 deletions

View File

@@ -0,0 +1,346 @@
#ifndef _ROCKETMQ_H_
#define _ROCKETMQ_H_
#include <iostream>
#include <string>
#include <map>
#include <vector>
#include <mutex>
#include <functional>
#include <string>
#include <vector>
#include <map>
#include <list>
#include <csignal>
#include <unistd.h>
#include <iostream>
////////////////////////////////////////////////////////////////////////////////////////////////////////
#include "rocketmq/DefaultMQPushConsumer.h"
#include "rocketmq/DefaultMQProducer.h"
#include "rocketmq/MQMessageListener.h"
#include "rocketmq/MQMessageExt.h"
#include "rocketmq/MQMessageQueue.h"
#include "rocketmq/MQSelector.h"
#include "rocketmq/SendResult.h"
#include "rocketmq/SessionCredentials.h"
#include "interface.h"
////////////////////////////////////////////////////////////////////////////////////////////////////////////
class front;
///////////////////////////////////////////////////////////////////////////////////////////////////////////
extern int g_front_seg_index;
extern int G_TEST_NUM;
extern int G_TEST_TYPE;
extern int TEST_PORT;
//备用
extern std::string BROKER_LIST;
//mq
extern std::string G_ROCKETMQ_PRODUCER;
extern std::string G_MQPRODUCER_IPPORT;
extern std::string G_MQPRODUCER_ACCESSKEY;
extern std::string G_MQPRODUCER_SECRETKEY;
extern std::string TOPIC_STAT;
extern std::string TOPIC_PST;
extern std::string TOPIC_PLT;
extern std::string TOPIC_EVENT;
extern std::string TOPIC_ALARM;
extern std::string TOPIC_SNG;
extern std::string TOPIC_RTDATA;
extern std::string G_ROCKETMQ_TAG;
extern std::string G_ROCKETMQ_KEY;
extern std::string G_ROCKETMQ_CONSUMER;
extern std::string G_MQCONSUMER_IPPORT;
extern std::string G_MQCONSUMER_ACCESSKEY;
extern std::string G_MQCONSUMER_SECRETKEY;
extern std::string G_MQCONSUMER_CHANNEL;
extern std::string G_MQCONSUMER_TOPIC_RT;
extern std::string G_MQCONSUMER_TAG_RT;
extern std::string G_MQCONSUMER_KEY_RT;
extern std::string G_MQCONSUMER_TOPIC_UD;
extern std::string G_MQCONSUMER_TAG_UD;
extern std::string G_MQCONSUMER_KEY_UD;
extern std::string G_MQCONSUMER_TOPIC_RC;
extern std::string G_MQCONSUMER_TAG_RC;
extern std::string G_MQCONSUMER_KEY_RC;
extern std::string G_MQCONSUMER_TOPIC_SET;
extern std::string G_MQCONSUMER_TAG_SET;
extern std::string G_MQCONSUMER_KEY_SET;
extern std::string G_MQCONSUMER_TOPIC_LOG;
extern std::string G_MQCONSUMER_TAG_LOG;
extern std::string G_MQCONSUMER_KEY_LOG;
extern std::string G_LOG_TOPIC;
extern std::string G_LOG_TAG;
extern std::string G_LOG_KEY;
extern std::string G_ROCKETMQ_TOPIC_TEST;
extern std::string G_ROCKETMQ_TAG_TEST;
extern std::string G_ROCKETMQ_KEY_TEST;
extern std::string Topic_Reply_Topic;
extern std::string Topic_Reply_Tag;
extern std::string Topic_Reply_Key;
extern std::string Heart_Beat_Topic;
extern std::string Heart_Beat_Tag;
extern std::string Heart_Beat_Key;
extern std::string G_CONNECT_TOPIC;
extern std::string G_CONNECT_TAG;
extern std::string G_CONNECT_KEY;
/////////////////////////////////////////////////////////////////////////////////////////////////////
extern void redirectErrorOutput(bool enable);
extern void redirectWarnOutput(bool enable);
extern void redirectNormalOutput(bool enable);
extern void redirectDebugOutput(bool enable);
//////////////////////////////////////////////////////////////////////////////////////////////////////
namespace rocketmq {
//-----------------------------------------------------------------------------
// RocketMQConsumer 类:基于 DefaultMQPushConsumer 的封装
//-----------------------------------------------------------------------------
// 回调类型定义:接收单条消息,返回 ConsumeStatus
using MessageCallback = std::function<ConsumeStatus(const MQMessageExt&)>;
//消费者类
class RocketMQConsumer {
public:
// 构造函数:初始化消费者
// consumerGroup: 消费者组名称
// nameServer: NameServer 地址 (格式示例 "127.0.0.1:9876")
RocketMQConsumer(const std::string& consumerGroup, const std::string& nameServer);
// 禁用拷贝和赋值
RocketMQConsumer(const RocketMQConsumer&) = delete;
RocketMQConsumer& operator=(const RocketMQConsumer&) = delete;
// 订阅主题和标签,并注册回调
// topic: 要订阅的 Topic 名称
// tag: 要过滤的 Tag (可以是单个标签或多个标签的表达式,如 "TagA || TagB")
// callback: 收到消息时的处理函数
void subscribe(const std::string& topic, const std::string& tag, MessageCallback callback);
// 启动消费者
void start();
// 关闭消费者
void shutdown();
// 析构函数:关闭并销毁消费者
~RocketMQConsumer();
private:
// 内部消息监听器,实现 MessageListenerConcurrently 接口
class InternalListener : public MessageListenerConcurrently {
public:
InternalListener(RocketMQConsumer* parent) : parent_(parent) {}
virtual ~InternalListener() noexcept {}
// 收到消息后的回调,实现逐条处理并调用注册的用户回调
virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt>& msgs) {
for (const auto& msg : msgs) {
const std::string& topic = msg.getTopic();
const std::string& tags = msg.getTags();
// 生成用于查找回调的键topic + ":" + tags
std::string key = topic + ":" + tags;
std::lock_guard<std::mutex> lock(parent_->callbackMutex_);
auto it = parent_->callbackMap_.find(key);
if (it != parent_->callbackMap_.end()) {
// 调用用户注册的回调
ConsumeStatus status = it->second(msg);
if (status != CONSUME_SUCCESS) {
// 如果回调返回需重试,则直接返回 RECONSUME_LATER
return RECONSUME_LATER;
}
} else {
// 未找到对应的回调时,打印默认信息并继续
std::cout << "[RocketMQConsumer] No callback registered for "
<< "topic=" << topic << ", tags=" << tags << ". "
<< "消息将视为已消费。" << std::endl;
}
}
return CONSUME_SUCCESS;
}
private:
RocketMQConsumer* parent_;
};
private:
DefaultMQPushConsumer consumer_; // C++ 接口的推模式消费者
std::mutex callbackMutex_; // 保护回调映射的互斥量
std::map<std::string, MessageCallback> callbackMap_; // 主题:标签 -> 回调映射
InternalListener* listener_; // 内部监听器实例
};
// -----------------------------------------------------------------------------
// 订阅信息结构体
struct Subscription {
std::string topic;
std::string tag;
// 回调函数类型:收到单条消息后返回 ConsumeStatus
// 假设 myMessageCallbackxxx 已改为符合此签名:
// ConsumeStatus myMessageCallbackxxx(const MQMessageExt&);
using CallbackT = std::function<ConsumeStatus(const MQMessageExt&)>;
CallbackT callback;
Subscription(const std::string& t, const std::string& g, CallbackT cb)
: topic(t), tag(g), callback(std::move(cb)) {}
};
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// 内部监听器:依据 topic + ":" + tag 分发到对应的回调
class SubscriberListener : public MessageListenerConcurrently {
public:
// 构造时传入“topic:tag -> 回调”映射
SubscriberListener(const std::map<std::string, Subscription::CallbackT>& cbMap)
: callbackMap_(cbMap) {}
virtual ~SubscriberListener() noexcept = default;
// 当批量消息到达时被调用
virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt>& msgs) override {
for (const auto& msg : msgs) {
const std::string& topic = msg.getTopic();
const std::string& tags = msg.getTags();
const std::string key = topic + ":" + tags;
auto it = callbackMap_.find(key);
if (it != callbackMap_.end()) {
// 调用注册的回调
ConsumeStatus status = it->second(msg);
if (status != CONSUME_SUCCESS) {
// 回调要求稍后重试
return RECONSUME_LATER;
}
}
else {
// 没有找到对应的回调时,打印日志并视为已消费
std::cout << "[SubscriberListener] No callback for "
<< "topic=\"" << topic << "\", tag=\"" << tags << "\". "
<< "默认丢弃并返回成功。" << std::endl;
}
}
return CONSUME_SUCCESS;
}
private:
// “topic:tag” -> 回调函数
std::map<std::string, Subscription::CallbackT> callbackMap_;
};
// -----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
// RocketMQProducer 类:基于 DefaultMQProducer 的封装
//-----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// 队列选择器:轮询选择队列,用于有序发送
class RoundRobinSelector : public MessageQueueSelector {
public:
RoundRobinSelector() : currentIndex_(0) {}
virtual ~RoundRobinSelector() noexcept {}
// 从可用队列列表中轮询选择一个队列
virtual MQMessageQueue select(const std::vector<MQMessageQueue>& mqs,
const MQMessage& msg, void* arg) {
if (mqs.empty()) {
throw MQClientException("No available message queues", -1, __FILE__, __LINE__);
}
int idx = currentIndex_++ % static_cast<int>(mqs.size());
return mqs.at(idx);
}
private:
std::atomic<int> currentIndex_;
};
// -----------------------------------------------------------------------------
// 生产者类
class RocketMQProducer {
public:
// 构造函数:初始化生产者
// groupName: 生产者组名称
// nameServer: NameServer 地址 (格式示例 "127.0.0.1:9876")
RocketMQProducer(const std::string& groupName, const std::string& nameServer);
// 禁用拷贝和赋值
RocketMQProducer(const RocketMQProducer&) = delete;
RocketMQProducer& operator=(const RocketMQProducer&) = delete;
// 发送消息 (同步发送)
// body: 消息体内容 (字符串格式)
// topic: Topic 名称
// tags: Tag 字符串 (可选)
// keys: Key 字符串 (可选)
void sendMessage(const std::string& body,
const std::string& topic,
const std::string& tags = "",
const std::string& keys = "");
// 发送消息 (顺序发送),使用轮询队列选择器
void sendMessageOrderly(const std::string& body,
const std::string& topic,
const std::string& tags = "",
const std::string& keys = "");
// 关闭并销毁生产者
~RocketMQProducer();
private:
DefaultMQProducer producer_; // C++ 接口的生产者
RoundRobinSelector selector_; // 轮询队列选择器
};
} // namespace rocketmq
//////////////////////////////////////////////////////////////////////////////////////////////
void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer);
rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& msg);
rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& msg);
rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& msg);
rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg);
rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg);
void send_heartbeat_to_queue(const std::string& status);
void rocketmq_test_300(int mpnum, int front_index, int type,Front* front);
void rocketmq_test_rc(Front* front);
void rocketmq_test_set(Front* front);
void rocketmq_test_ud(Front* front);
void rocketmq_test_rt(Front* front);
void InitializeProducer(rocketmq::RocketMQProducer*& producer);
#endif // _ROCKETMQ_CLIENT_WRAPPER_H_