Files
microser/cfg_parse/SimpleProducer.cpp
2026-04-30 15:45:34 +08:00

1052 lines
36 KiB
C++
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <fstream> // 用于 std::ifstream
#include <sstream> // 用于 std::stringstream
#include <unistd.h>
#include <stdlib.h>
#include <iostream>
#include <atomic>
#include <string>
#include <algorithm> // std::min
#include <chrono> // std::chrono::system_clock
#include <cstdio> // printf
#include <mutex> // std::once_flag, std::call_once
#include "../mms/db_interface.h"
#include "../rocketmq/CProducer.h"
#include "../rocketmq/CMessage.h"
#include "../rocketmq/CSendResult.h"
#include "../rocketmq/SimpleProducer.h"
//测试300数据用lnk20241202
#include <ctime>
#include <QThread>
//测试300数据用
//lnk20241209添加队列选择
#include "../rocketmq/MQSelector.h"
#include "../rocketmq/MQMessageQueue.h"
//#include <atomic>
#include <vector>
#include <stdexcept>
//lnk20241209添加队列选择
#include <cstring>
//引入消费起点
#include "../rocketmq/DefaultMQPushConsumer.h"
#include "../rocketmq/ConsumeType.h"
#include "../rocketmq/MQMessageListener.h"
#include "../rocketmq/MQMessageExt.h"
#include "../rocketmq/SessionCredentials.h"
// 引入提供的消费者接口头文件
//#include "../rocketmq/CPushConsumer.h"
//#include "../rocketmq/CCommon.h"
//#include "../rocketmq/CMessageExt.h"
#include <map>
#include <pthread.h> // 用于互斥锁(在 C++98 中没有 std::mutex
#include <utility> // for std::pair
#include "../log4cplus/log4.h"//lnk添加log4
using namespace std;
static std::once_flag g_consumer_once;
static std::once_flag g_producer_once;
extern std::string G_ROCKETMQ_PRODUCER;//rocketmq producer
extern std::string G_ROCKETMQ_IPPORT;//rocketmq ip+port
extern std::string G_ROCKETMQ_TOPIC_TEST;//topie
extern std::string G_ROCKETMQ_TAG_TEST;//tag
extern std::string G_ROCKETMQ_KEY_TEST;//key
extern std::string G_MQCONSUMER_TOPIC_LOG;
extern std::string G_MQCONSUMER_TOPIC_SET;
extern std::string G_MQCONSUMER_TOPIC_RC;
extern std::string G_MQCONSUMER_TOPIC_UD;
extern std::string G_MQCONSUMER_TOPIC_RT;
extern std::string G_MQCONSUMER_TOPIC_TEST;
extern std::string FRONT_INST;
extern bool DEBUGOPEN;
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
extern int QUEUENUM;
extern int g_front_seg_index;
extern char subdir[128];
////////////////////////////////////////////////////////////////////////////////////////////////////////////
//消费者连接秘钥
extern std::string G_MQCONSUMER_ACCESSKEY;
extern std::string G_MQCONSUMER_SECRETKEY;
extern std::string G_MQCONSUMER_CHANNEL;
// 前向声明 RocketMQConsumer 类
class RocketMQConsumer;
// 全局映射CPushConsumer* -> RocketMQConsumer*
//std::map<CPushConsumer*, RocketMQConsumer*> g_consumerMap;//
//pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER;
////////////////////////////////
// Wall-clock time, in milliseconds since the system_clock epoch, sampled once
// when this translation unit's globals are initialized.
int64_t G_APP_START_MS = []() -> int64_t {
    const std::chrono::system_clock::duration sinceEpoch =
        std::chrono::system_clock::now().time_since_epoch();
    return std::chrono::duration_cast<std::chrono::milliseconds>(sinceEpoch).count();
}();
// Tolerance, in milliseconds, subtracted from G_APP_START_MS when deciding
// whether a message was born "after start".
int64_t G_START_SKEW_MS = 1000;
// Returns true when the message's born timestamp is not older than the
// application start time minus the configured skew (both in milliseconds).
bool should_process_after_start(const rocketmq::MQMessageExt& msg)
{
    const int64_t earliestAccepted = G_APP_START_MS - G_START_SKEW_MS;
    const int64_t bornAt = static_cast<int64_t>(msg.getBornTimestamp());
    return !(bornAt < earliestAccepted);
}
///////////////////////////////
class InternalListener;
// Wrapper around rocketmq::DefaultMQPushConsumer that keeps a table of
// callbacks keyed by (topic, tag) and hands received messages to them.
class RocketMQConsumer {
public:
// Configures the underlying consumer (name server address, session
// credentials, consume thread count) and creates the internal listener.
// Consumption does not begin until start() is called.
RocketMQConsumer(const std::string& consumerName, const std::string& nameServer);
// Copying and assignment are disabled.
RocketMQConsumer(const RocketMQConsumer&) = delete;
RocketMQConsumer& operator=(const RocketMQConsumer&) = delete;
// Subscribes the underlying consumer to topic/tag and stores the callback
// under the (topic, tag) key.
void subscribe(const std::string& topic,
const std::string& tag,
MessageCallBack callback);
// Registers the internal listener and starts the underlying consumer.
void start();
// Selects the message model on the underlying consumer. The visible
// implementation always sets BROADCASTING and only logs `topic`.
void setConsumerMessageModel(const std::string& topic);
// Looks up a registered callback for the message's topic and tag and invokes
// it; a message without a callback is printed and reported as consumed.
rocketmq::ConsumeStatus handleMessage(const rocketmq::MQMessageExt& msg);
// Shuts down the underlying consumer and deletes the internal listener.
~RocketMQConsumer();
private:
rocketmq::DefaultMQPushConsumer consumer_; // underlying RocketMQ C++ client
InternalListener* listener_; // created in the constructor, deleted in the destructor
std::map<std::pair<std::string, std::string>, MessageCallBack> callbacks_; // (topic, tag) -> callback
};
// Listener registered with the RocketMQ push consumer; it forwards every
// message of a delivered batch to the owning RocketMQConsumer.
class InternalListener : public rocketmq::MessageListenerConcurrently {
public:
    // `owner` is stored as a plain pointer; this class never deletes it.
    explicit InternalListener(RocketMQConsumer* owner)
        : owner_(owner) {}

    // Handles the batch in order and stops at the first message whose status
    // is not CONSUME_SUCCESS, returning that status for the whole batch.
    rocketmq::ConsumeStatus consumeMessage(
        const std::vector<rocketmq::MQMessageExt>& msgs) override
    {
        for (const rocketmq::MQMessageExt& message : msgs) {
            const rocketmq::ConsumeStatus status = owner_->handleMessage(message);
            if (status != rocketmq::CONSUME_SUCCESS) {
                return status;
            }
        }
        return rocketmq::CONSUME_SUCCESS;
    }

private:
    RocketMQConsumer* owner_;
};
// 构造函数实现C
/*RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId)
: consumer_(NULL)//, messageCallback_(NULL)
{
// 创建消费者
consumer_ = CreatePushConsumer(consumerName.c_str());
if (consumer_ == NULL) {
std::cout << "error CreatePushConsumer"<< std::endl;
throw std::runtime_error("Failed to create push consumer.");
}
SetPushConsumerSessionCredentials(consumer_,G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(),G_MQCONSUMER_CHANNEL.c_str());
// 设置 NameServer 地址
if (SetPushConsumerNameServerAddress(consumer_, nameServer.c_str()) != 0) {
DestroyPushConsumer(consumer_);
std::cout << "error setting nameServer"<< std::endl;
throw std::runtime_error("Failed to set NameServer address.");
}
// 设置消费者组ID
if (SetPushConsumerGroupID(consumer_, groupId.c_str()) != 0) {
DestroyPushConsumer(consumer_);
std::cout << "error setting groupId"<< std::endl;
throw std::runtime_error("Failed to set Consumer Group ID.");
}
//调试用
std::string consumerlog = "./mqconsumer/" + consumerName +".log";
if ( (SetPushConsumerLogPath(consumer_,consumerlog.c_str()) || SetPushConsumerLogFileNumAndSize(consumer_,10,100) || SetPushConsumerLogLevel(consumer_,E_LOG_LEVEL_DEBUG) ) != 0) {//记录消费日志
DestroyPushConsumer(consumer_);
std::cout << "error setting logpath"<< std::endl;
}
std::cout << "logpath:" << consumerlog << std::endl;
// 注册消息回调
if (RegisterMessageCallback(consumer_, RocketMQConsumer::messageHandler) != 0) {
DestroyPushConsumer(consumer_);
std::cout << "error setting Callback"<< std::endl;
throw std::runtime_error("Failed to register message callback.");
}
// 将消费者实例添加到全局映射
pthread_mutex_lock(&g_consumerMapMutex);
g_consumerMap[consumer_] = this;
pthread_mutex_unlock(&g_consumerMapMutex);
std::cout << "RocketMQ Consumer initialized and started." << std::endl;
}*/
// Constructor (C++ client API): configures the push consumer and creates the
// listener. It does not subscribe to anything and does not start consuming.
RocketMQConsumer::RocketMQConsumer(const std::string& consumerGroup,
                                   const std::string& nameServer)
    : consumer_(consumerGroup),
      listener_(NULL)
{
    // Cap the consume thread pool so the number of consume threads stays bounded.
    const int kConsumeThreads = 4;

    consumer_.setNamesrvAddr(nameServer);
    consumer_.setSessionCredentials(G_MQCONSUMER_ACCESSKEY,
                                    G_MQCONSUMER_SECRETKEY,
                                    G_MQCONSUMER_CHANNEL);
    consumer_.setConsumeThreadCount(kConsumeThreads);

    // Created last so that nothing after the allocation can fail and leak it.
    listener_ = new InternalListener(this);
}
// Registers the internal listener and starts the underlying consumer the first
// time it is called; later calls only print a notice.
// NOTE(review): `started` is a function-local static, so the flag is shared by
// every RocketMQConsumer instance in the process, not tracked per object.
void RocketMQConsumer::start()
{
    static bool started = false;
    if (!started) {
        consumer_.registerMessageListener(listener_);
        consumer_.start();
        started = true;
        return;
    }
    std::cout << "Consumer already started" << std::endl;
}
// Subscribes the underlying consumer to `topic` with tag expression `tag` and
// stores `callback` under the (topic, tag) key, replacing any previous entry.
void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback)
{
    consumer_.subscribe(topic, tag);
    // Debug trace of the subscription.
    std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl;
    callbacks_[std::make_pair(topic, tag)] = callback;
}
/*
// 静态消息处理回调实现
int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg)
{
RocketMQConsumer* instance = NULL;
//调试用
std::cout << "messagehandler" << std::endl;
// 查找对应的消费者实例
pthread_mutex_lock(&g_consumerMapMutex);
std::map<CPushConsumer*, RocketMQConsumer*>::iterator it = g_consumerMap.find(consumer);
if (it != g_consumerMap.end()) {
instance = it->second;
}
pthread_mutex_unlock(&g_consumerMapMutex);
if (instance) {
return instance->handleMessage(msg);
} else {
std::cerr << "Consumer instance not found for callback." << std::endl;
//return E_RECONSUME_LATER; // 默认返回重试状态
return rocketmq::RECONSUME_LATER;
}
}
*/
/*int RocketMQConsumer::handleMessage(CMessageExt* msg)
{
// 检查 msg 和 consumer_ 是否为 NULL
if (!msg || !consumer_) {
std::cerr << "Received null message or consumer." << std::endl;
//return E_RECONSUME_LATER;
return rocketmq::RECONSUME_LATER;
}*/
// 获取消息的主题和标签
//std::string topic = GetMessageTopic(msg); // 假设存在此函数
//std::string tag = GetMessageTags(msg); // 假设存在此函数
namespace {

// Returns true when a message carrying `tag` is selected by the subscription
// tag expression `expression`. An empty expression or "*" selects every tag;
// otherwise the expression is a list of tags separated by "||" (surrounding
// blanks are ignored) and the tag must equal one of the alternatives.
bool subscriptionSelectsTag(const std::string& expression, const std::string& tag)
{
    if (expression.empty() || expression == "*") {
        return true;
    }
    std::string::size_type begin = 0;
    while (begin <= expression.size()) {
        std::string::size_type end = expression.find("||", begin);
        if (end == std::string::npos) {
            end = expression.size();
        }
        // Trim blanks around the current alternative.
        std::string::size_type lo = begin;
        std::string::size_type hi = end;
        while (lo < hi && (expression[lo] == ' ' || expression[lo] == '\t')) {
            ++lo;
        }
        while (hi > lo && (expression[hi - 1] == ' ' || expression[hi - 1] == '\t')) {
            --hi;
        }
        if (hi > lo && expression.compare(lo, hi - lo, tag) == 0) {
            return true;
        }
        begin = end + 2;  // skip past the "||" separator
    }
    return false;
}

}  // namespace

// Dispatches one received message to the callback registered for it.
//
// Lookup order:
//   1. the exact (topic, tag) key;
//   2. otherwise the first entry for the same topic whose registered tag
//      expression selects the message's tag ("*", empty, or "a||b" lists).
//      Without this step a callback registered with a wildcard or multi-tag
//      expression would never be found, because the message carries its own
//      concrete tag rather than the subscription expression.
//
// Returns whatever the callback returns. When no callback matches, the body
// and keys are printed and CONSUME_SUCCESS is returned.
rocketmq::ConsumeStatus RocketMQConsumer::handleMessage(const rocketmq::MQMessageExt& msg)
{
    typedef std::map<std::pair<std::string, std::string>, MessageCallBack> CallbackMap;

    const std::string tag = msg.getTags();
    const std::string topic = msg.getTopic();
    // Debug trace.
    std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl;

    CallbackMap::iterator it = callbacks_.find(std::make_pair(topic, tag));
    if (it == callbacks_.end()) {
        // Entries are ordered by (topic, tag), so all entries for this topic
        // are contiguous starting at the lower bound of (topic, "").
        for (CallbackMap::iterator candidate = callbacks_.lower_bound(std::make_pair(topic, std::string()));
             candidate != callbacks_.end() && candidate->first.first == topic;
             ++candidate) {
            if (subscriptionSelectsTag(candidate->first.second, tag)) {
                it = candidate;
                break;
            }
        }
    }

    if (it != callbacks_.end()) {
        std::cout << "callback Handling message " << std::endl;
        return it->second(msg);
    }

    // Default handling when nothing is registered for this message.
    std::cout << "there is no callback " << std::endl;
    const std::string body = msg.getBody();
    const std::string msgKey = msg.getKeys();
    if (!body.empty()) {
        std::cout << "Received message body: " << body << std::endl;
    } else {
        std::cout << "Received message with empty body." << std::endl;
    }
    if (!msgKey.empty()) {
        std::cout << "Message Key: " << msgKey << std::endl;
    } else {
        std::cout << "Message Key: N/A" << std::endl;
    }
    return rocketmq::CONSUME_SUCCESS;
}
// Destructor: shuts the underlying consumer down, waits one second, then frees
// the listener.
RocketMQConsumer::~RocketMQConsumer()
{
    try {
        consumer_.shutdown();
    } catch (...) {
        // Best effort: a destructor must not let an exception escape.
    }

    // Give the client's internal threads time to exit before the listener
    // they may still reference is destroyed.
    sleep(1);

    InternalListener* const doomed = listener_;
    listener_ = NULL;
    delete doomed;

    std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
}
// Sets the consume mode of the underlying consumer. The mode is always
// BROADCASTING; `topic` is used only in the log line.
void RocketMQConsumer::setConsumerMessageModel(const std::string& topic)
{
    consumer_.setMessageModel(rocketmq::BROADCASTING);
    std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
}
// 全局消费者实例
RocketMQConsumer* g_consumer = NULL;
// 初始化消费者函数
void InitializeConsumer(
const std::string& consumerName,
const std::string& nameServer,
const std::vector<Subscription>& subscriptions) // 接收多个订阅
{
if (g_consumer == NULL) {
std::cout << "create new consumer!" << std::endl;
try {
//g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组不同进程不同的消费者同时消费topic的同一条消息
g_consumer = new RocketMQConsumer(consumerName, nameServer);
for (size_t i = 0; i < subscriptions.size(); ++i) {
g_consumer->setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式
g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag,subscriptions[i].callback);
}
g_consumer->start();
}
catch (const std::exception& e) {
std::cerr << "Failed to initialize consumer: " << e.what() << std::endl;
throw; // 重新抛出异常
}
}
}
// Destroys the global consumer, if any, and resets the pointer to NULL.
void ShutdownAndDestroyConsumer()
{
    if (g_consumer == NULL) {
        return;
    }
    delete g_consumer;
    g_consumer = NULL;
}
// Public entry point for consuming: initializes the global consumer with the
// given subscriptions at most once per process. An initialization failure is
// logged and swallowed here; because the exception does not leave the
// call_once callable, later calls will not attempt initialization again.
void rocketmq_consumer_receive(
    const std::string& consumerName,
    const std::string& nameServer,
    const std::vector<Subscription>& subscriptions)
{
    const auto initOnce = [&consumerName, &nameServer, &subscriptions]() {
        try {
            InitializeConsumer(consumerName, nameServer, subscriptions);
        } catch (...) {
            std::cerr << "Cannot consume message because consumer initialization failed." << std::endl;
        }
    };
    std::call_once(g_consumer_once, initOnce);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
//封装生产者类
#if 1
// File-scope queue index kept from the earlier non-atomic selector. The
// selector below uses its own function-local atomic counter instead, so this
// variable is not read by the code visible in this file.
static int currentQueueId = 0;

// Queue selector callback: picks queue ids in round-robin order.
//
// queueNum  number of available queues.
// msg, arg  unused; present to match the selector callback signature.
//
// Returns a queue id in [0, queueNum), or -1 when queueNum <= 0.
//
// The ticket counter is unsigned so that it wraps back to 0 on overflow. A
// signed counter would eventually turn negative and make `id % queueNum`
// yield a negative (invalid) queue id.
int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
    static std::atomic<unsigned int> nextTicket(0);
    (void)msg;
    (void)arg;
    if (queueNum <= 0) {
        std::cout << "[MQ][SELECTOR_FAIL] queueNum=" << queueNum << std::endl;
        return -1;
    }
    // Relaxed ordering is enough: only the uniqueness of each ticket matters.
    const unsigned int ticket = nextTicket.fetch_add(1, std::memory_order_relaxed);
    const int queueId = static_cast<int>(ticket % static_cast<unsigned int>(queueNum));
    std::cout << "[MQ][SELECTOR] queueNum=" << queueNum
              << ", current=" << ticket
              << ", selected=" << queueId
              << std::endl;
    return queueId;
}
// Owns a rocketmq::DefaultMQProducer. The constructor configures and starts
// it, sendMessage() publishes one message with a synchronous send, and the
// destructor shuts it down. Instances are not copyable.
class RocketMQProducer {
public:
    // producerName is passed to the DefaultMQProducer constructor; nameServer
    // is the name-server address. Nothing is caught here, so an exception
    // raised while configuring or starting the client reaches the caller.
    RocketMQProducer(const std::string& producerName, const std::string& nameServer)
        : producer_(producerName)
    {
        // Client log settings.
        producer_.setLogLevel(rocketmq::eLOG_LEVEL_ERROR);
        producer_.setLogFileSizeAndNum(5, 50);
        producer_.setNamesrvAddr(nameServer);
        // lnk20260417: cap the uploaded message body at 1 MiB (the original
        // note says the default is 4M) so oversized messages are rejected
        // up front instead of failing during send.
        producer_.setMaxMessageSize(1024 * 1024);
        // Uses the same access/secret key globals as the consumer; the
        // channel is left empty.
        producer_.setSessionCredentials(G_MQCONSUMER_ACCESSKEY,
                                        G_MQCONSUMER_SECRETKEY,
                                        "");
        producer_.start();
        std::cout << "rocketmq_Producer initialized and started." << std::endl;
    }

    // Copying and assignment are disabled.
    RocketMQProducer(const RocketMQProducer&) = delete;
    RocketMQProducer& operator=(const RocketMQProducer&) = delete;

    // Publishes `body` to `topic` with the given tags and keys.
    //
    // Never throws: every exception is caught, written to stderr and reported
    // through DIY_ERRORLOG_CODE. A send that returns without throwing is
    // logged as [MQ][SEND_OK] only when the returned status is SEND_OK; any
    // other status is logged to stderr as [MQ][SEND_WARN] so it is not
    // mistaken for a successful send.
    void sendMessage(const std::string& body,
                     const std::string& topic,
                     const std::string& tags,
                     const std::string& keys)
    {
        try {
            if (DEBUGOPEN) {
                dumpForDebug(body, topic, tags, keys);
            }
            rocketmq::MQMessage msg(topic, tags, keys, body);
            rocketmq::SendResult result = producer_.send(msg);
            if (result.getSendStatus() == rocketmq::SEND_OK) {
                std::cout << "[MQ][SEND_OK]"
                          << " topic=" << topic
                          << ", tags=" << tags
                          << ", keys=" << keys
                          << ", msgId=" << result.getMsgId()
                          << ", status=" << result.getSendStatus()
                          << ", body_len=" << body.size()
                          << std::endl;
            } else {
                std::cerr << "[MQ][SEND_WARN]"
                          << " topic=" << topic
                          << ", tags=" << tags
                          << ", keys=" << keys
                          << ", msgId=" << result.getMsgId()
                          << ", status=" << result.getSendStatus()
                          << ", body_len=" << body.size()
                          << std::endl;
            }
        }
        catch (const rocketmq::MQClientException& e) {
            std::cerr << "[MQ][SEND_FAIL] MQClientException: "
                      << e.what() << std::endl;
            DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
                "【ERROR】前置的%s%d号进程 mq发送失败,mq客户端错误,请检查mq配置",
                get_front_msg_from_subdir(), g_front_seg_index);
        }
        catch (const std::exception& e) {
            std::cerr << "[MQ][SEND_FAIL] exception: "
                      << e.what() << std::endl;
            DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
                "【ERROR】前置的%s%d号进程 mq发送失败,mq发送错误,发送请检查mq配置",
                get_front_msg_from_subdir(), g_front_seg_index);
        }
        catch (...) {
            std::cerr << "[MQ][SEND_FAIL] unknown exception" << std::endl;
            DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
                "【ERROR】前置的%s%d号进程 mq发送失败,未知错误,请检查mq配置",
                get_front_msg_from_subdir(), g_front_seg_index);
        }
    }

    // Shuts the producer down; an exception from shutdown is swallowed
    // because a destructor must not throw.
    ~RocketMQProducer() {
        try {
            producer_.shutdown();
        }
        catch (...) {
        }
        std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
    }

private:
    // Debug-only trace of an outgoing message: routing fields, body length,
    // up to 200 bytes from the head and (when the body is longer than that)
    // the tail, and a hex dump of up to the first 100 bytes.
    static void dumpForDebug(const std::string& body,
                             const std::string& topic,
                             const std::string& tags,
                             const std::string& keys)
    {
        std::cout << "sendMessage called with topic: " << topic
                  << ", tags: " << tags
                  << ", keys: " << keys
                  << ", body_len=" << body.size()
                  << std::endl;
        const size_t previewLen = std::min(static_cast<size_t>(200), body.size());
        std::cout << "[MQ][BODY_HEAD] "
                  << body.substr(0, previewLen)
                  << std::endl;
        if (body.size() > previewLen) {
            std::cout << "[MQ][BODY_TAIL] "
                      << body.substr(body.size() - previewLen, previewLen)
                      << std::endl;
        }
        std::cout << "[MQ][HEX_HEAD] ";
        const size_t hexLen = std::min(static_cast<size_t>(100), body.size());
        for (size_t i = 0; i < hexLen; ++i) {
            printf("%02X ", static_cast<unsigned char>(body[i]));
        }
        printf("\n");
    }

    rocketmq::DefaultMQProducer producer_;
};
// Process-wide producer instance; NULL until InitializeProducer() succeeds.
RocketMQProducer* g_producer = NULL;

// Creates the global producer from G_ROCKETMQ_PRODUCER (producer name) and
// G_ROCKETMQ_IPPORT (name-server address) if it does not exist yet. A
// std::exception raised during construction is logged and rethrown, leaving
// g_producer NULL.
void InitializeProducer()
{
    if (g_producer != NULL) {
        return;
    }
    try {
        g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT);
    }
    catch (const std::exception& e) {
        std::cerr << "Failed to initialize producer: " << e.what() << std::endl;
        // The caller decides whether to abort or carry on.
        throw;
    }
}
// Destroys the global producer, if any, and resets the pointer to NULL.
// Intended to be called once at program shutdown.
void ShutdownAndDestroyProducer()
{
    if (g_producer == NULL) {
        return;
    }
    delete g_producer;
    g_producer = NULL;
}
// Public entry point for publishing one message.
//
// The global producer is created on first use. This function does not throw:
// a failure to initialize the producer, a missing producer (for example after
// ShutdownAndDestroyProducer() has run) and any exception from sending are all
// logged to stderr and reported through DIY_ERRORLOG_CODE.
//
// Initialization sits inside the try block so that an exception from
// InitializeProducer() is handled here instead of escaping to the caller.
// Because that exception leaves std::call_once, the once-flag stays unset and
// the next call tries to initialize again.
void rocketmq_producer_send(const std::string& body,
                            const std::string& topic,
                            const std::string& tags,
                            const std::string& keys)
{
    try {
        std::call_once(g_producer_once, []() {
            InitializeProducer();
        });
        if (g_producer == NULL) {
            // Avoid dereferencing a null producer.
            throw std::runtime_error("producer is not available");
        }
        g_producer->sendMessage(body, topic, tags, keys);
    } catch (const std::exception& e) {
        std::cerr << "Failed to send message: " << e.what() << std::endl;
        DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
            "【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置",
            get_front_msg_from_subdir(), g_front_seg_index);
    } catch (...) {
        std::cerr << "Failed to send message: unknown exception" << std::endl;
        DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
            "【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置",
            get_front_msg_from_subdir(), g_front_seg_index);
    }
}
#endif
//////////////////////////////////////////////////////////////////////////////////////////////////////////
/*
// producer_send0测试用
void StartSendMessage(CProducer* producer)
{
CSendResult result;
// create message and set some values for it
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC_TEST.c_str());
SetMessageTags(msg, G_ROCKETMQ_TAG_TEST.c_str());
SetMessageKeys(msg, G_ROCKETMQ_KEY_TEST.c_str());
for (int i = 0; i < 10; i++)
{
// construct different body
string strMessageBody = "this is body number";
SetMessageBody(msg, strMessageBody.c_str());
// send message
SendMessageSync(producer, msg, &result);
cout << "send message[" << i << "], result status:" << result.sendStatus << ", msgBody:" << strMessageBody << endl;
usleep(1000);
}
// destroy message
DestroyMessage(msg);
}
//producer_send 测试用
void StartSendMessage(CProducer* producer,const char* strbody)
{
CSendResult result;
// create message and set some values for it
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC_TEST.c_str());
SetMessageTags(msg, G_ROCKETMQ_TAG_TEST.c_str());
SetMessageKeys(msg, G_ROCKETMQ_KEY_TEST.c_str());
SetMessageBody(msg, strbody);
// send message
SendMessageSync(producer, msg, &result);
cout << "result status:" << result.sendStatus << ", msgBody:" << strbody << endl;
// destroy message
DestroyMessage(msg);
}
//测试用 固定消息体
void producer_send0()
{
cout << "Producer Initializing....." << endl;
// create producer and set some values for it
CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str());
SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str());
// start producer
StartProducer(producer);
cout << "Producer start....." << endl;
// send message
StartSendMessage(producer);
// shutdown producer
ShutdownProducer(producer);
// destroy producer
DestroyProducer(producer);
cout << "Producer Shutdown!" << endl;
}
//测试用 可控制消息体
void producer_send(const char* strbody)
{
cout << "Producer Initializing....." << endl;
// create producer and set some values for it
CProducer* producer = CreateProducer(G_ROCKETMQ_PRODUCER.c_str());
SetProducerNameServerAddress(producer, G_ROCKETMQ_IPPORT.c_str());
// start producer
StartProducer(producer);
cout << "Producer start....." << endl;
// send message
StartSendMessage(producer, strbody);
// shutdown producer
ShutdownProducer(producer);
// destroy producer
DestroyProducer(producer);
cout << "Producer Shutdown!" << endl;
}
*/
///////////////////////////////////////////////////////////////////////////////////////////////////////////
extern "C" {
//extern std::string G_MQCONSUMER_TOPIC_RT;
void rocketmq_test_rt()
{
Ckafka_data_t data;
data.monitor_id = 123123;
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RT);
std::ifstream file("rt.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123;
my_rocketmq_send(data);
}
//extern std::string G_MQCONSUMER_TOPIC_UD;
void rocketmq_test_ud()//用来测试台账更新
{
Ckafka_data_t data;
data.monitor_id = 123123;
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_UD);
std::ifstream file("ud.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123;
my_rocketmq_send(data);
}
void rocketmq_test_set()//用来测试进程控制脚本
{
Ckafka_data_t data;
data.monitor_id = 123123;
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_SET);
std::ifstream file("set.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123;
my_rocketmq_send(data);
}
void rocketmq_test_only()//用来测试进程控制脚本
{
Ckafka_data_t data;
data.monitor_id = 123123;
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_TEST);
std::ifstream file("test.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123;
my_rocketmq_send(data);
}
//extern std::string G_MQCONSUMER_TOPIC_RC;
void rocketmq_test_rc()
{
Ckafka_data_t data;
data.monitor_id = 123123;
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RC);
std::ifstream file("rc.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123;
my_rocketmq_send(data);
}
//extern std::string G_MQCONSUMER_TOPIC_LOG;
void rocketmq_test_log()
{
Ckafka_data_t data;
data.monitor_id = 123123;
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_LOG);
std::ifstream file("log_test.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123;
my_rocketmq_send(data);
}
}