lnk commit front code
This commit is contained in:
249
json/kafka_producer.cpp
Normal file
249
json/kafka_producer.cpp
Normal file
@@ -0,0 +1,249 @@
|
||||
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <cstdlib>
|
||||
#include <cstdio>
|
||||
#include <csignal>
|
||||
#include <cstring>
|
||||
#include <apr_strings.h>
|
||||
|
||||
#include "kafka_producer.h"
|
||||
|
||||
#define CONFIG_PATH "../etc"
|
||||
|
||||
#ifndef WIN32
|
||||
#define nullptr NULL
|
||||
#endif
|
||||
extern int SEND_FLAG;
|
||||
extern char* PROTOCOL;
|
||||
extern char* MECHANISMS;
|
||||
extern char* KEYTAB_FILE;
|
||||
extern char* SERVICE_NAME;
|
||||
extern char* PRINCIPAL;
|
||||
extern char* DOMAIN_NAME;
|
||||
|
||||
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
|
||||
public:
|
||||
int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
|
||||
int32_t partition_cnt, void *msg_opaque) {
|
||||
int n = atoi(key->c_str());
|
||||
int part_id = (n % partition_cnt);
|
||||
printf("MyHashPartitionerCb key n=%d,partition_cnt=%d,partition_id=%d \n",n,partition_cnt,part_id);
|
||||
return part_id;
|
||||
}
|
||||
private:
|
||||
|
||||
//static inline unsigned int djb_hash (const char *str, size_t len) {
|
||||
// unsigned int hash = 5381;
|
||||
// for (size_t i = 0 ; i < len ; i++)
|
||||
// hash = ((hash << 5) + hash) + str[i];
|
||||
// return hash;
|
||||
//}
|
||||
};
|
||||
|
||||
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
|
||||
public: void dr_cb(RdKafka::Message& message) {
|
||||
if (message.err())
|
||||
printf("Message delivery failed: topic=%s error=%s\n", message.topic_name().c_str(), message.errstr().c_str());
|
||||
else
|
||||
printf("Message delivery sucess: topic=%s \n", message.topic_name().c_str());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
FeKafkaProducer::FeKafkaProducer()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
FeKafkaProducer::~FeKafkaProducer()
|
||||
{
|
||||
}
|
||||
|
||||
bool FeKafkaProducer::init(const std::string &brokerlist, const bool &async, const int &size)
|
||||
{
|
||||
std::string errstr;
|
||||
this->topics_.clear();
|
||||
|
||||
#ifdef __GNUC__
|
||||
conf_ = (RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
|
||||
tconf_ = (RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
|
||||
#endif
|
||||
|
||||
if (conf_ == nullptr || tconf_== nullptr)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
#ifdef __GNUC__
|
||||
static MyHashPartitionerCb hash_partitioner;
|
||||
if (tconf_->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK) {
|
||||
std::cerr << errstr << std::endl;
|
||||
return false;
|
||||
}
|
||||
static ProducerDeliveryReportCb producerDELIVER;
|
||||
if (conf_->set("dr_cb", &producerDELIVER, errstr) != RdKafka::Conf::CONF_OK) {
|
||||
std::cerr << errstr << std::endl;
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
|
||||
//std::string broker(host);
|
||||
//broker.append(":").append(std::to_string((long long)port));
|
||||
conf_->set("metadata.broker.list", brokerlist, errstr);
|
||||
if (strcmp(PROTOCOL, "sasl_plaintext") == 0) {
|
||||
conf_->set("security.protocol", PROTOCOL, errstr);
|
||||
conf_->set("sasl.mechanisms", MECHANISMS, errstr);
|
||||
std::string kinit = "/usr/bin/kinit -kt \"";
|
||||
kinit.append(KEYTAB_FILE);
|
||||
kinit.append("\" ");
|
||||
kinit.append(PRINCIPAL);
|
||||
|
||||
conf_->set("sasl.kerberos.kinit.cmd", kinit, errstr);
|
||||
conf_->set("sasl.kerberos.keytab", KEYTAB_FILE, errstr);
|
||||
conf_->set("sasl.kerberos.service.name", SERVICE_NAME, errstr);
|
||||
conf_->set("sasl.kerberos.principal", PRINCIPAL, errstr);
|
||||
conf_->set("sasl.kerberos.domain.name", DOMAIN_NAME, errstr);
|
||||
printf("kafka sasl PROTOCOL=%s,MECHANISMS=%s,kinit=%s,KEYTAB_FILE=%s,SERVICE_NAME=%s,PRINCIPAL=%s,DOMAIN_NAME=%s \n", PROTOCOL, MECHANISMS, kinit.c_str(), KEYTAB_FILE, SERVICE_NAME, PRINCIPAL, DOMAIN_NAME);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
if (async)
|
||||
{
|
||||
char size_str[256];
|
||||
conf_->set("producer.type", "async", errstr);
|
||||
//conf_->set("queue.buffering.max.messages", std::to_string((long long)size).c_str(), errstr);
|
||||
memset(size_str,0,256);
|
||||
apr_snprintf(size_str,sizeof(size_str),"%i",size);
|
||||
conf_->set("queue.buffering.max.messages", size_str, errstr);
|
||||
|
||||
{
|
||||
//const char* api_version_request = "false";
|
||||
//const char* api_version_fallback = "0.8.2.0";
|
||||
//conf_->set("api.version.request", api_version_request, errstr) ;
|
||||
//conf_->set("broker.version.fallback", api_version_fallback, errstr);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
conf_->set("producer.type", "sync", errstr);
|
||||
}
|
||||
|
||||
#ifdef __GNUC__
|
||||
producer_ = RdKafka::Producer::create(conf_, errstr);
|
||||
if (!producer_)
|
||||
{
|
||||
std::cerr << "Failed to create producer: " << errstr << std::endl;
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
|
||||
int FeKafkaProducer::send(const char *data, const int &size, const std::string &topic,
|
||||
const int &partition, const std::string *key,const int &timeout)
|
||||
{
|
||||
RdKafka::Topic *tpk = get_topic(topic);
|
||||
//std::cout<<"send data "<<data<<std::endl;
|
||||
if (tpk == nullptr) {
|
||||
printf("FIRST: get topic(%s) failed, to create at once \n",topic.c_str());
|
||||
bool ret = create_topic(topic);
|
||||
if(ret) {
|
||||
printf("create topic OK \n");
|
||||
tpk = get_topic(topic);
|
||||
if (tpk == nullptr) {
|
||||
printf("SECOND: get topic(%s) failed! but create topic seemed OK, what ??? ",topic.c_str());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else {
|
||||
printf("create topic Failed \n");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
RdKafka::ErrorCode resp =
|
||||
producer_->produce(tpk, partition,
|
||||
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
|
||||
const_cast<char*>(data), size,
|
||||
key->c_str(),key->size(), NULL);
|
||||
///*NULL*/ key, NULL);
|
||||
|
||||
if (resp != RdKafka::ERR_NO_ERROR)
|
||||
{
|
||||
#ifdef __GNUC__
|
||||
std::cerr << "% Produce failed: " <<
|
||||
RdKafka::err2str(resp) << std::endl;
|
||||
#endif
|
||||
return -1;
|
||||
}
|
||||
|
||||
producer_->poll(timeout);
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
int FeKafkaProducer::send_batch(const std::vector<std::pair<const char *, const int &> > &data,
|
||||
const std::string &topic, const int &partition, const int &timeout)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool FeKafkaProducer::create_topic(const std::string &topic)
|
||||
{
|
||||
std::string errstr;
|
||||
if (topic.empty() || producer_== nullptr || tconf_== nullptr)
|
||||
{
|
||||
printf("(topic.empty() || producer_== nullptr || tconf_== nullptr) \n");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (this->get_topic(topic) != nullptr)
|
||||
{
|
||||
printf("if (this->get_topic(topic) != nullptr) \n");
|
||||
return false;
|
||||
}
|
||||
|
||||
#ifdef __GNUC__
|
||||
RdKafka::Topic* tpk = (RdKafka::Topic::create(producer_, topic, tconf_, errstr));
|
||||
if (!tpk)
|
||||
{
|
||||
std::cerr << "Failed to create topic: " << errstr << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
topics_.insert(make_pair(topic, tpk));
|
||||
#endif
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
RdKafka::Topic *FeKafkaProducer::get_topic(const std::string &topic)
|
||||
{
|
||||
std::map<std::string, RdKafka::Topic*>::iterator it = topics_.find(topic);
|
||||
if(it != topics_.end())
|
||||
{
|
||||
return it->second;
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void FeKafkaProducer::read_config(const char *path)
|
||||
{
|
||||
}
|
||||
|
||||
void FeKafkaProducer::close()
|
||||
{
|
||||
while (producer_->outq_len() > 0)
|
||||
{
|
||||
std::cerr << "Waiting for " << producer_->outq_len() << std::endl;
|
||||
producer_->poll(1000);
|
||||
}
|
||||
|
||||
#ifdef __GNUC__
|
||||
RdKafka::wait_destroyed(1000);
|
||||
#endif
|
||||
}
|
||||
Reference in New Issue
Block a user