2025-01-16 16:17:01 +08:00
|
|
|
|
#include <fstream> // 用于 std::ifstream
|
|
|
|
|
|
#include <sstream> // 用于 std::stringstream
|
|
|
|
|
|
#include <unistd.h>
|
|
|
|
|
|
#include <stdlib.h>
|
|
|
|
|
|
#include <iostream>
|
|
|
|
|
|
#include <string>
|
|
|
|
|
|
#include "../mms/db_interface.h"
|
2025-02-18 17:10:22 +08:00
|
|
|
|
|
2025-05-09 16:53:07 +08:00
|
|
|
|
#include "../rocketmq/CProducer.h"
|
|
|
|
|
|
#include "../rocketmq/CMessage.h"
|
|
|
|
|
|
#include "../rocketmq/CSendResult.h"
|
|
|
|
|
|
#include "../rocketmq/SimpleProducer.h"
|
2025-01-16 16:17:01 +08:00
|
|
|
|
|
|
|
|
|
|
//测试300数据用lnk20241202
|
|
|
|
|
|
#include <ctime>
|
|
|
|
|
|
#include <QThread>
|
|
|
|
|
|
//测试300数据用
|
|
|
|
|
|
//lnk20241209添加队列选择
|
2025-05-09 16:53:07 +08:00
|
|
|
|
#include "../rocketmq/MQSelector.h"
|
|
|
|
|
|
#include "../rocketmq/MQMessageQueue.h"
|
2025-01-16 16:17:01 +08:00
|
|
|
|
//#include <atomic>
|
|
|
|
|
|
#include <vector>
|
|
|
|
|
|
#include <stdexcept>
|
|
|
|
|
|
//lnk20241209添加队列选择
|
|
|
|
|
|
#include <cstring>
|
|
|
|
|
|
|
2025-03-26 10:51:18 +08:00
|
|
|
|
//引入消费起点
|
2025-05-09 16:53:07 +08:00
|
|
|
|
#include "../rocketmq/DefaultMQPushConsumer.h"
|
|
|
|
|
|
#include "../rocketmq/ConsumeType.h"
|
2025-03-26 10:51:18 +08:00
|
|
|
|
|
2025-01-16 16:17:01 +08:00
|
|
|
|
// 引入提供的消费者接口头文件
|
2025-05-09 16:53:07 +08:00
|
|
|
|
#include "../rocketmq/CPushConsumer.h"
|
|
|
|
|
|
#include "../rocketmq/CCommon.h"
|
|
|
|
|
|
#include "../rocketmq/CMessageExt.h"
|
2025-01-16 16:17:01 +08:00
|
|
|
|
#include <map>
|
|
|
|
|
|
#include <pthread.h> // 用于互斥锁(在 C++98 中没有 std::mutex)
|
|
|
|
|
|
#include <utility> // for std::pair
|
|
|
|
|
|
|
2025-05-30 15:40:20 +08:00
|
|
|
|
#include "../log4cplus/log4.h"//lnk添加log4
|
|
|
|
|
|
|
2025-01-16 16:17:01 +08:00
|
|
|
|
using namespace std;
|
|
|
|
|
|
|
|
|
|
|
|
extern std::string G_ROCKETMQ_PRODUCER;//rocketmq producer
|
|
|
|
|
|
extern std::string G_ROCKETMQ_IPPORT;//rocketmq ip+port
|
|
|
|
|
|
extern std::string G_ROCKETMQ_TOPIC;//topie
|
|
|
|
|
|
extern std::string G_ROCKETMQ_TAG;//tag
|
|
|
|
|
|
extern std::string G_ROCKETMQ_KEY;//key
|
|
|
|
|
|
|
2025-05-12 16:43:42 +08:00
|
|
|
|
extern std::string FRONT_INST;
|
|
|
|
|
|
|
2025-01-16 16:17:01 +08:00
|
|
|
|
#ifdef __cplusplus
|
|
|
|
|
|
extern "C" {
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
|
|
extern std::string G_MQCONSUMER_TOPIC_SET; // C++ 中的全局变量声明
|
|
|
|
|
|
|
|
|
|
|
|
#ifdef __cplusplus
|
|
|
|
|
|
}
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
|
|
extern int QUEUENUM;
|
|
|
|
|
|
|
|
|
|
|
|
extern int g_front_seg_index;
|
|
|
|
|
|
extern char subdir[128];
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
|
|
|
|
|
|
//消费者连接秘钥
|
|
|
|
|
|
extern std::string G_MQCONSUMER_ACCESSKEY;
|
|
|
|
|
|
extern std::string G_MQCONSUMER_SECRETKEY;
|
|
|
|
|
|
extern std::string G_MQCONSUMER_CHANNEL;
|
|
|
|
|
|
|
|
|
|
|
|
// 前向声明 RocketMQConsumer 类
|
|
|
|
|
|
class RocketMQConsumer;
|
|
|
|
|
|
|
|
|
|
|
|
// 全局映射:CPushConsumer* -> RocketMQConsumer*
|
|
|
|
|
|
std::map<CPushConsumer*, RocketMQConsumer*> g_consumerMap;//
|
|
|
|
|
|
pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER;
|
|
|
|
|
|
|
|
|
|
|
|
class RocketMQConsumer {
|
|
|
|
|
|
public:
|
|
|
|
|
|
// 构造函数:初始化消费者并启动
|
|
|
|
|
|
RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId);
|
|
|
|
|
|
|
|
|
|
|
|
// 禁用拷贝和赋值
|
|
|
|
|
|
RocketMQConsumer(const RocketMQConsumer&) {}
|
|
|
|
|
|
RocketMQConsumer& operator=(const RocketMQConsumer&) { return *this; }
|
|
|
|
|
|
|
|
|
|
|
|
// 订阅主题和标签,并注册回调
|
|
|
|
|
|
void subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback);
|
|
|
|
|
|
|
|
|
|
|
|
// 启动消费者
|
|
|
|
|
|
void start();
|
|
|
|
|
|
|
|
|
|
|
|
//修改消费模式
|
|
|
|
|
|
void setConsumerMessageModel(const std::string& topic);
|
|
|
|
|
|
|
|
|
|
|
|
// 析构函数:关闭并销毁消费者
|
|
|
|
|
|
~RocketMQConsumer();
|
|
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
|
CPushConsumer* consumer_; // C 接口消费者指针
|
|
|
|
|
|
//MessageCallBack messageCallback_; // 函数指针用于回调
|
|
|
|
|
|
std::map<std::pair<std::string, std::string>, MessageCallBack> callbacks_; // 订阅到回调的映射
|
|
|
|
|
|
|
|
|
|
|
|
// 静态消息处理回调
|
|
|
|
|
|
static int messageHandler(CPushConsumer* consumer, CMessageExt* msg);
|
|
|
|
|
|
|
|
|
|
|
|
// 实例消息处理函数
|
|
|
|
|
|
int handleMessage(CMessageExt* msg);
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
// 构造函数实现
|
|
|
|
|
|
RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId)
|
|
|
|
|
|
: consumer_(NULL)//, messageCallback_(NULL)
|
|
|
|
|
|
{
|
|
|
|
|
|
// 创建消费者
|
|
|
|
|
|
consumer_ = CreatePushConsumer(consumerName.c_str());
|
|
|
|
|
|
if (consumer_ == NULL) {
|
|
|
|
|
|
std::cout << "error CreatePushConsumer"<< std::endl;
|
|
|
|
|
|
throw std::runtime_error("Failed to create push consumer.");
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
SetPushConsumerSessionCredentials(consumer_,G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(),G_MQCONSUMER_CHANNEL.c_str());
|
|
|
|
|
|
|
|
|
|
|
|
// 设置 NameServer 地址
|
|
|
|
|
|
if (SetPushConsumerNameServerAddress(consumer_, nameServer.c_str()) != 0) {
|
|
|
|
|
|
DestroyPushConsumer(consumer_);
|
|
|
|
|
|
std::cout << "error setting nameServer"<< std::endl;
|
|
|
|
|
|
throw std::runtime_error("Failed to set NameServer address.");
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 设置消费者组ID
|
|
|
|
|
|
if (SetPushConsumerGroupID(consumer_, groupId.c_str()) != 0) {
|
|
|
|
|
|
DestroyPushConsumer(consumer_);
|
|
|
|
|
|
std::cout << "error setting groupId"<< std::endl;
|
|
|
|
|
|
throw std::runtime_error("Failed to set Consumer Group ID.");
|
|
|
|
|
|
}
|
2025-04-29 15:05:36 +08:00
|
|
|
|
|
2025-01-16 16:17:01 +08:00
|
|
|
|
//调试用
|
|
|
|
|
|
std::string consumerlog = "./mqconsumer/" + consumerName +".log";
|
|
|
|
|
|
if ( (SetPushConsumerLogPath(consumer_,consumerlog.c_str()) || SetPushConsumerLogFileNumAndSize(consumer_,10,100) || SetPushConsumerLogLevel(consumer_,E_LOG_LEVEL_DEBUG) ) != 0) {//记录消费日志
|
|
|
|
|
|
DestroyPushConsumer(consumer_);
|
|
|
|
|
|
std::cout << "error setting logpath"<< std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
std::cout << "logpath:" << consumerlog << std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
// 注册消息回调
|
|
|
|
|
|
if (RegisterMessageCallback(consumer_, RocketMQConsumer::messageHandler) != 0) {
|
|
|
|
|
|
DestroyPushConsumer(consumer_);
|
|
|
|
|
|
std::cout << "error setting Callback"<< std::endl;
|
|
|
|
|
|
throw std::runtime_error("Failed to register message callback.");
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 将消费者实例添加到全局映射
|
|
|
|
|
|
pthread_mutex_lock(&g_consumerMapMutex);
|
|
|
|
|
|
g_consumerMap[consumer_] = this;
|
|
|
|
|
|
pthread_mutex_unlock(&g_consumerMapMutex);
|
|
|
|
|
|
|
|
|
|
|
|
std::cout << "RocketMQ Consumer initialized and started." << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 启动消费者
|
|
|
|
|
|
void RocketMQConsumer::start()
|
|
|
|
|
|
{
|
|
|
|
|
|
if (StartPushConsumer(consumer_) != 0) {
|
|
|
|
|
|
pthread_mutex_lock(&g_consumerMapMutex);
|
|
|
|
|
|
g_consumerMap.erase(consumer_);
|
|
|
|
|
|
pthread_mutex_unlock(&g_consumerMapMutex);
|
|
|
|
|
|
DestroyPushConsumer(consumer_);
|
|
|
|
|
|
throw std::runtime_error("Failed to start push consumer.");
|
|
|
|
|
|
}
|
|
|
|
|
|
else{
|
|
|
|
|
|
std::cout << "RocketMQ Consumer started." << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) {
|
|
|
|
|
|
throw std::runtime_error("Failed to subscribe to topic/tag.");
|
|
|
|
|
|
}
|
|
|
|
|
|
std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
// 使用 std::pair 作为键
|
|
|
|
|
|
std::pair<std::string, std::string> key(topic, tag);
|
|
|
|
|
|
callbacks_[key] = callback;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 静态消息处理回调实现
|
|
|
|
|
|
int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg)
|
|
|
|
|
|
{
|
|
|
|
|
|
RocketMQConsumer* instance = NULL;
|
|
|
|
|
|
|
|
|
|
|
|
//调试用
|
|
|
|
|
|
std::cout << "messagehandler" << std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
// 查找对应的消费者实例
|
|
|
|
|
|
pthread_mutex_lock(&g_consumerMapMutex);
|
|
|
|
|
|
std::map<CPushConsumer*, RocketMQConsumer*>::iterator it = g_consumerMap.find(consumer);
|
|
|
|
|
|
if (it != g_consumerMap.end()) {
|
|
|
|
|
|
instance = it->second;
|
|
|
|
|
|
}
|
|
|
|
|
|
pthread_mutex_unlock(&g_consumerMapMutex);
|
|
|
|
|
|
|
|
|
|
|
|
if (instance) {
|
|
|
|
|
|
return instance->handleMessage(msg);
|
|
|
|
|
|
} else {
|
|
|
|
|
|
std::cerr << "Consumer instance not found for callback." << std::endl;
|
|
|
|
|
|
return E_RECONSUME_LATER; // 默认返回重试状态
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int RocketMQConsumer::handleMessage(CMessageExt* msg)
|
|
|
|
|
|
{
|
|
|
|
|
|
// 检查 msg 和 consumer_ 是否为 NULL
|
|
|
|
|
|
if (!msg || !consumer_) {
|
|
|
|
|
|
std::cerr << "Received null message or consumer." << std::endl;
|
|
|
|
|
|
return E_RECONSUME_LATER;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 获取消息的主题和标签
|
|
|
|
|
|
std::string topic = GetMessageTopic(msg); // 假设存在此函数
|
|
|
|
|
|
std::string tag = GetMessageTags(msg); // 假设存在此函数
|
|
|
|
|
|
|
|
|
|
|
|
// 打印调试信息
|
|
|
|
|
|
std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
// 使用 std::pair 作为键
|
|
|
|
|
|
std::pair<std::string, std::string> key(topic, tag);
|
|
|
|
|
|
|
|
|
|
|
|
// 查找对应的回调函数
|
|
|
|
|
|
std::map<std::pair<std::string, std::string>, MessageCallBack>::iterator it = callbacks_.find(key);
|
|
|
|
|
|
if (it != callbacks_.end()) {
|
|
|
|
|
|
// 调用对应的回调函数
|
|
|
|
|
|
|
|
|
|
|
|
//调试
|
|
|
|
|
|
std::cout << "callback Handling message " <<std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
return it->second(consumer_, msg);
|
|
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
|
|
|
|
//调试
|
|
|
|
|
|
std::cout << "there is no callback " <<std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
// 如果没有找到对应的回调,执行默认处理
|
|
|
|
|
|
const char* body = GetMessageBody(msg);
|
|
|
|
|
|
const char* msgKey = GetMessageKeys(msg);
|
|
|
|
|
|
|
|
|
|
|
|
if (body) {
|
|
|
|
|
|
std::cout << "Received message body: " << body << std::endl;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
std::cout << "Received message with empty body." << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (msgKey) {
|
|
|
|
|
|
std::cout << "Message Key: " << msgKey << std::endl;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
std::cout << "Message Key: N/A" << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return E_CONSUME_SUCCESS;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 析构函数实现
|
|
|
|
|
|
RocketMQConsumer::~RocketMQConsumer()
|
|
|
|
|
|
{
|
|
|
|
|
|
if (consumer_) {
|
|
|
|
|
|
// 关闭消费者
|
|
|
|
|
|
ShutdownPushConsumer(consumer_);
|
|
|
|
|
|
|
|
|
|
|
|
// 从全局映射中移除
|
|
|
|
|
|
pthread_mutex_lock(&g_consumerMapMutex);
|
|
|
|
|
|
g_consumerMap.erase(consumer_);
|
|
|
|
|
|
pthread_mutex_unlock(&g_consumerMapMutex);
|
|
|
|
|
|
|
|
|
|
|
|
// 销毁消费者
|
|
|
|
|
|
DestroyPushConsumer(consumer_);
|
|
|
|
|
|
consumer_ = NULL;
|
|
|
|
|
|
|
|
|
|
|
|
std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 在 RocketMQConsumer 类中新增函数用来设置消费模式
|
|
|
|
|
|
void RocketMQConsumer::setConsumerMessageModel(const std::string& topic)
|
|
|
|
|
|
{
|
2025-05-12 16:43:42 +08:00
|
|
|
|
/*if (topic == G_MQCONSUMER_TOPIC_SET) {
|
2025-01-16 16:17:01 +08:00
|
|
|
|
// 设置为普通消费模式
|
|
|
|
|
|
if (SetPushConsumerMessageModel(consumer_, CLUSTERING) != 0) {
|
|
|
|
|
|
std::cout << "Error setting message model to CLUSTERING for topic: " << topic << std::endl;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
std::cout << "Set consumer to CLUSTERING for topic: " << topic << std::endl;
|
|
|
|
|
|
}
|
2025-05-12 16:43:42 +08:00
|
|
|
|
} else*/ {
|
2025-01-16 16:17:01 +08:00
|
|
|
|
// 默认设置为广播消费模式
|
|
|
|
|
|
if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) {
|
|
|
|
|
|
std::cout << "Error setting message model to BROADCASTING for topic: " << topic << std::endl;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 全局消费者实例
|
|
|
|
|
|
RocketMQConsumer* g_consumer = NULL;
|
|
|
|
|
|
|
|
|
|
|
|
// 初始化消费者函数
|
|
|
|
|
|
void InitializeConsumer(
|
|
|
|
|
|
const std::string& consumerName,
|
|
|
|
|
|
const std::string& nameServer,
|
|
|
|
|
|
const std::vector<Subscription>& subscriptions) // 接收多个订阅
|
|
|
|
|
|
{
|
|
|
|
|
|
if (g_consumer == NULL) {
|
|
|
|
|
|
std::cout << "create new consumer!" << std::endl;
|
|
|
|
|
|
try {
|
|
|
|
|
|
g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组,不同进程(不同的消费者)同时消费topic的同一条消息
|
|
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < subscriptions.size(); ++i) {
|
|
|
|
|
|
g_consumer->setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式
|
|
|
|
|
|
g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag, subscriptions[i].callback);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
g_consumer->start();
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (const std::exception& e) {
|
|
|
|
|
|
std::cerr << "Failed to initialize consumer: " << e.what() << std::endl;
|
|
|
|
|
|
throw; // 重新抛出异常
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭并销毁消费者函数
|
|
|
|
|
|
void ShutdownAndDestroyConsumer()
|
|
|
|
|
|
{
|
|
|
|
|
|
if (g_consumer != NULL) {
|
|
|
|
|
|
delete g_consumer;
|
|
|
|
|
|
g_consumer = NULL;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 消费消息的接口函数
|
|
|
|
|
|
void rocketmq_consumer_receive(
|
|
|
|
|
|
const std::string& consumerName,
|
|
|
|
|
|
const std::string& nameServer,
|
|
|
|
|
|
const std::vector<Subscription>& subscriptions) // 接收多个订阅
|
|
|
|
|
|
{
|
|
|
|
|
|
if (g_consumer == NULL) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
//InitializeConsumer(consumerName, nameServer, topic, tag, callback);//初始化后,mq库内部来完成消息的获取
|
|
|
|
|
|
InitializeConsumer(consumerName, nameServer, subscriptions); // 初始化后,MQ库内部开始获取消息
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (...) {
|
|
|
|
|
|
std::cerr << "Cannot consume message because consumer initialization failed." << std::endl;
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
//封装生产者类
|
|
|
|
|
|
#if 1
|
|
|
|
|
|
// 全局或静态变量,用于维护当前队列索引
|
|
|
|
|
|
static int currentQueueId = 0;
|
|
|
|
|
|
|
|
|
|
|
|
// 队列选择器回调函数:轮询选择队列 ID
|
|
|
|
|
|
int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
|
|
|
|
|
|
if (queueNum == 0) {
|
|
|
|
|
|
throw std::runtime_error("No available queues");
|
|
|
|
|
|
}
|
|
|
|
|
|
int queueId = currentQueueId % queueNum;
|
|
|
|
|
|
currentQueueId++;
|
|
|
|
|
|
if (currentQueueId >= 1024 - queueNum) {
|
|
|
|
|
|
currentQueueId = 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
return queueId;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 封装生产者的类
|
|
|
|
|
|
class RocketMQProducer {
|
|
|
|
|
|
public:
|
|
|
|
|
|
RocketMQProducer(const std::string& producerName, const std::string& nameServer)
|
|
|
|
|
|
: producer_(NULL)
|
|
|
|
|
|
{
|
|
|
|
|
|
// 创建生产者
|
|
|
|
|
|
producer_ = CreateProducer(producerName.c_str());
|
|
|
|
|
|
if (producer_ == NULL) {
|
|
|
|
|
|
throw std::runtime_error("Failed to create producer.");
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 设置 nameserver 地址
|
|
|
|
|
|
SetProducerNameServerAddress(producer_, nameServer.c_str());
|
|
|
|
|
|
|
2025-04-10 16:07:39 +08:00
|
|
|
|
SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), "");
|
|
|
|
|
|
|
2025-01-16 16:17:01 +08:00
|
|
|
|
// 启动生产者
|
|
|
|
|
|
StartProducer(producer_);
|
|
|
|
|
|
|
|
|
|
|
|
std::cout << "rocketmq_Producer initialized and started." << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 禁用拷贝和赋值
|
|
|
|
|
|
RocketMQProducer(const RocketMQProducer&) = delete;
|
|
|
|
|
|
RocketMQProducer& operator=(const RocketMQProducer&) = delete;
|
|
|
|
|
|
|
2025-04-10 16:07:39 +08:00
|
|
|
|
void printSendResult(const CSendResult& result) {
|
|
|
|
|
|
std::cout << "SendResult:" << std::endl;
|
|
|
|
|
|
std::cout << " Status: ";
|
|
|
|
|
|
switch (result.sendStatus) {
|
|
|
|
|
|
case E_SEND_OK:
|
|
|
|
|
|
std::cout << "E_SEND_OK";
|
|
|
|
|
|
break;
|
|
|
|
|
|
case E_SEND_FLUSH_DISK_TIMEOUT:
|
|
|
|
|
|
std::cout << "E_SEND_FLUSH_DISK_TIMEOUT";
|
|
|
|
|
|
break;
|
|
|
|
|
|
case E_SEND_FLUSH_SLAVE_TIMEOUT:
|
|
|
|
|
|
std::cout << "E_SEND_FLUSH_SLAVE_TIMEOUT";
|
|
|
|
|
|
break;
|
|
|
|
|
|
case E_SEND_SLAVE_NOT_AVAILABLE:
|
|
|
|
|
|
std::cout << "E_SEND_SLAVE_NOT_AVAILABLE";
|
|
|
|
|
|
break;
|
|
|
|
|
|
default:
|
|
|
|
|
|
std::cout << "UNKNOWN(" << result.sendStatus << ")";
|
|
|
|
|
|
break;
|
|
|
|
|
|
}
|
|
|
|
|
|
std::cout << std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
std::cout << " MsgID : " << result.msgId << std::endl;
|
|
|
|
|
|
std::cout << " Offset: " << result.offset << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-01-16 16:17:01 +08:00
|
|
|
|
// 发送消息
|
|
|
|
|
|
void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) {
|
|
|
|
|
|
CSendResult result;
|
|
|
|
|
|
CMessage* msg = NULL;
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
// 创建消息并设置属性
|
|
|
|
|
|
msg = CreateMessage(topic);
|
|
|
|
|
|
if (msg == NULL) {
|
|
|
|
|
|
throw std::runtime_error("Failed to create message.");
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
SetMessageTags(msg, tags.c_str());
|
|
|
|
|
|
SetMessageKeys(msg, keys.c_str());
|
|
|
|
|
|
SetMessageBody(msg, strbody);
|
|
|
|
|
|
|
|
|
|
|
|
// 假设队列数量和 Broker 名称是固定的
|
|
|
|
|
|
int queueNum = QUEUENUM; // 配置的队列数量,例如5
|
|
|
|
|
|
|
|
|
|
|
|
// 发送消息
|
|
|
|
|
|
int sendResult = SendMessageOnewayOrderly(
|
|
|
|
|
|
producer_,
|
|
|
|
|
|
msg,
|
|
|
|
|
|
RoundRobinSelector, // 队列选择器回调函数
|
|
|
|
|
|
&queueNum // 传递给选择器的额外参数(队列数量)
|
|
|
|
|
|
);
|
2025-04-10 16:07:39 +08:00
|
|
|
|
|
2025-01-16 16:17:01 +08:00
|
|
|
|
if (sendResult == 0) { // 假设返回 0 表示成功
|
2025-02-21 16:24:41 +08:00
|
|
|
|
std::cout << "Message sent successfully.topic:" << topic <<std::endl;
|
2025-01-16 16:17:01 +08:00
|
|
|
|
} else {
|
|
|
|
|
|
std::cout << "Failed to send message." << std::endl;
|
2025-04-10 16:07:39 +08:00
|
|
|
|
}
|
2025-01-16 16:17:01 +08:00
|
|
|
|
|
|
|
|
|
|
// 销毁消息
|
|
|
|
|
|
DestroyMessage(msg);
|
|
|
|
|
|
msg = NULL; // 防止重复销毁
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (const std::runtime_error& e) {
|
|
|
|
|
|
std::cerr << "Runtime error during message sending: " << e.what() << std::endl;
|
|
|
|
|
|
// 如果消息已经创建,确保销毁它
|
|
|
|
|
|
if (msg != NULL) {
|
|
|
|
|
|
DestroyMessage(msg);
|
|
|
|
|
|
}
|
|
|
|
|
|
// 可以在这里添加更多的错误处理逻辑,比如重试、记录日志等
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (const std::exception& e) {
|
|
|
|
|
|
std::cerr << "Exception during message sending: " << e.what() << std::endl;
|
|
|
|
|
|
if (msg != NULL) {
|
|
|
|
|
|
DestroyMessage(msg);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (...) {
|
|
|
|
|
|
std::cerr << "Unknown error during message sending." << std::endl;
|
|
|
|
|
|
if (msg != NULL) {
|
|
|
|
|
|
DestroyMessage(msg);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 析构函数中关闭并销毁生产者
|
|
|
|
|
|
~RocketMQProducer() {
|
|
|
|
|
|
if (producer_) {
|
|
|
|
|
|
ShutdownProducer(producer_);
|
|
|
|
|
|
DestroyProducer(producer_);
|
|
|
|
|
|
std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
|
CProducer* producer_;
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
// 全局生产者实例
|
|
|
|
|
|
RocketMQProducer* g_producer = NULL;
|
|
|
|
|
|
|
|
|
|
|
|
// 初始化生产者(在程序启动时调用一次)
|
|
|
|
|
|
void InitializeProducer()
|
|
|
|
|
|
{
|
|
|
|
|
|
if (g_producer == NULL) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT);
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (const std::exception& e) {
|
|
|
|
|
|
std::cerr << "Failed to initialize producer: " << e.what() << std::endl;
|
|
|
|
|
|
// 根据需求决定是否终止程序或采取其他措施
|
|
|
|
|
|
throw; // 重新抛出异常
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭并销毁生产者(在程序结束时调用一次)
|
|
|
|
|
|
void ShutdownAndDestroyProducer()
|
|
|
|
|
|
{
|
|
|
|
|
|
if (g_producer != NULL) {
|
|
|
|
|
|
delete g_producer;
|
|
|
|
|
|
g_producer = NULL;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 发送消息的接口函数
|
|
|
|
|
|
void rocketmq_producer_send(const char* strbody, const char* topic)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (g_producer == NULL) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
InitializeProducer();
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (...) {
|
|
|
|
|
|
std::cerr << "Cannot send message because producer initialization failed." << std::endl;
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 假设 tags 和 keys 是固定的,可以根据需要修改
|
|
|
|
|
|
std::string tags = G_ROCKETMQ_TAG;
|
|
|
|
|
|
std::string keys = G_ROCKETMQ_KEY;
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
g_producer->sendMessage(strbody, topic, tags, keys);
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (const std::exception& e) {
|
|
|
|
|
|
std::cerr << "Failed to send message: " << e.what() << std::endl;
|
|
|
|
|
|
// 处理发送失败的情况,例如记录日志或重试
|
2026-01-06 10:23:43 +08:00
|
|
|
|
DIY_ERRORLOG_CODE("process",LOG_CODE_MQ,"前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index);
|
2025-01-16 16:17:01 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
|
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
// producer_send0测试用
|
|
|
|
|
|
void StartSendMessage(CProducer* producer)
|
|
|
|
|
|
{
|
|
|
|
|
|
CSendResult result;
|
|
|
|
|
|
|
|
|
|
|
|
// create message and set some values for it
|
|
|
|
|
|
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str());
|
|
|
|
|
|
SetMessageTags(msg, G_ROCKETMQ_TAG.c_str());
|
|
|
|
|
|
SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str());
|
|
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 10; i++)
|
|
|
|
|
|
{
|
|
|
|
|
|
// construct different body
|
|
|
|
|
|
string strMessageBody = "this is body number";
|
|
|
|
|
|
|
|
|
|
|
|
SetMessageBody(msg, strMessageBody.c_str());
|
|
|
|
|
|
// send message
|
|
|
|
|
|
SendMessageSync(producer, msg, &result);
|
|
|
|
|
|
|
|
|
|
|
|
cout << "send message[" << i << "], result status:" << result.sendStatus << ", msgBody:" << strMessageBody << endl;
|
|
|
|
|
|
usleep(1000);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// destroy message
|
|
|
|
|
|
DestroyMessage(msg);
|
|
|
|
|
|
}
|
|
|
|
|
|
//producer_send 测试用
|
|
|
|
|
|
void StartSendMessage(CProducer* producer,const char* strbody)
|
|
|
|
|
|
{
|
|
|
|
|
|
CSendResult result;
|
|
|
|
|
|
|
|
|
|
|
|
// create message and set some values for it
|
|
|
|
|
|
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str());
|
|
|
|
|
|
SetMessageTags(msg, G_ROCKETMQ_TAG.c_str());
|
|
|
|
|
|
SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str());
|
|
|
|
|
|
|
|
|
|
|
|
SetMessageBody(msg, strbody);
|
|
|
|
|
|
// send message
|
|
|
|
|
|
SendMessageSync(producer, msg, &result);
|
|
|
|
|
|
|
|
|
|
|
|
cout << "result status:" << result.sendStatus << ", msgBody:" << strbody << endl;
|
|
|
|
|
|
|
|
|
|
|
|
// destroy message
|
|
|
|
|
|
DestroyMessage(msg);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
//测试用 固定消息体
|
|
|
|
|
|
void producer_send0()
|
|
|
|
|
|
{
|
|
|
|
|
|
cout << "Producer Initializing....." << endl;
|
|
|
|
|
|
|
|
|
|
|
|
// create producer and set some values for it
|
|
|
|
|
|
CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str());
|
|
|
|
|
|
SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str());
|
|
|
|
|
|
// start producer
|
|
|
|
|
|
StartProducer(producer);
|
|
|
|
|
|
cout << "Producer start....." << endl;
|
|
|
|
|
|
// send message
|
|
|
|
|
|
StartSendMessage(producer);
|
|
|
|
|
|
// shutdown producer
|
|
|
|
|
|
ShutdownProducer(producer);
|
|
|
|
|
|
// destroy producer
|
|
|
|
|
|
DestroyProducer(producer);
|
|
|
|
|
|
cout << "Producer Shutdown!" << endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
//测试用 可控制消息体
|
|
|
|
|
|
void producer_send(const char* strbody)
|
|
|
|
|
|
{
|
|
|
|
|
|
cout << "Producer Initializing....." << endl;
|
|
|
|
|
|
|
|
|
|
|
|
// create producer and set some values for it
|
|
|
|
|
|
CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str());
|
|
|
|
|
|
SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str());
|
|
|
|
|
|
// start producer
|
|
|
|
|
|
StartProducer(producer);
|
|
|
|
|
|
cout << "Producer start....." << endl;
|
|
|
|
|
|
// send message
|
|
|
|
|
|
StartSendMessage(producer, strbody);
|
|
|
|
|
|
// shutdown producer
|
|
|
|
|
|
ShutdownProducer(producer);
|
|
|
|
|
|
// destroy producer
|
|
|
|
|
|
DestroyProducer(producer);
|
|
|
|
|
|
cout << "Producer Shutdown!" << endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
2025-02-18 17:10:22 +08:00
|
|
|
|
|
2025-01-16 16:17:01 +08:00
|
|
|
|
extern "C" {
|
|
|
|
|
|
extern std::string G_MQCONSUMER_TOPIC_RT;
|
|
|
|
|
|
void rocketmq_test_rt()
|
|
|
|
|
|
{
|
|
|
|
|
|
Ckafka_data_t data;
|
|
|
|
|
|
data.monitor_id = 123123;
|
2025-05-12 16:43:42 +08:00
|
|
|
|
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT);
|
2025-01-16 16:17:01 +08:00
|
|
|
|
std::ifstream file("rt.txt"); // 文件中存储长字符串
|
|
|
|
|
|
std::stringstream buffer;
|
|
|
|
|
|
buffer << file.rdbuf(); // 读取整个文件内容
|
|
|
|
|
|
|
|
|
|
|
|
data.strText = QString::fromStdString(buffer.str());
|
|
|
|
|
|
data.mp_id = 123123;
|
|
|
|
|
|
my_rocketmq_send(data);
|
|
|
|
|
|
}
|
|
|
|
|
|
extern std::string G_MQCONSUMER_TOPIC_UD;
|
|
|
|
|
|
void rocketmq_test_ud()//用来测试台账更新
|
|
|
|
|
|
{
|
|
|
|
|
|
Ckafka_data_t data;
|
|
|
|
|
|
data.monitor_id = 123123;
|
2025-05-12 16:43:42 +08:00
|
|
|
|
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD);
|
2025-01-16 16:17:01 +08:00
|
|
|
|
std::ifstream file("ud.txt"); // 文件中存储长字符串
|
|
|
|
|
|
std::stringstream buffer;
|
|
|
|
|
|
buffer << file.rdbuf(); // 读取整个文件内容
|
|
|
|
|
|
|
|
|
|
|
|
data.strText = QString::fromStdString(buffer.str());
|
|
|
|
|
|
data.mp_id = 123123;
|
|
|
|
|
|
my_rocketmq_send(data);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void rocketmq_test_set()//用来测试进程控制脚本
|
|
|
|
|
|
{
|
|
|
|
|
|
Ckafka_data_t data;
|
|
|
|
|
|
data.monitor_id = 123123;
|
2025-05-12 16:43:42 +08:00
|
|
|
|
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET);
|
2025-01-16 16:17:01 +08:00
|
|
|
|
std::ifstream file("set.txt"); // 文件中存储长字符串
|
|
|
|
|
|
std::stringstream buffer;
|
|
|
|
|
|
buffer << file.rdbuf(); // 读取整个文件内容
|
|
|
|
|
|
|
|
|
|
|
|
data.strText = QString::fromStdString(buffer.str());
|
|
|
|
|
|
data.mp_id = 123123;
|
|
|
|
|
|
my_rocketmq_send(data);
|
|
|
|
|
|
}
|
2025-02-11 18:23:19 +08:00
|
|
|
|
|
|
|
|
|
|
void rocketmq_test_only()//用来测试进程控制脚本
|
|
|
|
|
|
{
|
|
|
|
|
|
Ckafka_data_t data;
|
|
|
|
|
|
data.monitor_id = 123123;
|
2025-05-12 16:43:42 +08:00
|
|
|
|
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET);
|
2025-02-11 18:23:19 +08:00
|
|
|
|
std::ifstream file("set_debug.txt"); // 文件中存储长字符串
|
|
|
|
|
|
std::stringstream buffer;
|
|
|
|
|
|
buffer << file.rdbuf(); // 读取整个文件内容
|
|
|
|
|
|
|
|
|
|
|
|
data.strText = QString::fromStdString(buffer.str());
|
|
|
|
|
|
data.mp_id = 123123;
|
|
|
|
|
|
my_rocketmq_send(data);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-01-16 16:17:01 +08:00
|
|
|
|
extern std::string G_MQCONSUMER_TOPIC_RC;
|
|
|
|
|
|
void rocketmq_test_rc()
|
|
|
|
|
|
{
|
|
|
|
|
|
Ckafka_data_t data;
|
|
|
|
|
|
data.monitor_id = 123123;
|
2025-05-12 16:43:42 +08:00
|
|
|
|
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC);
|
2025-01-16 16:17:01 +08:00
|
|
|
|
std::ifstream file("rc.txt"); // 文件中存储长字符串
|
|
|
|
|
|
std::stringstream buffer;
|
|
|
|
|
|
buffer << file.rdbuf(); // 读取整个文件内容
|
|
|
|
|
|
|
|
|
|
|
|
data.strText = QString::fromStdString(buffer.str());
|
|
|
|
|
|
data.mp_id = 123123;
|
|
|
|
|
|
my_rocketmq_send(data);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-02-26 16:39:10 +08:00
|
|
|
|
extern std::string G_MQCONSUMER_TOPIC_LOG;
|
|
|
|
|
|
void rocketmq_test_log()
|
|
|
|
|
|
{
|
|
|
|
|
|
Ckafka_data_t data;
|
|
|
|
|
|
data.monitor_id = 123123;
|
2025-05-12 16:43:42 +08:00
|
|
|
|
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_LOG);
|
2025-02-26 16:39:10 +08:00
|
|
|
|
std::ifstream file("log_test.txt"); // 文件中存储长字符串
|
|
|
|
|
|
std::stringstream buffer;
|
|
|
|
|
|
buffer << file.rdbuf(); // 读取整个文件内容
|
|
|
|
|
|
|
|
|
|
|
|
data.strText = QString::fromStdString(buffer.str());
|
|
|
|
|
|
data.mp_id = 123123;
|
|
|
|
|
|
my_rocketmq_send(data);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-01-16 16:17:01 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-02-18 17:10:22 +08:00
|
|
|
|
|