#include #include #include #include #include #include #include #include "kafka_producer.h" #define CONFIG_PATH "../etc" #ifndef WIN32 #define nullptr NULL #endif extern int SEND_FLAG; extern char* PROTOCOL; extern char* MECHANISMS; extern char* KEYTAB_FILE; extern char* SERVICE_NAME; extern char* PRINCIPAL; extern char* DOMAIN_NAME; class MyHashPartitionerCb : public RdKafka::PartitionerCb { public: int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key, int32_t partition_cnt, void *msg_opaque) { int n = atoi(key->c_str()); int part_id = (n % partition_cnt); printf("MyHashPartitionerCb key n=%d,partition_cnt=%d,partition_id=%d \n",n,partition_cnt,part_id); return part_id; } private: }; class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb { public: void dr_cb(RdKafka::Message& message) { if (message.err()) printf("Message delivery failed: topic=%s error=%s\n", message.topic_name().c_str(), message.errstr().c_str()); else printf("Message delivery sucess: topic=%s \n", message.topic_name().c_str()); } }; FeKafkaProducer::FeKafkaProducer() { } FeKafkaProducer::~FeKafkaProducer() { } bool FeKafkaProducer::init(const std::string &brokerlist, const bool &async, const int &size) { std::string errstr; this->topics_.clear(); #ifdef __GNUC__ conf_ = (RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); tconf_ = (RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC)); #endif if (conf_ == nullptr || tconf_== nullptr) { return false; } #ifdef __GNUC__ static MyHashPartitionerCb hash_partitioner; if (tconf_->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK) { std::cerr << errstr << std::endl; return false; } static ProducerDeliveryReportCb producerDELIVER; if (conf_->set("dr_cb", &producerDELIVER, errstr) != RdKafka::Conf::CONF_OK) { std::cerr << errstr << std::endl; return false; } #endif conf_->set("metadata.broker.list", brokerlist, errstr); if (strcmp(PROTOCOL, "sasl_plaintext") == 0) { conf_->set("security.protocol", PROTOCOL, errstr); conf_->set("sasl.mechanisms", MECHANISMS, errstr); std::string kinit = "/usr/bin/kinit -kt \""; kinit.append(KEYTAB_FILE); kinit.append("\" "); kinit.append(PRINCIPAL); conf_->set("sasl.kerberos.kinit.cmd", kinit, errstr); conf_->set("sasl.kerberos.keytab", KEYTAB_FILE, errstr); conf_->set("sasl.kerberos.service.name", SERVICE_NAME, errstr); conf_->set("sasl.kerberos.principal", PRINCIPAL, errstr); conf_->set("sasl.kerberos.domain.name", DOMAIN_NAME, errstr); printf("kafka sasl PROTOCOL=%s,MECHANISMS=%s,kinit=%s,KEYTAB_FILE=%s,SERVICE_NAME=%s,PRINCIPAL=%s,DOMAIN_NAME=%s \n", PROTOCOL, MECHANISMS, kinit.c_str(), KEYTAB_FILE, SERVICE_NAME, PRINCIPAL, DOMAIN_NAME); } if (async) { char size_str[256]; conf_->set("producer.type", "async", errstr); memset(size_str,0,256); apr_snprintf(size_str,sizeof(size_str),"%i",size); conf_->set("queue.buffering.max.messages", size_str, errstr); } else { conf_->set("producer.type", "sync", errstr); } #ifdef __GNUC__ producer_ = RdKafka::Producer::create(conf_, errstr); if (!producer_) { std::cerr << "Failed to create producer: " << errstr << std::endl; return false; } #endif return true; } int FeKafkaProducer::send(const char *data, const int &size, const std::string &topic, const int &partition, const std::string *key,const int &timeout) { RdKafka::Topic *tpk = get_topic(topic); if (tpk == nullptr) { printf("FIRST: get topic(%s) failed, to create at once \n",topic.c_str()); bool ret = create_topic(topic); if(ret) { printf("create topic OK \n"); tpk = get_topic(topic); if (tpk == nullptr) { printf("SECOND: get topic(%s) failed! but create topic seemed OK, what ??? ",topic.c_str()); return -1; } } else { printf("create topic Failed \n"); return -1; } } RdKafka::ErrorCode resp = producer_->produce(tpk, partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast(data), size, key->c_str(),key->size(), NULL); if (resp != RdKafka::ERR_NO_ERROR) { #ifdef __GNUC__ std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl; #endif return -1; } producer_->poll(timeout); return size; } int FeKafkaProducer::send_batch(const std::vector > &data, const std::string &topic, const int &partition, const int &timeout) { return 0; } bool FeKafkaProducer::create_topic(const std::string &topic) { std::string errstr; if (topic.empty() || producer_== nullptr || tconf_== nullptr) { printf("(topic.empty() || producer_== nullptr || tconf_== nullptr) \n"); return false; } if (this->get_topic(topic) != nullptr) { printf("if (this->get_topic(topic) != nullptr) \n"); return false; } #ifdef __GNUC__ RdKafka::Topic* tpk = (RdKafka::Topic::create(producer_, topic, tconf_, errstr)); if (!tpk) { std::cerr << "Failed to create topic: " << errstr << std::endl; return false; } topics_.insert(make_pair(topic, tpk)); #endif return true; } RdKafka::Topic *FeKafkaProducer::get_topic(const std::string &topic) { std::map::iterator it = topics_.find(topic); if(it != topics_.end()) { return it->second; } return nullptr; } void FeKafkaProducer::read_config(const char *path) { } void FeKafkaProducer::close() { while (producer_->outq_len() > 0) { std::cerr << "Waiting for " << producer_->outq_len() << std::endl; producer_->poll(1000); } #ifdef __GNUC__ RdKafka::wait_destroyed(1000); #endif }