#ifndef _FEKAFKA_PRODUCER_ #define _FEKAFKA_PRODUCER_ #include #include #include #include #include "rdkafkacpp.h" class FeKafkaProducer { public: FeKafkaProducer(); ~FeKafkaProducer(); public: bool init(const std::string &brokerlist, const bool &async = true, const int &size = 0x7fffffff); int send(const char *data, const int &size, const std::string &topic, const int &partition = 0, const std::string *key = NULL, const int &timeout = 0); int send_batch(const std::vector > &data, const std::string &topic, const int &partition = 0, const int &timeout = 0); bool create_topic(const std::string &topic); void close(); private: void read_config(const char *path); RdKafka::Topic* get_topic(const std::string &topic); private: RdKafka::Producer* producer_; RdKafka::Conf* conf_; RdKafka::Conf* tconf_; std::map topics_; }; #endif // _KAFKA_PRODUCER_