Files
microser/json/kafka_producer.cpp
2025-01-16 16:17:01 +08:00

250 lines
6.3 KiB
C++

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <apr_strings.h>
#include "kafka_producer.h"
#define CONFIG_PATH "../etc"
#ifndef WIN32
#define nullptr NULL
#endif
extern int SEND_FLAG;
extern char* PROTOCOL;
extern char* MECHANISMS;
extern char* KEYTAB_FILE;
extern char* SERVICE_NAME;
extern char* PRINCIPAL;
extern char* DOMAIN_NAME;
/**
 * Partitioner callback that interprets the message key as a decimal integer
 * (via atoi) and maps it onto a partition with a modulo.
 */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
public:
    /**
     * Choose the partition for one message.
     *
     * @param topic         Topic the message is produced to (not used here).
     * @param key           Message key; parsed with atoi(). A null key is
     *                      treated like a key that parses to 0.
     * @param partition_cnt Number of partitions to choose from.
     * @param msg_opaque    Per-message opaque pointer (not used here).
     * @return A partition id in [0, partition_cnt), or
     *         RdKafka::Topic::PARTITION_UA when partition_cnt is not positive.
     */
    int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
                            int32_t partition_cnt, void *msg_opaque) {
        // A zero divisor would trap in the modulo below; report "unassigned" instead.
        if (partition_cnt <= 0) {
            printf("MyHashPartitionerCb invalid partition_cnt=%d \n", partition_cnt);
            return RdKafka::Topic::PARTITION_UA;
        }

        int n = 0;
        if (key != nullptr) {
            n = atoi(key->c_str());
        }

        // In C++ the sign of `%` follows the dividend, so a negative key would
        // yield a negative (invalid) partition id; fold it back into range.
        int part_id = (n % partition_cnt);
        if (part_id < 0) {
            part_id += partition_cnt;
        }

        printf("MyHashPartitionerCb key n=%d,partition_cnt=%d,partition_id=%d \n",n,partition_cnt,part_id);
        return part_id;
    }
private:
    // Unused alternative kept from the original source (djb2 string hash):
    //static inline unsigned int djb_hash (const char *str, size_t len) {
    //  unsigned int hash = 5381;
    //  for (size_t i = 0 ; i < len ; i++)
    //    hash = ((hash << 5) + hash) + str[i];
    //  return hash;
    //}
};
/**
 * Delivery report callback: prints one line to stdout per reported message,
 * stating whether delivery succeeded or failed.
 */
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    /**
     * @param message Delivery report; its error code selects which line is printed.
     */
    void dr_cb(RdKafka::Message& message) {
        const std::string topic_name = message.topic_name();

        // Success path first: a zero error code means the message was delivered.
        if (!message.err()) {
            printf("Message delivery sucess: topic=%s \n", topic_name.c_str());
            return;
        }

        const std::string reason = message.errstr();
        printf("Message delivery failed: topic=%s error=%s\n", topic_name.c_str(), reason.c_str());
    }
};
/**
 * Construct an unconfigured producer.
 *
 * The handles are set to null so that the `== nullptr` checks in init() and
 * create_topic() are meaningful before (or without) a successful init(),
 * including on builds where the `__GNUC__`-guarded creation code is compiled out.
 * Assignments are used instead of a member-initializer list so the result does
 * not depend on the declaration order in the header.
 */
FeKafkaProducer::FeKafkaProducer()
{
    conf_ = nullptr;
    tconf_ = nullptr;
    producer_ = nullptr;
}
// Destructor. The body is empty: nothing is released here.
// NOTE(review): conf_, tconf_, producer_ and the entries stored in topics_ are
// created in init()/create_topic() and are not freed in this block — confirm
// whether ownership is released elsewhere or whether this is a leak.
FeKafkaProducer::~FeKafkaProducer()
{
}
bool FeKafkaProducer::init(const std::string &brokerlist, const bool &async, const int &size)
{
std::string errstr;
this->topics_.clear();
#ifdef __GNUC__
conf_ = (RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
tconf_ = (RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
#endif
if (conf_ == nullptr || tconf_== nullptr)
{
return false;
}
#ifdef __GNUC__
static MyHashPartitionerCb hash_partitioner;
if (tconf_->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK) {
std::cerr << errstr << std::endl;
return false;
}
static ProducerDeliveryReportCb producerDELIVER;
if (conf_->set("dr_cb", &producerDELIVER, errstr) != RdKafka::Conf::CONF_OK) {
std::cerr << errstr << std::endl;
return false;
}
#endif
//std::string broker(host);
//broker.append(":").append(std::to_string((long long)port));
conf_->set("metadata.broker.list", brokerlist, errstr);
if (strcmp(PROTOCOL, "sasl_plaintext") == 0) {
conf_->set("security.protocol", PROTOCOL, errstr);
conf_->set("sasl.mechanisms", MECHANISMS, errstr);
std::string kinit = "/usr/bin/kinit -kt \"";
kinit.append(KEYTAB_FILE);
kinit.append("\" ");
kinit.append(PRINCIPAL);
conf_->set("sasl.kerberos.kinit.cmd", kinit, errstr);
conf_->set("sasl.kerberos.keytab", KEYTAB_FILE, errstr);
conf_->set("sasl.kerberos.service.name", SERVICE_NAME, errstr);
conf_->set("sasl.kerberos.principal", PRINCIPAL, errstr);
conf_->set("sasl.kerberos.domain.name", DOMAIN_NAME, errstr);
printf("kafka sasl PROTOCOL=%s,MECHANISMS=%s,kinit=%s,KEYTAB_FILE=%s,SERVICE_NAME=%s,PRINCIPAL=%s,DOMAIN_NAME=%s \n", PROTOCOL, MECHANISMS, kinit.c_str(), KEYTAB_FILE, SERVICE_NAME, PRINCIPAL, DOMAIN_NAME);
}
if (async)
{
char size_str[256];
conf_->set("producer.type", "async", errstr);
//conf_->set("queue.buffering.max.messages", std::to_string((long long)size).c_str(), errstr);
memset(size_str,0,256);
apr_snprintf(size_str,sizeof(size_str),"%i",size);
conf_->set("queue.buffering.max.messages", size_str, errstr);
{
//const char* api_version_request = "false";
//const char* api_version_fallback = "0.8.2.0";
//conf_->set("api.version.request", api_version_request, errstr) ;
//conf_->set("broker.version.fallback", api_version_fallback, errstr);
}
}
else
{
conf_->set("producer.type", "sync", errstr);
}
#ifdef __GNUC__
producer_ = RdKafka::Producer::create(conf_, errstr);
if (!producer_)
{
std::cerr << "Failed to create producer: " << errstr << std::endl;
return false;
}
#endif
return true;
}
/**
 * Produce one message to `topic`, creating and caching the topic handle on
 * first use, then poll the producer once.
 *
 * @param data      Payload bytes; copied by the producer (RK_MSG_COPY).
 * @param size      Payload length in bytes; must not be negative.
 * @param topic     Topic name.
 * @param partition Target partition, passed through to produce().
 * @param key       Optional message key; may be null, in which case the
 *                  message is produced without a key.
 * @param timeout   Timeout passed to poll() after a successful produce().
 * @return `size` on success, -1 on any failure.
 */
int FeKafkaProducer::send(const char *data, const int &size, const std::string &topic,
                          const int &partition, const std::string *key, const int &timeout)
{
    // A negative length would turn into a huge size_t inside produce(), and a
    // null payload with a positive length would be read from address 0.
    if (size < 0 || (data == nullptr && size > 0))
    {
        printf("send: invalid payload (size=%d) \n", size);
        return -1;
    }

    RdKafka::Topic *tpk = get_topic(topic);
    if (tpk == nullptr) {
        printf("FIRST: get topic(%s) failed, to create at once \n",topic.c_str());
        if (!create_topic(topic)) {
            printf("create topic Failed \n");
            return -1;
        }
        printf("create topic OK \n");
        tpk = get_topic(topic);
        if (tpk == nullptr) {
            printf("SECOND: get topic(%s) failed! but create topic seemed OK, what ??? ",topic.c_str());
            return -1;
        }
    }

    // create_topic() already treats producer_ as possibly null; do the same
    // here before dereferencing it.
    if (producer_ == nullptr)
    {
        printf("send: producer is not initialized \n");
        return -1;
    }

    // The key is optional: only dereference it when the caller supplied one.
    const void *key_data = NULL;
    size_t key_len = 0;
    if (key != nullptr)
    {
        key_data = key->c_str();
        key_len = key->size();
    }

    RdKafka::ErrorCode resp =
        producer_->produce(tpk, partition,
                           RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                           const_cast<char*>(data), size,
                           key_data, key_len, NULL);
    if (resp != RdKafka::ERR_NO_ERROR)
    {
#ifdef __GNUC__
        std::cerr << "% Produce failed: " <<
            RdKafka::err2str(resp) << std::endl;
#endif
        return -1;
    }
    // Polling lets the producer run queued callbacks such as delivery reports.
    producer_->poll(timeout);
    return size;
}
// Batch send entry point. Currently a stub: all arguments are ignored, nothing
// is produced, and the function always returns 0.
int FeKafkaProducer::send_batch(const std::vector<std::pair<const char *, const int &> > &data,
const std::string &topic, const int &partition, const int &timeout)
{
return 0;
}
bool FeKafkaProducer::create_topic(const std::string &topic)
{
std::string errstr;
if (topic.empty() || producer_== nullptr || tconf_== nullptr)
{
printf("(topic.empty() || producer_== nullptr || tconf_== nullptr) \n");
return false;
}
if (this->get_topic(topic) != nullptr)
{
printf("if (this->get_topic(topic) != nullptr) \n");
return false;
}
#ifdef __GNUC__
RdKafka::Topic* tpk = (RdKafka::Topic::create(producer_, topic, tconf_, errstr));
if (!tpk)
{
std::cerr << "Failed to create topic: " << errstr << std::endl;
return false;
}
topics_.insert(make_pair(topic, tpk));
#endif
return true;
}
/**
 * Look up a cached topic handle by name.
 *
 * @param topic Topic name used as the key into topics_.
 * @return The cached handle, or null when no entry exists for `topic`.
 */
RdKafka::Topic *FeKafkaProducer::get_topic(const std::string &topic)
{
    RdKafka::Topic *handle = nullptr;

    std::map<std::string, RdKafka::Topic*>::iterator pos = topics_.find(topic);
    if (pos != topics_.end())
    {
        handle = pos->second;
    }

    return handle;
}
// Configuration loader placeholder. Currently a stub: `path` is ignored and
// the function does nothing.
void FeKafkaProducer::read_config(const char *path)
{
}
/**
 * Drain the producer's outbound queue before shutdown.
 *
 * Polls in 1000 ms steps while outq_len() reports pending messages, then (on
 * GNU builds) calls RdKafka::wait_destroyed(1000). Does nothing when no
 * producer has been created.
 *
 * NOTE(review): the drain loop has no upper bound; it keeps polling for as
 * long as outq_len() stays above zero.
 */
void FeKafkaProducer::close()
{
    // create_topic() already treats producer_ as possibly null; without this
    // guard, close() after a failed init() would dereference a null handle.
    if (producer_ == nullptr)
    {
        return;
    }

    while (producer_->outq_len() > 0)
    {
        std::cerr << "Waiting for " << producer_->outq_len() << std::endl;
        producer_->poll(1000);
    }
#ifdef __GNUC__
    RdKafka::wait_destroyed(1000);
#endif
}