add log4cplus
This commit is contained in:
@@ -33,34 +33,141 @@ typedef enum E_CConsumeStatus { E_CONSUME_SUCCESS = 0, E_RECONSUME_LATER = 1 } C
|
||||
typedef int (*MessageCallBack)(CPushConsumer*, CMessageExt*);
|
||||
|
||||
ROCKETMQCLIENT_API CPushConsumer* CreatePushConsumer(const char* groupId);
|
||||
// 创建一个消费者实例并返回指向该实例的指针。
|
||||
// 参数:groupId - 消费者所属的消费者组ID。
|
||||
// 必要性:是必须的,创建消费者实例时必须提供消费者组ID。
|
||||
|
||||
ROCKETMQCLIENT_API int DestroyPushConsumer(CPushConsumer* consumer);
|
||||
// 销毁指定的消费者实例。
|
||||
// 参数:consumer - 需要销毁的消费者实例。
|
||||
// 必要性:在消费者不再需要时调用,以释放资源。
|
||||
|
||||
ROCKETMQCLIENT_API int StartPushConsumer(CPushConsumer* consumer);
|
||||
// 启动消费者,开始消费消息。
|
||||
// 参数:consumer - 要启动的消费者实例。
|
||||
// 必要性:启动消费者并开始接收消息是必须的。
|
||||
|
||||
ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer* consumer);
|
||||
// 关闭消费者,停止消费消息。
|
||||
// 参数:consumer - 要关闭的消费者实例。
|
||||
// 必要性:在应用程序结束时,必须调用该函数来安全地关闭消费者。
|
||||
|
||||
ROCKETMQCLIENT_API const char* ShowPushConsumerVersion(CPushConsumer* consumer);
|
||||
// 获取当前消费者实例的版本信息。
|
||||
// 参数:consumer - 需要获取版本信息的消费者实例。
|
||||
// 必要性:不是必须的,但可以用于调试和确认消费者版本。
|
||||
|
||||
ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer* consumer, const char* groupId);
|
||||
// 设置消费者的消费者组ID。
|
||||
// 参数:consumer - 消费者实例;groupId - 消费者组ID。
|
||||
// 必要性:用于设置消费者的消费者组ID,必须设置。
|
||||
|
||||
ROCKETMQCLIENT_API const char* GetPushConsumerGroupID(CPushConsumer* consumer);
|
||||
// 获取消费者实例的消费者组ID。
|
||||
// 参数:consumer - 消费者实例。
|
||||
// 必要性:不是必须的,但可以用于检查当前的消费者组ID。
|
||||
|
||||
ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer* consumer, const char* namesrv);
|
||||
// 设置消费者的 NameServer 地址。
|
||||
// 参数:consumer - 消费者实例;namesrv - NameServer 地址。
|
||||
// 必要性:必须配置,以便消费者能够连接到 RocketMQ NameServer。
|
||||
|
||||
ROCKETMQCLIENT_API int SetPushConsumerNameServerDomain(CPushConsumer* consumer, const char* domain);
|
||||
// 设置消费者的 NameServer 域名。
|
||||
// 参数:consumer - 消费者实例;domain - NameServer 域名。
|
||||
// 必要性:不是必须的,通常会使用 SetPushConsumerNameServerAddress 设置 IP 地址。
|
||||
|
||||
ROCKETMQCLIENT_API int Subscribe(CPushConsumer* consumer, const char* topic, const char* expression);
|
||||
// 订阅一个指定的 topic 和 tag。
|
||||
// 参数:consumer - 消费者实例;topic - 消息主题;expression - 消息标签(可以为空,表示订阅所有标签)。
|
||||
// 必要性:必须配置消费者要订阅的 topic。
|
||||
|
||||
ROCKETMQCLIENT_API int RegisterMessageCallbackOrderly(CPushConsumer* consumer, MessageCallBack pCallback);
|
||||
// 注册顺序消息的回调函数。
|
||||
// 参数:consumer - 消费者实例;pCallback - 消息回调函数。
|
||||
// 必要性:如果需要顺序消费消息,则需要使用此函数注册顺序消息回调。
|
||||
|
||||
ROCKETMQCLIENT_API int RegisterMessageCallback(CPushConsumer* consumer, MessageCallBack pCallback);
|
||||
// 注册异步消息的回调函数。
|
||||
// 参数:consumer - 消费者实例;pCallback - 消息回调函数。
|
||||
// 必要性:必须配置回调函数,以便消费者能够处理收到的消息。
|
||||
|
||||
ROCKETMQCLIENT_API int UnregisterMessageCallbackOrderly(CPushConsumer* consumer);
|
||||
// 注销顺序消息的回调函数。
|
||||
// 参数:consumer - 消费者实例。
|
||||
// 必要性:不是必须的,只有在不再需要顺序消息时调用。
|
||||
|
||||
ROCKETMQCLIENT_API int UnregisterMessageCallback(CPushConsumer* consumer);
|
||||
// 注销异步消息的回调函数。
|
||||
// 参数:consumer - 消费者实例。
|
||||
// 必要性:不是必须的,只有在不再需要处理消息时调用。
|
||||
|
||||
ROCKETMQCLIENT_API int SetPushConsumerThreadCount(CPushConsumer* consumer, int threadCount);
|
||||
// 设置消费者的线程数量。
|
||||
// 参数:consumer - 消费者实例;threadCount - 消费者线程数量。
|
||||
// 必要性:用于控制消费者处理消息的并发度。如果不设置,默认值通常为1。
|
||||
|
||||
ROCKETMQCLIENT_API int SetPushConsumerMessageBatchMaxSize(CPushConsumer* consumer, int batchSize);
|
||||
// 设置消费者最大批量消息大小。
|
||||
// 参数:consumer - 消费者实例;batchSize - 最大批量消息大小。
|
||||
// 必要性:不是必须的,只有在需要调整消息消费的批量大小时才使用。
|
||||
|
||||
ROCKETMQCLIENT_API int SetPushConsumerInstanceName(CPushConsumer* consumer, const char* instanceName);
|
||||
// 设置消费者实例名称。
|
||||
// 参数:consumer - 消费者实例;instanceName - 消费者实例名称。
|
||||
// 必要性:不是必须的,可以为空,默认使用系统生成的实例名。
|
||||
|
||||
ROCKETMQCLIENT_API int SetPushConsumerSessionCredentials(CPushConsumer* consumer,
|
||||
const char* accessKey,
|
||||
const char* secretKey,
|
||||
const char* channel);
|
||||
// 设置消费者的身份验证信息。
|
||||
// 参数:consumer - 消费者实例;accessKey - 访问密钥;secretKey - 密钥;channel - 渠道。
|
||||
// 必要性:在需要身份验证的环境下(如阿里云RocketMQ),此项配置是必须的。
|
||||
|
||||
|
||||
// 设置消费者日志文件路径
|
||||
// 功能:设置日志文件的存储路径。消费者在运行时会将日志信息写入该路径指定的文件。
|
||||
// 必要性:日志路径配置有助于在生产环境中追踪和调试消费者的行为,记录日志是运维管理中的重要环节。
|
||||
ROCKETMQCLIENT_API int SetPushConsumerLogPath(CPushConsumer* consumer, const char* logPath);
|
||||
|
||||
|
||||
// 设置日志文件的数量和每个文件的最大大小
|
||||
// 功能:设置日志文件的数量和每个日志文件的最大大小。超过最大大小时会创建新日志文件。
|
||||
// 必要性:控制日志文件的数量和大小有助于防止日志文件过大,占用过多磁盘空间。对于高频消息消费的场景,合理配置日志文件大小和数量至关重要。
|
||||
ROCKETMQCLIENT_API int SetPushConsumerLogFileNumAndSize(CPushConsumer* consumer, int fileNum, long fileSize);
|
||||
|
||||
|
||||
// 设置消费者的日志级别
|
||||
// 功能:设置消费者的日志级别(如 DEBUG, INFO, WARN, ERROR 等),控制输出的日志详细程度。
|
||||
// 必要性:日志级别配置可以帮助调节日志的详细程度,根据环境不同选择适当的日志级别(例如,生产环境使用 ERROR 级别,开发环境使用 DEBUG 级别)。
|
||||
ROCKETMQCLIENT_API int SetPushConsumerLogLevel(CPushConsumer* consumer, CLogLevel level);
|
||||
|
||||
|
||||
// 设置消息模型(广播模式或集群模式)
|
||||
// 功能:设置消费者的消息消费模型。广播模式(`CMessageModel::BROADCASTING`)会将消息推送到所有消费者,而集群模式(`CMessageModel::CLUSTERING`)则只会将消息推送到一个消费者。
|
||||
// 必要性:选择消息模型对消息消费的行为有直接影响,集群模式适用于负载均衡,而广播模式适用于所有消费者都需要接收消息的场景。
|
||||
ROCKETMQCLIENT_API int SetPushConsumerMessageModel(CPushConsumer* consumer, CMessageModel messageModel);
|
||||
|
||||
|
||||
// 设置消费者最大缓存消息大小(以字节为单位)
|
||||
// 功能:设置消费者最大缓存消息的大小。当缓存达到此大小时,消费者会停止消费,直到缓存被处理并清空。
|
||||
// 必要性:此配置对于高吞吐量场景非常重要,它可以防止消费者在消息积压时出现内存溢出,保证消费者稳定运行。
|
||||
ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSize(CPushConsumer* consumer, int maxCacheSize);
|
||||
|
||||
|
||||
// 设置消费者最大缓存消息大小(以 MB 为单位)
|
||||
// 功能:与 `SetPushConsumerMaxCacheMessageSize` 类似,不过该参数以 MB 为单位,设置最大缓存消息的大小。
|
||||
// 必要性:提供 MB 级别的配置对于一些需要调整内存和消息缓存的应用场景,能够更加精细地控制内存使用情况。
|
||||
ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSizeInMb(CPushConsumer* consumer, int maxCacheSizeInMb);
|
||||
|
||||
|
||||
// 设置消息追踪模型
|
||||
// 功能:设置消息的追踪模式。开启追踪会记录消费者的消费信息,帮助开发者分析消息的流转和消费情况。
|
||||
// 必要性:对于消息的追踪和监控至关重要,尤其在故障排查和性能优化中,可以通过追踪记录分析消费者的消息处理状态。
|
||||
ROCKETMQCLIENT_API int SetPushConsumerMessageTrace(CPushConsumer* consumer, CTraceModel openTrace);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -1,15 +1,58 @@
|
||||
|
||||
#ifdef __cplusplus
|
||||
#include "../json/mms_json_inter.h"
|
||||
#include "../include/rocketmq/CProducer.h"
|
||||
#include "../include/rocketmq/CMessage.h"
|
||||
#include "../include/rocketmq/CSendResult.h"
|
||||
#include "../rocketmq/CProducer.h"
|
||||
#include "../rocketmq/CMessage.h"
|
||||
#include "../rocketmq/CSendResult.h"
|
||||
|
||||
#include "../rocketmq/CPushConsumer.h"
|
||||
|
||||
#include <vector>
|
||||
|
||||
|
||||
/*添加测试函数lnk10-10*/
|
||||
void producer_send0();
|
||||
void StartSendMessage(CProducer* producer);
|
||||
void StartSendMessage(CProducer* producer,const char* strbody);
|
||||
void producer_send(const char* strbody);
|
||||
void rocketmq_producer_send(const char* strbody,const char* topic);
|
||||
void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,std::string topic);
|
||||
void rocketmq_test();
|
||||
void my_rocketmq_send(Ckafka_data_t& data);
|
||||
void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic);
|
||||
extern "C" {
|
||||
void rocketmq_test_rt();
|
||||
void rocketmq_test_ud();
|
||||
void rocketmq_test_rc();
|
||||
void rocketmq_test_log();
|
||||
void rocketmq_test_set();
|
||||
void rocketmq_test_only();
|
||||
void rocketmq_test_300(int mpnum,int front_index);
|
||||
}
|
||||
//void rocketmq_test_300(int mpnum,int front_index);//20241202lnk
|
||||
extern void my_rocketmq_send(Ckafka_data_t& data);
|
||||
|
||||
//////////////////////////////////////////////////////
|
||||
|
||||
///////////////////////////////////////////////////////生产者
|
||||
void InitializeProducer();
|
||||
void ShutdownAndDestroyProducer();
|
||||
//////////////////////////////////////////////////////消费者
|
||||
void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key);
|
||||
void ShutdownAndDestroyConsumer();
|
||||
|
||||
struct Subscription {
|
||||
std::string topic;
|
||||
std::string tag;
|
||||
MessageCallBack callback;
|
||||
|
||||
Subscription(const std::string& t, const std::string& tg, MessageCallBack cb)
|
||||
: topic(t), tag(tg), callback(cb) {}
|
||||
};
|
||||
|
||||
void rocketmq_consumer_receive(
|
||||
const std::string& consumerName,
|
||||
const std::string& nameServer,
|
||||
//const std::string& topic,
|
||||
//const std::string& tag,
|
||||
//MessageCallBack callback);
|
||||
const std::vector<Subscription>& subscriptions);
|
||||
|
||||
//////////////////////////////////////////////////////
|
||||
#endif
|
||||
//////////////////////////////////////////////////////
|
||||
|
||||
Reference in New Issue
Block a user