250 lines
6.3 KiB
C++
250 lines
6.3 KiB
C++
|
|
#include <iostream>
|
|
#include <string>
|
|
#include <cstdlib>
|
|
#include <cstdio>
|
|
#include <csignal>
|
|
#include <cstring>
|
|
#include <apr_strings.h>
|
|
|
|
#include "kafka_producer.h"
|
|
|
|
#define CONFIG_PATH "../etc"
|
|
|
|
#ifndef WIN32
|
|
#define nullptr NULL
|
|
#endif
|
|
extern int SEND_FLAG;
|
|
extern char* PROTOCOL;
|
|
extern char* MECHANISMS;
|
|
extern char* KEYTAB_FILE;
|
|
extern char* SERVICE_NAME;
|
|
extern char* PRINCIPAL;
|
|
extern char* DOMAIN_NAME;
|
|
|
|
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
|
|
public:
|
|
int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
|
|
int32_t partition_cnt, void *msg_opaque) {
|
|
int n = atoi(key->c_str());
|
|
int part_id = (n % partition_cnt);
|
|
printf("MyHashPartitionerCb key n=%d,partition_cnt=%d,partition_id=%d \n",n,partition_cnt,part_id);
|
|
return part_id;
|
|
}
|
|
private:
|
|
|
|
//static inline unsigned int djb_hash (const char *str, size_t len) {
|
|
// unsigned int hash = 5381;
|
|
// for (size_t i = 0 ; i < len ; i++)
|
|
// hash = ((hash << 5) + hash) + str[i];
|
|
// return hash;
|
|
//}
|
|
};
|
|
|
|
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
|
|
public: void dr_cb(RdKafka::Message& message) {
|
|
if (message.err())
|
|
printf("Message delivery failed: topic=%s error=%s\n", message.topic_name().c_str(), message.errstr().c_str());
|
|
else
|
|
printf("Message delivery sucess: topic=%s \n", message.topic_name().c_str());
|
|
}
|
|
};
|
|
|
|
|
|
FeKafkaProducer::FeKafkaProducer()
|
|
{
|
|
|
|
}
|
|
|
|
FeKafkaProducer::~FeKafkaProducer()
|
|
{
|
|
}
|
|
|
|
bool FeKafkaProducer::init(const std::string &brokerlist, const bool &async, const int &size)
|
|
{
|
|
std::string errstr;
|
|
this->topics_.clear();
|
|
|
|
#ifdef __GNUC__
|
|
conf_ = (RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
|
|
tconf_ = (RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
|
|
#endif
|
|
|
|
if (conf_ == nullptr || tconf_== nullptr)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
#ifdef __GNUC__
|
|
static MyHashPartitionerCb hash_partitioner;
|
|
if (tconf_->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK) {
|
|
std::cerr << errstr << std::endl;
|
|
return false;
|
|
}
|
|
static ProducerDeliveryReportCb producerDELIVER;
|
|
if (conf_->set("dr_cb", &producerDELIVER, errstr) != RdKafka::Conf::CONF_OK) {
|
|
std::cerr << errstr << std::endl;
|
|
return false;
|
|
}
|
|
#endif
|
|
|
|
//std::string broker(host);
|
|
//broker.append(":").append(std::to_string((long long)port));
|
|
conf_->set("metadata.broker.list", brokerlist, errstr);
|
|
if (strcmp(PROTOCOL, "sasl_plaintext") == 0) {
|
|
conf_->set("security.protocol", PROTOCOL, errstr);
|
|
conf_->set("sasl.mechanisms", MECHANISMS, errstr);
|
|
std::string kinit = "/usr/bin/kinit -kt \"";
|
|
kinit.append(KEYTAB_FILE);
|
|
kinit.append("\" ");
|
|
kinit.append(PRINCIPAL);
|
|
|
|
conf_->set("sasl.kerberos.kinit.cmd", kinit, errstr);
|
|
conf_->set("sasl.kerberos.keytab", KEYTAB_FILE, errstr);
|
|
conf_->set("sasl.kerberos.service.name", SERVICE_NAME, errstr);
|
|
conf_->set("sasl.kerberos.principal", PRINCIPAL, errstr);
|
|
conf_->set("sasl.kerberos.domain.name", DOMAIN_NAME, errstr);
|
|
printf("kafka sasl PROTOCOL=%s,MECHANISMS=%s,kinit=%s,KEYTAB_FILE=%s,SERVICE_NAME=%s,PRINCIPAL=%s,DOMAIN_NAME=%s \n", PROTOCOL, MECHANISMS, kinit.c_str(), KEYTAB_FILE, SERVICE_NAME, PRINCIPAL, DOMAIN_NAME);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (async)
|
|
{
|
|
char size_str[256];
|
|
conf_->set("producer.type", "async", errstr);
|
|
//conf_->set("queue.buffering.max.messages", std::to_string((long long)size).c_str(), errstr);
|
|
memset(size_str,0,256);
|
|
apr_snprintf(size_str,sizeof(size_str),"%i",size);
|
|
conf_->set("queue.buffering.max.messages", size_str, errstr);
|
|
|
|
{
|
|
//const char* api_version_request = "false";
|
|
//const char* api_version_fallback = "0.8.2.0";
|
|
//conf_->set("api.version.request", api_version_request, errstr) ;
|
|
//conf_->set("broker.version.fallback", api_version_fallback, errstr);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
conf_->set("producer.type", "sync", errstr);
|
|
}
|
|
|
|
#ifdef __GNUC__
|
|
producer_ = RdKafka::Producer::create(conf_, errstr);
|
|
if (!producer_)
|
|
{
|
|
std::cerr << "Failed to create producer: " << errstr << std::endl;
|
|
return false;
|
|
}
|
|
#endif
|
|
return true;
|
|
}
|
|
|
|
int FeKafkaProducer::send(const char *data, const int &size, const std::string &topic,
|
|
const int &partition, const std::string *key,const int &timeout)
|
|
{
|
|
RdKafka::Topic *tpk = get_topic(topic);
|
|
//std::cout<<"send data "<<data<<std::endl;
|
|
if (tpk == nullptr) {
|
|
printf("FIRST: get topic(%s) failed, to create at once \n",topic.c_str());
|
|
bool ret = create_topic(topic);
|
|
if(ret) {
|
|
printf("create topic OK \n");
|
|
tpk = get_topic(topic);
|
|
if (tpk == nullptr) {
|
|
printf("SECOND: get topic(%s) failed! but create topic seemed OK, what ??? ",topic.c_str());
|
|
return -1;
|
|
}
|
|
}
|
|
else {
|
|
printf("create topic Failed \n");
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
RdKafka::ErrorCode resp =
|
|
producer_->produce(tpk, partition,
|
|
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
|
|
const_cast<char*>(data), size,
|
|
key->c_str(),key->size(), NULL);
|
|
///*NULL*/ key, NULL);
|
|
|
|
if (resp != RdKafka::ERR_NO_ERROR)
|
|
{
|
|
#ifdef __GNUC__
|
|
std::cerr << "% Produce failed: " <<
|
|
RdKafka::err2str(resp) << std::endl;
|
|
#endif
|
|
return -1;
|
|
}
|
|
|
|
producer_->poll(timeout);
|
|
|
|
return size;
|
|
}
|
|
|
|
int FeKafkaProducer::send_batch(const std::vector<std::pair<const char *, const int &> > &data,
|
|
const std::string &topic, const int &partition, const int &timeout)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
bool FeKafkaProducer::create_topic(const std::string &topic)
|
|
{
|
|
std::string errstr;
|
|
if (topic.empty() || producer_== nullptr || tconf_== nullptr)
|
|
{
|
|
printf("(topic.empty() || producer_== nullptr || tconf_== nullptr) \n");
|
|
return false;
|
|
}
|
|
|
|
if (this->get_topic(topic) != nullptr)
|
|
{
|
|
printf("if (this->get_topic(topic) != nullptr) \n");
|
|
return false;
|
|
}
|
|
|
|
#ifdef __GNUC__
|
|
RdKafka::Topic* tpk = (RdKafka::Topic::create(producer_, topic, tconf_, errstr));
|
|
if (!tpk)
|
|
{
|
|
std::cerr << "Failed to create topic: " << errstr << std::endl;
|
|
return false;
|
|
}
|
|
|
|
topics_.insert(make_pair(topic, tpk));
|
|
#endif
|
|
|
|
return true;
|
|
}
|
|
|
|
RdKafka::Topic *FeKafkaProducer::get_topic(const std::string &topic)
|
|
{
|
|
std::map<std::string, RdKafka::Topic*>::iterator it = topics_.find(topic);
|
|
if(it != topics_.end())
|
|
{
|
|
return it->second;
|
|
}
|
|
|
|
return nullptr;
|
|
}
|
|
|
|
void FeKafkaProducer::read_config(const char *path)
|
|
{
|
|
}
|
|
|
|
void FeKafkaProducer::close()
|
|
{
|
|
while (producer_->outq_len() > 0)
|
|
{
|
|
std::cerr << "Waiting for " << producer_->outq_len() << std::endl;
|
|
producer_->poll(1000);
|
|
}
|
|
|
|
#ifdef __GNUC__
|
|
RdKafka::wait_destroyed(1000);
|
|
#endif
|
|
}
|