Files
microser/cfg_parse/SimpleProducer.cpp

1052 lines
36 KiB
C++
Raw Normal View History

2025-01-16 16:17:01 +08:00
#include <fstream> // 用于 std::ifstream
#include <sstream> // 用于 std::stringstream
#include <unistd.h>
#include <stdlib.h>
#include <iostream>
#include <atomic>
2025-01-16 16:17:01 +08:00
#include <string>
#include "../mms/db_interface.h"
2025-02-18 17:10:22 +08:00
2025-05-09 16:53:07 +08:00
#include "../rocketmq/CProducer.h"
#include "../rocketmq/CMessage.h"
#include "../rocketmq/CSendResult.h"
#include "../rocketmq/SimpleProducer.h"
2025-01-16 16:17:01 +08:00
//测试300数据用lnk20241202
#include <ctime>
#include <QThread>
//测试300数据用
//lnk20241209添加队列选择
2025-05-09 16:53:07 +08:00
#include "../rocketmq/MQSelector.h"
#include "../rocketmq/MQMessageQueue.h"
2025-01-16 16:17:01 +08:00
//#include <atomic>
#include <vector>
#include <stdexcept>
//lnk20241209添加队列选择
#include <cstring>
//引入消费起点
2025-05-09 16:53:07 +08:00
#include "../rocketmq/DefaultMQPushConsumer.h"
#include "../rocketmq/ConsumeType.h"
#include "../rocketmq/MQMessageListener.h"
#include "../rocketmq/MQMessageExt.h"
#include "../rocketmq/SessionCredentials.h"
2025-01-16 16:17:01 +08:00
// 引入提供的消费者接口头文件
//#include "../rocketmq/CPushConsumer.h"
//#include "../rocketmq/CCommon.h"
//#include "../rocketmq/CMessageExt.h"
2025-01-16 16:17:01 +08:00
#include <map>
#include <pthread.h> // 用于互斥锁(在 C++98 中没有 std::mutex
#include <utility> // for std::pair
2025-05-30 15:40:20 +08:00
#include "../log4cplus/log4.h"//lnk添加log4
2025-01-16 16:17:01 +08:00
using namespace std;
2026-04-30 15:45:34 +08:00
static std::once_flag g_consumer_once;
static std::once_flag g_producer_once;
2025-01-16 16:17:01 +08:00
extern std::string G_ROCKETMQ_PRODUCER;//rocketmq producer
extern std::string G_ROCKETMQ_IPPORT;//rocketmq ip+port
2026-04-30 15:45:34 +08:00
extern std::string G_ROCKETMQ_TOPIC_TEST;//topie
extern std::string G_ROCKETMQ_TAG_TEST;//tag
extern std::string G_ROCKETMQ_KEY_TEST;//key
2025-01-16 16:17:01 +08:00
2026-04-03 16:03:25 +08:00
extern std::string G_MQCONSUMER_TOPIC_LOG;
extern std::string G_MQCONSUMER_TOPIC_SET;
extern std::string G_MQCONSUMER_TOPIC_RC;
extern std::string G_MQCONSUMER_TOPIC_UD;
extern std::string G_MQCONSUMER_TOPIC_RT;
2026-04-30 15:45:34 +08:00
extern std::string G_MQCONSUMER_TOPIC_TEST;
2025-05-12 16:43:42 +08:00
extern std::string FRONT_INST;
2026-04-17 16:35:35 +08:00
extern bool DEBUGOPEN;
2025-05-12 16:43:42 +08:00
2025-01-16 16:17:01 +08:00
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
extern int QUEUENUM;
extern int g_front_seg_index;
extern char subdir[128];
////////////////////////////////////////////////////////////////////////////////////////////////////////////
//消费者连接秘钥
extern std::string G_MQCONSUMER_ACCESSKEY;
extern std::string G_MQCONSUMER_SECRETKEY;
extern std::string G_MQCONSUMER_CHANNEL;
// 前向声明 RocketMQConsumer 类
class RocketMQConsumer;
// 全局映射CPushConsumer* -> RocketMQConsumer*
//std::map<CPushConsumer*, RocketMQConsumer*> g_consumerMap;//
//pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER;
////////////////////////////////
int64_t G_APP_START_MS = []() -> int64_t {
using namespace std::chrono;
return duration_cast<milliseconds>(
system_clock::now().time_since_epoch()
).count();
}();
int64_t G_START_SKEW_MS = 1000;
bool should_process_after_start(const rocketmq::MQMessageExt& msg)
{
const int64_t born_ts = static_cast<int64_t>(msg.getBornTimestamp());
return born_ts >= (G_APP_START_MS - G_START_SKEW_MS);
}
///////////////////////////////
class InternalListener;
2025-01-16 16:17:01 +08:00
class RocketMQConsumer {
public:
// 构造函数:初始化消费者并启动
RocketMQConsumer(const std::string& consumerName, const std::string& nameServer);
2025-01-16 16:17:01 +08:00
// 禁用拷贝和赋值
2026-04-30 15:45:34 +08:00
RocketMQConsumer(const RocketMQConsumer&) = delete;
RocketMQConsumer& operator=(const RocketMQConsumer&) = delete;
2025-01-16 16:17:01 +08:00
// 订阅主题和标签,并注册回调
void subscribe(const std::string& topic,
const std::string& tag,
MessageCallBack callback);
2025-01-16 16:17:01 +08:00
// 启动消费者
void start();
//修改消费模式
void setConsumerMessageModel(const std::string& topic);
rocketmq::ConsumeStatus handleMessage(const rocketmq::MQMessageExt& msg);
2025-01-16 16:17:01 +08:00
// 析构函数:关闭并销毁消费者
~RocketMQConsumer();
private:
//CPushConsumer* consumer_; // C 接口消费者指针
2025-01-16 16:17:01 +08:00
//MessageCallBack messageCallback_; // 函数指针用于回调
rocketmq::DefaultMQPushConsumer consumer_;
InternalListener* listener_;
2025-01-16 16:17:01 +08:00
std::map<std::pair<std::string, std::string>, MessageCallBack> callbacks_; // 订阅到回调的映射
/*// 静态消息处理回调
2025-01-16 16:17:01 +08:00
static int messageHandler(CPushConsumer* consumer, CMessageExt* msg);
// 实例消息处理函数
int handleMessage(CMessageExt* msg);*/
2025-01-16 16:17:01 +08:00
};
class InternalListener : public rocketmq::MessageListenerConcurrently {
public:
explicit InternalListener(RocketMQConsumer* owner)
: owner_(owner) {}
2025-01-16 16:17:01 +08:00
rocketmq::ConsumeStatus consumeMessage(
const std::vector<rocketmq::MQMessageExt>& msgs) override
{
for (size_t i = 0; i < msgs.size(); ++i) {
rocketmq::ConsumeStatus ret = owner_->handleMessage(msgs[i]);
if (ret != rocketmq::CONSUME_SUCCESS) {
return ret;
}
}
return rocketmq::CONSUME_SUCCESS;
}
private:
RocketMQConsumer* owner_;
};
// 构造函数实现C
/*RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId)
2025-01-16 16:17:01 +08:00
: consumer_(NULL)//, messageCallback_(NULL)
{
// 创建消费者
consumer_ = CreatePushConsumer(consumerName.c_str());
if (consumer_ == NULL) {
std::cout << "error CreatePushConsumer"<< std::endl;
throw std::runtime_error("Failed to create push consumer.");
}
SetPushConsumerSessionCredentials(consumer_,G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(),G_MQCONSUMER_CHANNEL.c_str());
// 设置 NameServer 地址
if (SetPushConsumerNameServerAddress(consumer_, nameServer.c_str()) != 0) {
DestroyPushConsumer(consumer_);
std::cout << "error setting nameServer"<< std::endl;
throw std::runtime_error("Failed to set NameServer address.");
}
// 设置消费者组ID
if (SetPushConsumerGroupID(consumer_, groupId.c_str()) != 0) {
DestroyPushConsumer(consumer_);
std::cout << "error setting groupId"<< std::endl;
throw std::runtime_error("Failed to set Consumer Group ID.");
}
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
//调试用
std::string consumerlog = "./mqconsumer/" + consumerName +".log";
if ( (SetPushConsumerLogPath(consumer_,consumerlog.c_str()) || SetPushConsumerLogFileNumAndSize(consumer_,10,100) || SetPushConsumerLogLevel(consumer_,E_LOG_LEVEL_DEBUG) ) != 0) {//记录消费日志
DestroyPushConsumer(consumer_);
std::cout << "error setting logpath"<< std::endl;
}
std::cout << "logpath:" << consumerlog << std::endl;
// 注册消息回调
if (RegisterMessageCallback(consumer_, RocketMQConsumer::messageHandler) != 0) {
DestroyPushConsumer(consumer_);
std::cout << "error setting Callback"<< std::endl;
throw std::runtime_error("Failed to register message callback.");
}
// 将消费者实例添加到全局映射
pthread_mutex_lock(&g_consumerMapMutex);
g_consumerMap[consumer_] = this;
pthread_mutex_unlock(&g_consumerMapMutex);
std::cout << "RocketMQ Consumer initialized and started." << std::endl;
}*/
//构造函数实现C++
RocketMQConsumer::RocketMQConsumer(const std::string& consumerGroup,
const std::string& nameServer)
: consumer_(consumerGroup),
listener_(NULL)
{
consumer_.setNamesrvAddr(nameServer);
consumer_.setSessionCredentials(
G_MQCONSUMER_ACCESSKEY,
G_MQCONSUMER_SECRETKEY,
G_MQCONSUMER_CHANNEL
);
2026-04-30 15:45:34 +08:00
// 限制消费线程池,防止 ConsumeTP 爆炸
consumer_.setConsumeThreadCount(4);
listener_ = new InternalListener(this);
2025-01-16 16:17:01 +08:00
}
// 启动消费者
void RocketMQConsumer::start()
{
2026-04-30 15:45:34 +08:00
static bool started = false;
if (started) {
std::cout << "Consumer already started" << std::endl;
return;
2025-01-16 16:17:01 +08:00
}
2026-04-30 15:45:34 +08:00
consumer_.registerMessageListener(listener_);
consumer_.start();
2026-04-30 15:45:34 +08:00
started = true;
2025-01-16 16:17:01 +08:00
}
void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback)
{
/*if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) {
2025-01-16 16:17:01 +08:00
throw std::runtime_error("Failed to subscribe to topic/tag.");
}*/
consumer_.subscribe(topic, tag);
//调试用
2025-01-16 16:17:01 +08:00
std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl;
// 使用 std::pair 作为键
std::pair<std::string, std::string> mapKey(topic, tag);
callbacks_[mapKey] = callback;
2025-01-16 16:17:01 +08:00
}
/*
2025-01-16 16:17:01 +08:00
// 静态消息处理回调实现
int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg)
{
RocketMQConsumer* instance = NULL;
//调试用
std::cout << "messagehandler" << std::endl;
// 查找对应的消费者实例
pthread_mutex_lock(&g_consumerMapMutex);
std::map<CPushConsumer*, RocketMQConsumer*>::iterator it = g_consumerMap.find(consumer);
if (it != g_consumerMap.end()) {
instance = it->second;
}
pthread_mutex_unlock(&g_consumerMapMutex);
if (instance) {
return instance->handleMessage(msg);
} else {
std::cerr << "Consumer instance not found for callback." << std::endl;
//return E_RECONSUME_LATER; // 默认返回重试状态
return rocketmq::RECONSUME_LATER;
2025-01-16 16:17:01 +08:00
}
}
*/
/*int RocketMQConsumer::handleMessage(CMessageExt* msg)
2025-01-16 16:17:01 +08:00
{
// 检查 msg 和 consumer_ 是否为 NULL
if (!msg || !consumer_) {
std::cerr << "Received null message or consumer." << std::endl;
//return E_RECONSUME_LATER;
return rocketmq::RECONSUME_LATER;
}*/
2025-01-16 16:17:01 +08:00
// 获取消息的主题和标签
//std::string topic = GetMessageTopic(msg); // 假设存在此函数
//std::string tag = GetMessageTags(msg); // 假设存在此函数
rocketmq::ConsumeStatus RocketMQConsumer::handleMessage( const rocketmq::MQMessageExt& msg) {
std::string tag = msg.getTags();
std::string topic = msg.getTopic();
// 打印调试信息
std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl;
// 使用 std::pair 作为键
std::pair<std::string, std::string> key(topic, tag);
// 查找对应的回调函数
std::map<std::pair<std::string, std::string>, MessageCallBack>::iterator it = callbacks_.find(key);
if (it != callbacks_.end())
{ // 调用对应的回调函数
//调试
std::cout << "callback Handling message " <<std::endl;
//return it->second(consumer_, msg);
return it->second(msg);
}
else {
//调试
std::cout << "there is no callback " <<std::endl;
// 如果没有找到对应的回调,执行默认处理
//const char* body = GetMessageBody(msg);
//const char* msgKey = GetMessageKeys(msg);
std::string body = msg.getBody();
std::string msgKey = msg.getKeys();
if (!body.empty()) {
std::cout << "Received message body: " << body << std::endl;
} else {
std::cout << "Received message with empty body." << std::endl;
} if (!msgKey.empty()) {
std::cout << "Message Key: " << msgKey << std::endl;
} else {
std::cout << "Message Key: N/A" << std::endl;
} //return E_CONSUME_SUCCESS;
return rocketmq::CONSUME_SUCCESS;
}
2025-01-16 16:17:01 +08:00
}
// 析构函数实现
RocketMQConsumer::~RocketMQConsumer()
{
/*if (consumer_) {
2025-01-16 16:17:01 +08:00
// 关闭消费者
ShutdownPushConsumer(consumer_);
// 从全局映射中移除
pthread_mutex_lock(&g_consumerMapMutex);
g_consumerMap.erase(consumer_);
pthread_mutex_unlock(&g_consumerMapMutex);
// 销毁消费者
DestroyPushConsumer(consumer_);
consumer_ = NULL;
std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
}*/
try {
consumer_.shutdown();
} catch (...) {
2025-01-16 16:17:01 +08:00
}
2026-04-30 15:45:34 +08:00
sleep(1); // 等内部线程退出
delete listener_;
listener_ = NULL;
std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
2025-01-16 16:17:01 +08:00
}
// 在 RocketMQConsumer 类中新增函数用来设置消费模式
void RocketMQConsumer::setConsumerMessageModel(const std::string& topic)
{
2025-05-12 16:43:42 +08:00
/*if (topic == G_MQCONSUMER_TOPIC_SET) {
2025-01-16 16:17:01 +08:00
// 设置为普通消费模式
if (SetPushConsumerMessageModel(consumer_, CLUSTERING) != 0) {
std::cout << "Error setting message model to CLUSTERING for topic: " << topic << std::endl;
} else {
std::cout << "Set consumer to CLUSTERING for topic: " << topic << std::endl;
}
2025-05-12 16:43:42 +08:00
} else*/ {
2025-01-16 16:17:01 +08:00
// 默认设置为广播消费模式
/*if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) {
2025-01-16 16:17:01 +08:00
std::cout << "Error setting message model to BROADCASTING for topic: " << topic << std::endl;
} else {
std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
}*/
consumer_.setMessageModel(rocketmq::BROADCASTING);
std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
2025-01-16 16:17:01 +08:00
}
}
// 全局消费者实例
RocketMQConsumer* g_consumer = NULL;
// 初始化消费者函数
void InitializeConsumer(
const std::string& consumerName,
const std::string& nameServer,
const std::vector<Subscription>& subscriptions) // 接收多个订阅
{
if (g_consumer == NULL) {
std::cout << "create new consumer!" << std::endl;
try {
//g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组不同进程不同的消费者同时消费topic的同一条消息
g_consumer = new RocketMQConsumer(consumerName, nameServer);
2025-01-16 16:17:01 +08:00
for (size_t i = 0; i < subscriptions.size(); ++i) {
g_consumer->setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式
g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag,subscriptions[i].callback);
2025-01-16 16:17:01 +08:00
}
g_consumer->start();
}
catch (const std::exception& e) {
std::cerr << "Failed to initialize consumer: " << e.what() << std::endl;
throw; // 重新抛出异常
}
}
}
// 关闭并销毁消费者函数
void ShutdownAndDestroyConsumer()
{
if (g_consumer != NULL) {
delete g_consumer;
g_consumer = NULL;
}
}
// 消费消息的接口函数
void rocketmq_consumer_receive(
const std::string& consumerName,
const std::string& nameServer,
const std::vector<Subscription>& subscriptions) // 接收多个订阅
{
2026-04-30 15:45:34 +08:00
std::call_once(g_consumer_once, [&](){
2025-01-16 16:17:01 +08:00
try {
2026-04-30 15:45:34 +08:00
InitializeConsumer(consumerName, nameServer, subscriptions);
} catch (...) {
2025-01-16 16:17:01 +08:00
std::cerr << "Cannot consume message because consumer initialization failed." << std::endl;
}
2026-04-30 15:45:34 +08:00
});
2025-01-16 16:17:01 +08:00
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
//封装生产者类
#if 1
// 全局或静态变量,用于维护当前队列索引
static int currentQueueId = 0;
// 队列选择器回调函数:轮询选择队列 ID
/*int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
2025-01-16 16:17:01 +08:00
if (queueNum == 0) {
throw std::runtime_error("No available queues");
}
int queueId = currentQueueId % queueNum;
currentQueueId++;
if (currentQueueId >= 1024 - queueNum) {
currentQueueId = 0;
}
return queueId;
}*/
int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
static std::atomic<int> currentQueueId(0);
if (queueNum <= 0) {
std::cout << "[MQ][SELECTOR_FAIL] queueNum=" << queueNum << std::endl;
return -1;
}
int id = currentQueueId.fetch_add(1, std::memory_order_relaxed);
int queueId = id % queueNum;
std::cout << "[MQ][SELECTOR] queueNum=" << queueNum
<< ", current=" << id
<< ", selected=" << queueId
<< std::endl;
return queueId;
2025-01-16 16:17:01 +08:00
}
// 封装生产者的类
class RocketMQProducer {
public:
RocketMQProducer(const std::string& producerName, const std::string& nameServer)
: producer_(producerName)
2025-01-16 16:17:01 +08:00
{
// 创建生产者
/*producer_ = CreateProducer(producerName.c_str());
2025-01-16 16:17:01 +08:00
if (producer_ == NULL) {
throw std::runtime_error("Failed to create producer.");
}*/
// 设置日志
producer_.setLogLevel(rocketmq::eLOG_LEVEL_ERROR);
producer_.setLogFileSizeAndNum(5, 50);
2025-01-16 16:17:01 +08:00
// 设置 nameserver 地址
//SetProducerNameServerAddress(producer_, nameServer.c_str());
producer_.setNamesrvAddr(nameServer);
2025-01-16 16:17:01 +08:00
2026-04-17 16:35:35 +08:00
//lnk20260417设置数据上送消息体最大值默认4M调整为1M避免过大消息导致发送失败
//SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB
producer_.setMaxMessageSize(1024 * 1024);
2026-04-17 16:35:35 +08:00
//SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), "");
producer_.setSessionCredentials(
G_MQCONSUMER_ACCESSKEY,
G_MQCONSUMER_SECRETKEY,
""
);
2025-04-10 16:07:39 +08:00
2025-01-16 16:17:01 +08:00
// 启动生产者
//StartProducer(producer_);
producer_.start();
2025-01-16 16:17:01 +08:00
std::cout << "rocketmq_Producer initialized and started." << std::endl;
}
// 禁用拷贝和赋值
RocketMQProducer(const RocketMQProducer&) = delete;
RocketMQProducer& operator=(const RocketMQProducer&) = delete;
/*void printSendResult(const CSendResult& result) {
2025-04-10 16:07:39 +08:00
std::cout << "SendResult:" << std::endl;
std::cout << " Status: ";
switch (result.sendStatus) {
case E_SEND_OK:
std::cout << "E_SEND_OK";
break;
case E_SEND_FLUSH_DISK_TIMEOUT:
std::cout << "E_SEND_FLUSH_DISK_TIMEOUT";
break;
case E_SEND_FLUSH_SLAVE_TIMEOUT:
std::cout << "E_SEND_FLUSH_SLAVE_TIMEOUT";
break;
case E_SEND_SLAVE_NOT_AVAILABLE:
std::cout << "E_SEND_SLAVE_NOT_AVAILABLE";
break;
default:
std::cout << "UNKNOWN(" << result.sendStatus << ")";
break;
}
std::cout << std::endl;
std::cout << " MsgID : " << result.msgId << std::endl;
std::cout << " Offset: " << result.offset << std::endl;
}*/
2025-04-10 16:07:39 +08:00
2025-01-16 16:17:01 +08:00
// 发送消息
/* void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) {
2026-04-17 16:35:35 +08:00
if (DEBUGOPEN) {
std::cout << "sendMessage called with topic: " << (topic ? topic : "NULL")
<< ", tags: " << tags
<< ", keys: " << keys
<< std::endl;
if (strbody) {
// ===== 1⃣ 真实长度 vs strlen =====
std::string body_str(strbody);
std::cout << "[MQ][LEN_CHECK]"
<< " strlen=" << strlen(strbody)
<< ", std::string.size=" << body_str.size()
<< std::endl;
// ===== 2⃣ 检测是否包含 \0 =====
bool has_null = false;
for (size_t i = 0; i < body_str.size(); i++) {
if (body_str[i] == '\0') {
has_null = true;
std::cout << "[MQ][FOUND_NULL] index=" << i << std::endl;
break;
}
}
std::cout << "[MQ][HAS_NULL] " << (has_null ? "YES" : "NO") << std::endl;
// ===== 3⃣ 打印头部(可读)=====
size_t len = strlen(strbody);
size_t n = std::min((size_t)200, len);
std::cout << "[MQ][BODY_HEAD] "
<< std::string(strbody, n)
<< std::endl;
std::cout << "[MQ][BODY_TAIL] "
<< std::string(strbody + (len - n), n)
<< std::endl;
// ===== 4⃣ 十六进制打印前100字节 =====
std::cout << "[MQ][HEX_HEAD] ";
for (size_t i = 0; i < std::min((size_t)100, body_str.size()); i++) {
printf("%02X ", (unsigned char)body_str[i]);
}
printf("\n");
// ===== 5⃣ 十六进制打印尾部100字节 =====
std::cout << "[MQ][HEX_TAIL] ";
size_t start = (body_str.size() > 100) ? body_str.size() - 100 : 0;
for (size_t i = start; i < body_str.size(); i++) {
printf("%02X ", (unsigned char)body_str[i]);
}
printf("\n");
} else {
std::cout << "[MQ][ERROR] strbody is NULL" << std::endl;
}
}
2025-01-16 16:17:01 +08:00
CSendResult result;
CMessage* msg = NULL;
try {
// 创建消息并设置属性
msg = CreateMessage(topic);
if (msg == NULL) {
throw std::runtime_error("Failed to create message.");
}
SetMessageTags(msg, tags.c_str());
SetMessageKeys(msg, keys.c_str());
SetMessageBody(msg, strbody);
// 假设队列数量和 Broker 名称是固定的
int queueNum = QUEUENUM; // 配置的队列数量例如5
// 发送消息
int sendResult = SendMessageOnewayOrderly(
producer_,
msg,
RoundRobinSelector, // 队列选择器回调函数
&queueNum // 传递给选择器的额外参数(队列数量)
);
2026-04-17 16:35:35 +08:00
/////////////////////////////////替换接口,性能较低但不影响
/*CSendResult result;
memset(&result, 0, sizeof(result));
int sendResult = SendMessageOrderly(
producer_,
msg,
RoundRobinSelector,
&queueNum,
0, // autoRetryTimes
&result
);
std::cout << "[MQ][ORDERLY_RESULT]"
<< " ret=" << sendResult
<< ", sendStatus=" << (int)result.sendStatus
<< ", msgId=" << result.msgId
<< ", offset=" << result.offset
<< ", topic=" << (topic ? topic : "")
<< ", body_len=" << (strbody ? strlen(strbody) : 0)
<< std::endl;*/
/////////////////////////////////替换接口,性能较低但不影响
// 发送消息:临时改成同步发送,绕过 orderly / selector便于定位问题
/*CSendResult result;
memset(&result, 0, sizeof(result));
int sendResult = SendMessageSync(
producer_,
msg,
&result
);
std::cout << "[MQ][SYNC_RESULT]"
<< " ret=" << sendResult
<< ", sendStatus=" << (int)result.sendStatus
<< ", msgId=" << result.msgId
<< ", offset=" << result.offset
<< ", topic=" << (topic ? topic : "")
<< ", body_len=" << (strbody ? strlen(strbody) : 0)
<< std::endl;*/
// 发送消息:临时改成同步发送,绕过 orderly / selector便于定位问题
2025-04-10 16:07:39 +08:00
/*if (sendResult == 0) { // 假设返回 0 表示成功
std::cout << "[MQ][SEND_OK]"
<< " topic=" << (topic ? topic : "")
<< ", tags=" << tags
<< ", keys=" << keys
<< ", body_len=" << (strbody ? strlen(strbody) : 0)
<< std::endl;
2025-01-16 16:17:01 +08:00
} else {
2026-04-17 16:35:35 +08:00
std::cout << "[MQ][SEND_FAIL]"
<< " ret=" << sendResult
<< ", topic=" << (topic ? topic : "")
<< ", tags=" << tags
<< ", keys=" << keys
2026-04-17 16:35:35 +08:00
<< ", body_len=" << (strbody ? strlen(strbody) : 0)
<< std::endl;
2026-04-17 16:35:35 +08:00
std::cout << "[MQ][BODY_HEAD] " << std::string(strbody, std::min((size_t)200, strlen(strbody))) << std::endl;
std::cout << "[MQ][BODY_TAIL] " << std::string(strbody + std::max((size_t)0, strlen(strbody) - std::min((size_t)200, strlen(strbody)))) << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index);
2025-04-10 16:07:39 +08:00
}
2025-01-16 16:17:01 +08:00
// 销毁消息
DestroyMessage(msg);
msg = NULL; // 防止重复销毁
}
catch (const std::runtime_error& e) {
std::cerr << "Runtime error during message sending: " << e.what() << std::endl;
// 如果消息已经创建,确保销毁它
if (msg != NULL) {
DestroyMessage(msg);
}
// 可以在这里添加更多的错误处理逻辑,比如重试、记录日志等
}
catch (const std::exception& e) {
std::cerr << "Exception during message sending: " << e.what() << std::endl;
if (msg != NULL) {
DestroyMessage(msg);
}
}
catch (...) {
std::cerr << "Unknown error during message sending." << std::endl;
if (msg != NULL) {
DestroyMessage(msg);
}
}
}*/
void sendMessage(const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys)
{
try {
if (DEBUGOPEN) {
std::cout << "sendMessage called with topic: " << topic
<< ", tags: " << tags
<< ", keys: " << keys
<< ", body_len=" << body.size()
<< std::endl;
size_t n = std::min((size_t)200, body.size());
std::cout << "[MQ][BODY_HEAD] "
<< body.substr(0, n)
<< std::endl;
if (body.size() > n) {
std::cout << "[MQ][BODY_TAIL] "
<< body.substr(body.size() - n, n)
<< std::endl;
}
std::cout << "[MQ][HEX_HEAD] ";
for (size_t i = 0; i < std::min((size_t)100, body.size()); ++i) {
printf("%02X ", (unsigned char)body[i]);
}
printf("\n");
}
rocketmq::MQMessage msg(topic, tags, keys, body);
rocketmq::SendResult result = producer_.send(msg);
std::cout << "[MQ][SEND_OK]"
<< " topic=" << topic
<< ", tags=" << tags
<< ", keys=" << keys
<< ", msgId=" << result.getMsgId()
<< ", status=" << result.getSendStatus()
<< ", body_len=" << body.size()
<< std::endl;
}
catch (const rocketmq::MQClientException& e) {
std::cerr << "[MQ][SEND_FAIL] MQClientException: "
<< e.what() << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,mq客户端错误,请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
}
catch (const std::exception& e) {
std::cerr << "[MQ][SEND_FAIL] exception: "
<< e.what() << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,mq发送错误,发送请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
}
catch (...) {
std::cerr << "[MQ][SEND_FAIL] unknown exception" << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,未知错误,请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
}
2025-01-16 16:17:01 +08:00
}
// 析构函数中关闭并销毁生产者
~RocketMQProducer() {
/*if (producer_) {
2025-01-16 16:17:01 +08:00
ShutdownProducer(producer_);
DestroyProducer(producer_);
std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
}*/
try {
producer_.shutdown();
2025-01-16 16:17:01 +08:00
}
catch (...) {
}
std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
2025-01-16 16:17:01 +08:00
}
private:
//CProducer* producer_;
rocketmq::DefaultMQProducer producer_;
2025-01-16 16:17:01 +08:00
};
// 全局生产者实例
RocketMQProducer* g_producer = NULL;
// 初始化生产者(在程序启动时调用一次)
void InitializeProducer()
{
if (g_producer == NULL) {
try {
g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT);//生产者名称和NameServer地址
2025-01-16 16:17:01 +08:00
}
catch (const std::exception& e) {
std::cerr << "Failed to initialize producer: " << e.what() << std::endl;
// 根据需求决定是否终止程序或采取其他措施
throw; // 重新抛出异常
}
}
}
// 关闭并销毁生产者(在程序结束时调用一次)
void ShutdownAndDestroyProducer()
{
if (g_producer != NULL) {
delete g_producer;
g_producer = NULL;
}
}
// 发送消息的接口函数
void rocketmq_producer_send(const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys)
2025-01-16 16:17:01 +08:00
{
2026-04-30 15:45:34 +08:00
std::call_once(g_producer_once, [&](){
InitializeProducer();
});
2025-01-16 16:17:01 +08:00
try {
g_producer->sendMessage(body, topic, tags, keys);
} catch (const std::exception& e) {
2025-01-16 16:17:01 +08:00
std::cerr << "Failed to send message: " << e.what() << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置",
get_front_msg_from_subdir(), g_front_seg_index);
2025-01-16 16:17:01 +08:00
}
}
#endif
//////////////////////////////////////////////////////////////////////////////////////////////////////////
/*
2025-01-16 16:17:01 +08:00
// producer_send0测试用
void StartSendMessage(CProducer* producer)
{
CSendResult result;
// create message and set some values for it
2026-04-30 15:45:34 +08:00
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC_TEST.c_str());
SetMessageTags(msg, G_ROCKETMQ_TAG_TEST.c_str());
SetMessageKeys(msg, G_ROCKETMQ_KEY_TEST.c_str());
2025-01-16 16:17:01 +08:00
for (int i = 0; i < 10; i++)
{
// construct different body
string strMessageBody = "this is body number";
SetMessageBody(msg, strMessageBody.c_str());
// send message
SendMessageSync(producer, msg, &result);
cout << "send message[" << i << "], result status:" << result.sendStatus << ", msgBody:" << strMessageBody << endl;
usleep(1000);
}
// destroy message
DestroyMessage(msg);
}
//producer_send 测试用
void StartSendMessage(CProducer* producer,const char* strbody)
{
CSendResult result;
// create message and set some values for it
2026-04-30 15:45:34 +08:00
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC_TEST.c_str());
SetMessageTags(msg, G_ROCKETMQ_TAG_TEST.c_str());
SetMessageKeys(msg, G_ROCKETMQ_KEY_TEST.c_str());
2025-01-16 16:17:01 +08:00
SetMessageBody(msg, strbody);
// send message
SendMessageSync(producer, msg, &result);
cout << "result status:" << result.sendStatus << ", msgBody:" << strbody << endl;
// destroy message
DestroyMessage(msg);
}
//测试用 固定消息体
void producer_send0()
{
cout << "Producer Initializing....." << endl;
// create producer and set some values for it
CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str());
SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str());
// start producer
StartProducer(producer);
cout << "Producer start....." << endl;
// send message
StartSendMessage(producer);
// shutdown producer
ShutdownProducer(producer);
// destroy producer
DestroyProducer(producer);
cout << "Producer Shutdown!" << endl;
}
//测试用 可控制消息体
void producer_send(const char* strbody)
{
cout << "Producer Initializing....." << endl;
// create producer and set some values for it
CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str());
SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str());
// start producer
StartProducer(producer);
cout << "Producer start....." << endl;
// send message
StartSendMessage(producer, strbody);
// shutdown producer
ShutdownProducer(producer);
// destroy producer
DestroyProducer(producer);
cout << "Producer Shutdown!" << endl;
}
*/
2025-01-16 16:17:01 +08:00
///////////////////////////////////////////////////////////////////////////////////////////////////////////
2025-02-18 17:10:22 +08:00
2025-01-16 16:17:01 +08:00
extern "C" {
2026-04-03 16:03:25 +08:00
//extern std::string G_MQCONSUMER_TOPIC_RT;
2025-01-16 16:17:01 +08:00
void rocketmq_test_rt()
{
Ckafka_data_t data;
data.monitor_id = 123123;
2026-04-30 15:45:34 +08:00
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RT);
2025-01-16 16:17:01 +08:00
std::ifstream file("rt.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
2026-04-30 16:06:29 +08:00
data.mp_id = QString::number(123456);
2025-01-16 16:17:01 +08:00
my_rocketmq_send(data);
}
2026-04-03 16:03:25 +08:00
//extern std::string G_MQCONSUMER_TOPIC_UD;
2025-01-16 16:17:01 +08:00
void rocketmq_test_ud()//用来测试台账更新
{
Ckafka_data_t data;
data.monitor_id = 123123;
2026-04-30 15:45:34 +08:00
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_UD);
2025-01-16 16:17:01 +08:00
std::ifstream file("ud.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
2026-04-30 16:06:29 +08:00
data.mp_id = QString::number(123456);
2025-01-16 16:17:01 +08:00
my_rocketmq_send(data);
}
void rocketmq_test_set()//用来测试进程控制脚本
{
Ckafka_data_t data;
data.monitor_id = 123123;
2026-04-30 15:45:34 +08:00
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_SET);
2025-01-16 16:17:01 +08:00
std::ifstream file("set.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
2026-04-30 16:06:29 +08:00
data.mp_id = QString::number(123456);
2025-01-16 16:17:01 +08:00
my_rocketmq_send(data);
}
2025-02-11 18:23:19 +08:00
void rocketmq_test_only()//用来测试进程控制脚本
{
Ckafka_data_t data;
data.monitor_id = 123123;
2026-04-30 15:45:34 +08:00
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_TEST);
std::ifstream file("test.txt"); // 文件中存储长字符串
2025-02-11 18:23:19 +08:00
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
2026-04-30 16:06:29 +08:00
data.mp_id = QString::number(123456);
2025-02-11 18:23:19 +08:00
my_rocketmq_send(data);
}
2026-04-03 16:03:25 +08:00
//extern std::string G_MQCONSUMER_TOPIC_RC;
2025-01-16 16:17:01 +08:00
void rocketmq_test_rc()
{
Ckafka_data_t data;
data.monitor_id = 123123;
2026-04-30 15:45:34 +08:00
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RC);
2025-01-16 16:17:01 +08:00
std::ifstream file("rc.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
2026-04-30 16:06:29 +08:00
data.mp_id = QString::number(123456);
2025-01-16 16:17:01 +08:00
my_rocketmq_send(data);
}
2026-04-03 16:03:25 +08:00
//extern std::string G_MQCONSUMER_TOPIC_LOG;
2025-02-26 16:39:10 +08:00
void rocketmq_test_log()
{
Ckafka_data_t data;
data.monitor_id = 123123;
2026-04-30 15:45:34 +08:00
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_LOG);
2025-02-26 16:39:10 +08:00
std::ifstream file("log_test.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
2026-04-30 16:06:29 +08:00
data.mp_id = QString::number(123456);
2025-02-26 16:39:10 +08:00
my_rocketmq_send(data);
}
2025-01-16 16:17:01 +08:00
}
2025-02-18 17:10:22 +08:00