#include // 用于 std::ifstream #include // 用于 std::stringstream #include #include #include #include #include "../mms/db_interface.h" #include "../rocketmq/CProducer.h" #include "../rocketmq/CMessage.h" #include "../rocketmq/CSendResult.h" #include "../rocketmq/SimpleProducer.h" //测试300数据用lnk20241202 #include #include //测试300数据用 //lnk20241209添加队列选择 #include "../rocketmq/MQSelector.h" #include "../rocketmq/MQMessageQueue.h" //#include #include #include //lnk20241209添加队列选择 #include //引入消费起点 #include "../rocketmq/DefaultMQPushConsumer.h" #include "../rocketmq/ConsumeType.h" // 引入提供的消费者接口头文件 #include "../rocketmq/CPushConsumer.h" #include "../rocketmq/CCommon.h" #include "../rocketmq/CMessageExt.h" #include #include // 用于互斥锁(在 C++98 中没有 std::mutex) #include // for std::pair using namespace std; extern std::string G_ROCKETMQ_PRODUCER;//rocketmq producer extern std::string G_ROCKETMQ_IPPORT;//rocketmq ip+port extern std::string G_ROCKETMQ_TOPIC;//topie extern std::string G_ROCKETMQ_TAG;//tag extern std::string G_ROCKETMQ_KEY;//key extern std::string FRONT_INST; #ifdef __cplusplus extern "C" { #endif extern std::string G_MQCONSUMER_TOPIC_SET; // C++ 中的全局变量声明 #ifdef __cplusplus } #endif extern int QUEUENUM; extern int g_front_seg_index; extern char subdir[128]; //////////////////////////////////////////////////////////////////////////////////////////////////////////// //消费者连接秘钥 extern std::string G_MQCONSUMER_ACCESSKEY; extern std::string G_MQCONSUMER_SECRETKEY; extern std::string G_MQCONSUMER_CHANNEL; // 前向声明 RocketMQConsumer 类 class RocketMQConsumer; // 全局映射:CPushConsumer* -> RocketMQConsumer* std::map g_consumerMap;// pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER; class RocketMQConsumer { public: // 构造函数:初始化消费者并启动 RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId); // 禁用拷贝和赋值 RocketMQConsumer(const RocketMQConsumer&) {} RocketMQConsumer& operator=(const RocketMQConsumer&) { return *this; } // 订阅主题和标签,并注册回调 void subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback); // 启动消费者 void start(); //修改消费模式 void setConsumerMessageModel(const std::string& topic); // 析构函数:关闭并销毁消费者 ~RocketMQConsumer(); private: CPushConsumer* consumer_; // C 接口消费者指针 //MessageCallBack messageCallback_; // 函数指针用于回调 std::map, MessageCallBack> callbacks_; // 订阅到回调的映射 // 静态消息处理回调 static int messageHandler(CPushConsumer* consumer, CMessageExt* msg); // 实例消息处理函数 int handleMessage(CMessageExt* msg); }; // 构造函数实现 RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId) : consumer_(NULL)//, messageCallback_(NULL) { // 创建消费者 consumer_ = CreatePushConsumer(consumerName.c_str()); if (consumer_ == NULL) { std::cout << "error CreatePushConsumer"<< std::endl; throw std::runtime_error("Failed to create push consumer."); } SetPushConsumerSessionCredentials(consumer_,G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(),G_MQCONSUMER_CHANNEL.c_str()); // 设置 NameServer 地址 if (SetPushConsumerNameServerAddress(consumer_, nameServer.c_str()) != 0) { DestroyPushConsumer(consumer_); std::cout << "error setting nameServer"<< std::endl; throw std::runtime_error("Failed to set NameServer address."); } // 设置消费者组ID if (SetPushConsumerGroupID(consumer_, groupId.c_str()) != 0) { DestroyPushConsumer(consumer_); std::cout << "error setting groupId"<< std::endl; throw std::runtime_error("Failed to set Consumer Group ID."); } //调试用 std::string consumerlog = "./mqconsumer/" + consumerName +".log"; if ( (SetPushConsumerLogPath(consumer_,consumerlog.c_str()) || SetPushConsumerLogFileNumAndSize(consumer_,10,100) || SetPushConsumerLogLevel(consumer_,E_LOG_LEVEL_DEBUG) ) != 0) {//记录消费日志 DestroyPushConsumer(consumer_); std::cout << "error setting logpath"<< std::endl; } std::cout << "logpath:" << consumerlog << std::endl; // 注册消息回调 if (RegisterMessageCallback(consumer_, RocketMQConsumer::messageHandler) != 0) { DestroyPushConsumer(consumer_); std::cout << "error setting Callback"<< std::endl; throw std::runtime_error("Failed to register message callback."); } // 将消费者实例添加到全局映射 pthread_mutex_lock(&g_consumerMapMutex); g_consumerMap[consumer_] = this; pthread_mutex_unlock(&g_consumerMapMutex); std::cout << "RocketMQ Consumer initialized and started." << std::endl; } // 启动消费者 void RocketMQConsumer::start() { if (StartPushConsumer(consumer_) != 0) { pthread_mutex_lock(&g_consumerMapMutex); g_consumerMap.erase(consumer_); pthread_mutex_unlock(&g_consumerMapMutex); DestroyPushConsumer(consumer_); throw std::runtime_error("Failed to start push consumer."); } else{ std::cout << "RocketMQ Consumer started." << std::endl; } } void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback) { if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) { throw std::runtime_error("Failed to subscribe to topic/tag."); } std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl; // 使用 std::pair 作为键 std::pair key(topic, tag); callbacks_[key] = callback; } // 静态消息处理回调实现 int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg) { RocketMQConsumer* instance = NULL; //调试用 std::cout << "messagehandler" << std::endl; // 查找对应的消费者实例 pthread_mutex_lock(&g_consumerMapMutex); std::map::iterator it = g_consumerMap.find(consumer); if (it != g_consumerMap.end()) { instance = it->second; } pthread_mutex_unlock(&g_consumerMapMutex); if (instance) { return instance->handleMessage(msg); } else { std::cerr << "Consumer instance not found for callback." << std::endl; return E_RECONSUME_LATER; // 默认返回重试状态 } } int RocketMQConsumer::handleMessage(CMessageExt* msg) { // 检查 msg 和 consumer_ 是否为 NULL if (!msg || !consumer_) { std::cerr << "Received null message or consumer." << std::endl; return E_RECONSUME_LATER; } // 获取消息的主题和标签 std::string topic = GetMessageTopic(msg); // 假设存在此函数 std::string tag = GetMessageTags(msg); // 假设存在此函数 // 打印调试信息 std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl; // 使用 std::pair 作为键 std::pair key(topic, tag); // 查找对应的回调函数 std::map, MessageCallBack>::iterator it = callbacks_.find(key); if (it != callbacks_.end()) { // 调用对应的回调函数 //调试 std::cout << "callback Handling message " <second(consumer_, msg); } else { //调试 std::cout << "there is no callback " <& subscriptions) // 接收多个订阅 { if (g_consumer == NULL) { std::cout << "create new consumer!" << std::endl; try { g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组,不同进程(不同的消费者)同时消费topic的同一条消息 for (size_t i = 0; i < subscriptions.size(); ++i) { g_consumer->setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式 g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag, subscriptions[i].callback); } g_consumer->start(); } catch (const std::exception& e) { std::cerr << "Failed to initialize consumer: " << e.what() << std::endl; throw; // 重新抛出异常 } } } // 关闭并销毁消费者函数 void ShutdownAndDestroyConsumer() { if (g_consumer != NULL) { delete g_consumer; g_consumer = NULL; } } // 消费消息的接口函数 void rocketmq_consumer_receive( const std::string& consumerName, const std::string& nameServer, const std::vector& subscriptions) // 接收多个订阅 { if (g_consumer == NULL) { try { //InitializeConsumer(consumerName, nameServer, topic, tag, callback);//初始化后,mq库内部来完成消息的获取 InitializeConsumer(consumerName, nameServer, subscriptions); // 初始化后,MQ库内部开始获取消息 } catch (...) { std::cerr << "Cannot consume message because consumer initialization failed." << std::endl; return; } } } ///////////////////////////////////////////////////////////////////////////////////////////////////////////// //封装生产者类 #if 1 // 全局或静态变量,用于维护当前队列索引 static int currentQueueId = 0; // 队列选择器回调函数:轮询选择队列 ID int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) { if (queueNum == 0) { throw std::runtime_error("No available queues"); } int queueId = currentQueueId % queueNum; currentQueueId++; if (currentQueueId >= 1024 - queueNum) { currentQueueId = 0; } return queueId; } // 封装生产者的类 class RocketMQProducer { public: RocketMQProducer(const std::string& producerName, const std::string& nameServer) : producer_(NULL) { // 创建生产者 producer_ = CreateProducer(producerName.c_str()); if (producer_ == NULL) { throw std::runtime_error("Failed to create producer."); } // 设置 nameserver 地址 SetProducerNameServerAddress(producer_, nameServer.c_str()); SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), ""); // 启动生产者 StartProducer(producer_); std::cout << "rocketmq_Producer initialized and started." << std::endl; } // 禁用拷贝和赋值 RocketMQProducer(const RocketMQProducer&) = delete; RocketMQProducer& operator=(const RocketMQProducer&) = delete; void printSendResult(const CSendResult& result) { std::cout << "SendResult:" << std::endl; std::cout << " Status: "; switch (result.sendStatus) { case E_SEND_OK: std::cout << "E_SEND_OK"; break; case E_SEND_FLUSH_DISK_TIMEOUT: std::cout << "E_SEND_FLUSH_DISK_TIMEOUT"; break; case E_SEND_FLUSH_SLAVE_TIMEOUT: std::cout << "E_SEND_FLUSH_SLAVE_TIMEOUT"; break; case E_SEND_SLAVE_NOT_AVAILABLE: std::cout << "E_SEND_SLAVE_NOT_AVAILABLE"; break; default: std::cout << "UNKNOWN(" << result.sendStatus << ")"; break; } std::cout << std::endl; std::cout << " MsgID : " << result.msgId << std::endl; std::cout << " Offset: " << result.offset << std::endl; } // 发送消息 void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) { CSendResult result; CMessage* msg = NULL; try { // 创建消息并设置属性 msg = CreateMessage(topic); if (msg == NULL) { throw std::runtime_error("Failed to create message."); } SetMessageTags(msg, tags.c_str()); SetMessageKeys(msg, keys.c_str()); SetMessageBody(msg, strbody); // 假设队列数量和 Broker 名称是固定的 int queueNum = QUEUENUM; // 配置的队列数量,例如5 // 发送消息 int sendResult = SendMessageOnewayOrderly( producer_, msg, RoundRobinSelector, // 队列选择器回调函数 &queueNum // 传递给选择器的额外参数(队列数量) ); if (sendResult == 0) { // 假设返回 0 表示成功 std::cout << "Message sent successfully.topic:" << topic <sendMessage(strbody, topic, tags, keys); } catch (const std::exception& e) { std::cerr << "Failed to send message: " << e.what() << std::endl; // 处理发送失败的情况,例如记录日志或重试 } } #endif ////////////////////////////////////////////////////////////////////////////////////////////////////////// // producer_send0测试用 void StartSendMessage(CProducer* producer) { CSendResult result; // create message and set some values for it CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str()); SetMessageTags(msg, G_ROCKETMQ_TAG.c_str()); SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str()); for (int i = 0; i < 10; i++) { // construct different body string strMessageBody = "this is body number"; SetMessageBody(msg, strMessageBody.c_str()); // send message SendMessageSync(producer, msg, &result); cout << "send message[" << i << "], result status:" << result.sendStatus << ", msgBody:" << strMessageBody << endl; usleep(1000); } // destroy message DestroyMessage(msg); } //producer_send 测试用 void StartSendMessage(CProducer* producer,const char* strbody) { CSendResult result; // create message and set some values for it CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str()); SetMessageTags(msg, G_ROCKETMQ_TAG.c_str()); SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str()); SetMessageBody(msg, strbody); // send message SendMessageSync(producer, msg, &result); cout << "result status:" << result.sendStatus << ", msgBody:" << strbody << endl; // destroy message DestroyMessage(msg); } //测试用 固定消息体 void producer_send0() { cout << "Producer Initializing....." << endl; // create producer and set some values for it CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str()); SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str()); // start producer StartProducer(producer); cout << "Producer start....." << endl; // send message StartSendMessage(producer); // shutdown producer ShutdownProducer(producer); // destroy producer DestroyProducer(producer); cout << "Producer Shutdown!" << endl; } //测试用 可控制消息体 void producer_send(const char* strbody) { cout << "Producer Initializing....." << endl; // create producer and set some values for it CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str()); SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str()); // start producer StartProducer(producer); cout << "Producer start....." << endl; // send message StartSendMessage(producer, strbody); // shutdown producer ShutdownProducer(producer); // destroy producer DestroyProducer(producer); cout << "Producer Shutdown!" << endl; } /////////////////////////////////////////////////////////////////////////////////////////////////////////// extern "C" { extern std::string G_MQCONSUMER_TOPIC_RT; void rocketmq_test_rt() { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT); std::ifstream file("rt.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } extern std::string G_MQCONSUMER_TOPIC_UD; void rocketmq_test_ud()//用来测试台账更新 { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD); std::ifstream file("ud.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } void rocketmq_test_set()//用来测试进程控制脚本 { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET); std::ifstream file("set.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } void rocketmq_test_only()//用来测试进程控制脚本 { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET); std::ifstream file("set_debug.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } extern std::string G_MQCONSUMER_TOPIC_RC; void rocketmq_test_rc() { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC); std::ifstream file("rc.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } extern std::string G_MQCONSUMER_TOPIC_LOG; void rocketmq_test_log() { Ckafka_data_t data; data.monitor_id = 123123; data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_LOG); std::ifstream file("log_test.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = QString::fromStdString(buffer.str()); data.mp_id = 123123; my_rocketmq_send(data); } }