diff --git a/cfg_parse/SimpleProducer.cpp b/cfg_parse/SimpleProducer.cpp index 2650b9c..5a29d33 100644 --- a/cfg_parse/SimpleProducer.cpp +++ b/cfg_parse/SimpleProducer.cpp @@ -29,10 +29,14 @@ #include "../rocketmq/DefaultMQPushConsumer.h" #include "../rocketmq/ConsumeType.h" +#include "../rocketmq/MQMessageListener.h" +#include "../rocketmq/MQMessageExt.h" +#include "../rocketmq/SessionCredentials.h" + // 引入提供的消费者接口头文件 -#include "../rocketmq/CPushConsumer.h" -#include "../rocketmq/CCommon.h" -#include "../rocketmq/CMessageExt.h" +//#include "../rocketmq/CPushConsumer.h" +//#include "../rocketmq/CCommon.h" +//#include "../rocketmq/CMessageExt.h" #include #include // 用于互斥锁(在 C++98 中没有 std::mutex) #include // for std::pair @@ -79,20 +83,40 @@ extern std::string G_MQCONSUMER_CHANNEL; class RocketMQConsumer; // 全局映射:CPushConsumer* -> RocketMQConsumer* -std::map g_consumerMap;// -pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER; +//std::map g_consumerMap;// +//pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER; + +//////////////////////////////// +int64_t G_APP_START_MS = []() -> int64_t { + using namespace std::chrono; + return duration_cast( + system_clock::now().time_since_epoch() + ).count(); +}(); + +int64_t G_START_SKEW_MS = 1000; + +bool should_process_after_start(const rocketmq::MQMessageExt& msg) +{ + const int64_t born_ts = static_cast(msg.getBornTimestamp()); + return born_ts >= (G_APP_START_MS - G_START_SKEW_MS); +} +/////////////////////////////// +class InternalListener; class RocketMQConsumer { public: // 构造函数:初始化消费者并启动 - RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId); + RocketMQConsumer(const std::string& consumerName, const std::string& nameServer); // 禁用拷贝和赋值 - RocketMQConsumer(const RocketMQConsumer&) {} + //RocketMQConsumer(const RocketMQConsumer&) {} RocketMQConsumer& operator=(const RocketMQConsumer&) { return *this; } // 订阅主题和标签,并注册回调 - void subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback); + void subscribe(const std::string& topic, + const std::string& tag, + MessageCallBack callback); // 启动消费者 void start(); @@ -100,23 +124,47 @@ public: //修改消费模式 void setConsumerMessageModel(const std::string& topic); + rocketmq::ConsumeStatus handleMessage(const rocketmq::MQMessageExt& msg); + // 析构函数:关闭并销毁消费者 ~RocketMQConsumer(); private: - CPushConsumer* consumer_; // C 接口消费者指针 + //CPushConsumer* consumer_; // C 接口消费者指针 //MessageCallBack messageCallback_; // 函数指针用于回调 + rocketmq::DefaultMQPushConsumer consumer_; + InternalListener* listener_; + std::map, MessageCallBack> callbacks_; // 订阅到回调的映射 - // 静态消息处理回调 + /*// 静态消息处理回调 static int messageHandler(CPushConsumer* consumer, CMessageExt* msg); - // 实例消息处理函数 - int handleMessage(CMessageExt* msg); -}; + int handleMessage(CMessageExt* msg);*/ -// 构造函数实现 -RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId) +}; +class InternalListener : public rocketmq::MessageListenerConcurrently { +public: + explicit InternalListener(RocketMQConsumer* owner) + : owner_(owner) {} + + rocketmq::ConsumeStatus consumeMessage( + const std::vector& msgs) override + { + for (size_t i = 0; i < msgs.size(); ++i) { + rocketmq::ConsumeStatus ret = owner_->handleMessage(msgs[i]); + if (ret != rocketmq::CONSUME_SUCCESS) { + return ret; + } + } + return rocketmq::CONSUME_SUCCESS; + } + +private: + RocketMQConsumer* owner_; +}; +// 构造函数实现C +/*RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId) : consumer_(NULL)//, messageCallback_(NULL) { // 创建消费者 @@ -163,12 +211,29 @@ RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::s pthread_mutex_unlock(&g_consumerMapMutex); std::cout << "RocketMQ Consumer initialized and started." << std::endl; +}*/ + +//构造函数实现C++ +RocketMQConsumer::RocketMQConsumer(const std::string& consumerGroup, + const std::string& nameServer) + : consumer_(consumerGroup), + listener_(NULL) +{ + consumer_.setNamesrvAddr(nameServer); + + consumer_.setSessionCredentials( + G_MQCONSUMER_ACCESSKEY, + G_MQCONSUMER_SECRETKEY, + G_MQCONSUMER_CHANNEL + ); + + listener_ = new InternalListener(this); } // 启动消费者 void RocketMQConsumer::start() { - if (StartPushConsumer(consumer_) != 0) { + /*if (StartPushConsumer(consumer_) != 0) { pthread_mutex_lock(&g_consumerMapMutex); g_consumerMap.erase(consumer_); pthread_mutex_unlock(&g_consumerMapMutex); @@ -177,21 +242,26 @@ void RocketMQConsumer::start() } else{ std::cout << "RocketMQ Consumer started." << std::endl; - } + }*/ + consumer_.registerMessageListener(listener_); + consumer_.start(); } void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback) { - if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) { + /*if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) { throw std::runtime_error("Failed to subscribe to topic/tag."); - } + }*/ + consumer_.subscribe(topic, tag); + + //调试用 std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl; // 使用 std::pair 作为键 - std::pair key(topic, tag); - callbacks_[key] = callback; + std::pair mapKey(topic, tag); + callbacks_[mapKey] = callback; } - +/* // 静态消息处理回调实现 int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg) { @@ -212,68 +282,66 @@ int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg) return instance->handleMessage(msg); } else { std::cerr << "Consumer instance not found for callback." << std::endl; - return E_RECONSUME_LATER; // 默认返回重试状态 + //return E_RECONSUME_LATER; // 默认返回重试状态 + return rocketmq::RECONSUME_LATER; } } - -int RocketMQConsumer::handleMessage(CMessageExt* msg) +*/ +/*int RocketMQConsumer::handleMessage(CMessageExt* msg) { // 检查 msg 和 consumer_ 是否为 NULL if (!msg || !consumer_) { std::cerr << "Received null message or consumer." << std::endl; - return E_RECONSUME_LATER; - } + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; + }*/ // 获取消息的主题和标签 - std::string topic = GetMessageTopic(msg); // 假设存在此函数 - std::string tag = GetMessageTags(msg); // 假设存在此函数 + //std::string topic = GetMessageTopic(msg); // 假设存在此函数 + //std::string tag = GetMessageTags(msg); // 假设存在此函数 - // 打印调试信息 - std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl; - - // 使用 std::pair 作为键 - std::pair key(topic, tag); - - // 查找对应的回调函数 - std::map, MessageCallBack>::iterator it = callbacks_.find(key); - if (it != callbacks_.end()) { - // 调用对应的回调函数 - - //调试 - std::cout << "callback Handling message " <second(consumer_, msg); - - } else { - - //调试 - std::cout << "there is no callback " < key(topic, tag); + // 查找对应的回调函数 + std::map, MessageCallBack>::iterator it = callbacks_.find(key); + if (it != callbacks_.end()) + { // 调用对应的回调函数 + //调试 + std::cout << "callback Handling message " <second(consumer_, msg); + return it->second(msg); + } + else { + //调试 + std::cout << "there is no callback " <setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式 - g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag, subscriptions[i].callback); + g_consumer->subscribe(subscriptions[i].topic, subscriptions[i].tag,subscriptions[i].callback); } g_consumer->start(); @@ -407,24 +487,36 @@ int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) { class RocketMQProducer { public: RocketMQProducer(const std::string& producerName, const std::string& nameServer) - : producer_(NULL) + : producer_(producerName) { // 创建生产者 - producer_ = CreateProducer(producerName.c_str()); + /*producer_ = CreateProducer(producerName.c_str()); if (producer_ == NULL) { throw std::runtime_error("Failed to create producer."); - } + }*/ + + // 设置日志 + producer_.setLogLevel(rocketmq::eLOG_LEVEL_ERROR); + producer_.setLogFileSizeAndNum(5, 50); // 设置 nameserver 地址 - SetProducerNameServerAddress(producer_, nameServer.c_str()); + //SetProducerNameServerAddress(producer_, nameServer.c_str()); + producer_.setNamesrvAddr(nameServer); //lnk20260417设置数据上送消息体最大值,默认4M,调整为1M,避免过大消息导致发送失败 - SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB + //SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB + producer_.setMaxMessageSize(1024 * 1024); - SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), ""); + //SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), ""); + producer_.setSessionCredentials( + G_MQCONSUMER_ACCESSKEY, + G_MQCONSUMER_SECRETKEY, + "" + ); // 启动生产者 - StartProducer(producer_); + //StartProducer(producer_); + producer_.start(); std::cout << "rocketmq_Producer initialized and started." << std::endl; } @@ -433,7 +525,7 @@ public: RocketMQProducer(const RocketMQProducer&) = delete; RocketMQProducer& operator=(const RocketMQProducer&) = delete; - void printSendResult(const CSendResult& result) { + /*void printSendResult(const CSendResult& result) { std::cout << "SendResult:" << std::endl; std::cout << " Status: "; switch (result.sendStatus) { @@ -457,10 +549,10 @@ public: std::cout << " MsgID : " << result.msgId << std::endl; std::cout << " Offset: " << result.offset << std::endl; - } + }*/ // 发送消息 - void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) { +/* void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) { if (DEBUGOPEN) { std::cout << "sendMessage called with topic: " << (topic ? topic : "NULL") @@ -585,7 +677,7 @@ public: << std::endl;*/ // 发送消息:临时改成同步发送,绕过 orderly / selector,便于定位问题 - if (sendResult == 0) { // 假设返回 0 表示成功 + /*if (sendResult == 0) { // 假设返回 0 表示成功 std::cout << "[MQ][SEND_OK]" << " topic=" << (topic ? topic : "") << ", tags=" << tags @@ -630,20 +722,97 @@ public: DestroyMessage(msg); } } + }*/ + void sendMessage(const std::string& body, + const std::string& topic, + const std::string& tags, + const std::string& keys) + { + try { + if (DEBUGOPEN) { + std::cout << "sendMessage called with topic: " << topic + << ", tags: " << tags + << ", keys: " << keys + << ", body_len=" << body.size() + << std::endl; + + size_t n = std::min((size_t)200, body.size()); + + std::cout << "[MQ][BODY_HEAD] " + << body.substr(0, n) + << std::endl; + + if (body.size() > n) { + std::cout << "[MQ][BODY_TAIL] " + << body.substr(body.size() - n, n) + << std::endl; + } + + std::cout << "[MQ][HEX_HEAD] "; + for (size_t i = 0; i < std::min((size_t)100, body.size()); ++i) { + printf("%02X ", (unsigned char)body[i]); + } + printf("\n"); + } + + rocketmq::MQMessage msg(topic, tags, keys, body); + + rocketmq::SendResult result = producer_.send(msg); + + std::cout << "[MQ][SEND_OK]" + << " topic=" << topic + << ", tags=" << tags + << ", keys=" << keys + << ", msgId=" << result.getMsgId() + << ", status=" << result.getSendStatus() + << ", body_len=" << body.size() + << std::endl; + } + catch (const rocketmq::MQClientException& e) { + std::cerr << "[MQ][SEND_FAIL] MQClientException: " + << e.what() << std::endl; + + DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, + "【ERROR】前置的%s%d号进程 mq发送失败,mq客户端错误,请检查mq配置", + get_front_msg_from_subdir(), g_front_seg_index); + } + catch (const std::exception& e) { + std::cerr << "[MQ][SEND_FAIL] exception: " + << e.what() << std::endl; + + DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, + "【ERROR】前置的%s%d号进程 mq发送失败,mq发送错误,发送请检查mq配置", + get_front_msg_from_subdir(), g_front_seg_index); + } + catch (...) { + std::cerr << "[MQ][SEND_FAIL] unknown exception" << std::endl; + + DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, + "【ERROR】前置的%s%d号进程 mq发送失败,未知错误,请检查mq配置", + get_front_msg_from_subdir(), g_front_seg_index); + } } // 析构函数中关闭并销毁生产者 ~RocketMQProducer() { - if (producer_) { + /*if (producer_) { ShutdownProducer(producer_); DestroyProducer(producer_); std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl; + }*/ + try { + producer_.shutdown(); } + catch (...) { + } + + std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl; } private: - CProducer* producer_; + //CProducer* producer_; + rocketmq::DefaultMQProducer producer_; }; // 全局生产者实例 @@ -654,7 +823,7 @@ void InitializeProducer() { if (g_producer == NULL) { try { - g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT); + g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT);//生产者名称和NameServer地址 } catch (const std::exception& e) { std::cerr << "Failed to initialize producer: " << e.what() << std::endl; @@ -674,34 +843,33 @@ void ShutdownAndDestroyProducer() } // 发送消息的接口函数 -void rocketmq_producer_send(const char* strbody, const char* topic) +void rocketmq_producer_send(const std::string& body, + const std::string& topic, + const std::string& tags, + const std::string& keys) { if (g_producer == NULL) { try { InitializeProducer(); - } - catch (...) { + } catch (...) { std::cerr << "Cannot send message because producer initialization failed." << std::endl; return; } } - // 假设 tags 和 keys 是固定的,可以根据需要修改 - std::string tags = G_ROCKETMQ_TAG; - std::string keys = G_ROCKETMQ_KEY; - try { - g_producer->sendMessage(strbody, topic, tags, keys); - } - catch (const std::exception& e) { + g_producer->sendMessage(body, topic, tags, keys); + } catch (const std::exception& e) { std::cerr << "Failed to send message: " << e.what() << std::endl; - // 处理发送失败的情况,例如记录日志或重试 - DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, + "【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", + get_front_msg_from_subdir(), g_front_seg_index); } } #endif ////////////////////////////////////////////////////////////////////////////////////////////////////////// +/* // producer_send0测试用 void StartSendMessage(CProducer* producer) { @@ -786,7 +954,7 @@ void producer_send(const char* strbody) DestroyProducer(producer); cout << "Producer Shutdown!" << endl; } - +*/ /////////////////////////////////////////////////////////////////////////////////////////////////////////// extern "C" { diff --git a/cfg_parse/cfg_parser.cpp b/cfg_parse/cfg_parser.cpp index 71cf023..1cad11d 100644 --- a/cfg_parse/cfg_parser.cpp +++ b/cfg_parse/cfg_parser.cpp @@ -7167,6 +7167,7 @@ void send_reply_to_kafka(const std::string& guid, const std::string& step, const // 封装 Kafka 消息 Ckafka_data_t connect_info; connect_info.strTopic = QString::fromStdString(Topic_Reply_Topic); + connect_info.mp_id = QString::fromStdString(guid);//guid作为key connect_info.strText = QString::fromStdString(jsonString); // 加入发送队列(带互斥锁保护) @@ -7195,6 +7196,7 @@ void send_reply_to_kafka_recall(const std::string& guid, const std::string& step // 封装 Kafka 消息 Ckafka_data_t connect_info; connect_info.strTopic = QString::fromStdString(Topic_Reply_Topic); + connect_info.mp_id = QString::fromStdString(guid);//guid作为key connect_info.strText = QString::fromStdString(jsonString); // 加入发送队列(带互斥锁保护) @@ -7204,20 +7206,25 @@ void send_reply_to_kafka_recall(const std::string& guid, const std::string& step } void send_heartbeat_to_kafka(const std::string& status) { + + std::string front_type = get_front_type_from_subdir(); // 构造 JSON 字符串 std::ostringstream oss; oss << "{" << "\"nodeId\":\"" << FRONT_INST << "\"," - << "\"frontType\":\"" << get_front_type_from_subdir() << "\"," + << "\"frontType\":\"" << front_type << "\"," << "\"processNo\":\"" << g_front_seg_index << "\"," << "\"status\":\"" << status << "\"" << "}"; std::string jsonString = oss.str(); + std::string mpid_str = std::to_string(g_node_id) + "_" + std::to_string(g_front_seg_index); + // 封装 Kafka 消息 Ckafka_data_t connect_info; connect_info.strTopic = QString::fromStdString(Heart_Beat_Topic); + connect_info.mp_id = QString::fromStdString(mpid_str); connect_info.strText = QString::fromStdString(jsonString); // 加入发送队列(带互斥锁保护) diff --git a/cfg_parse/log4.cpp b/cfg_parse/log4.cpp index a6302f4..d68f05c 100644 --- a/cfg_parse/log4.cpp +++ b/cfg_parse/log4.cpp @@ -360,21 +360,25 @@ protected: final_msg = suppressed_oss.str(); } + std::string business_id = extract_logger_id(logger_name); + std::string front_type = get_front_type_from_subdir(); + std::ostringstream oss; oss << "{\"processNo\":\"" << intToString(g_front_seg_index) - << "\",\"nodeId\":\"" << FRONT_INST - << "\",\"businessId\":\"" << extract_logger_id(logger_name) - << "\",\"level\":\"" << level_str - << "\",\"grade\":\"" << get_level_str(level) + << "\",\"nodeId\":\"" << escape_json(FRONT_INST) + << "\",\"businessId\":\"" << escape_json(business_id) + << "\",\"level\":\"" << escape_json(level_str) + << "\",\"grade\":\"" << escape_json(get_level_str(level)) << "\",\"logtype\":\"" << safe_logtype - << "\",\"frontType\":\"" << get_front_type_from_subdir() + << "\",\"frontType\":\"" << escape_json(front_type) << "\",\"code\":" << code << ",\"log\":\"" << escape_json(final_msg) << "\"}"; - + Ckafka_data_t connect_info; connect_info.strTopic = QString::fromStdString(G_LOG_TOPIC); - connect_info.strText = QString::fromStdString(oss.str()); - + connect_info.mp_id = QString::fromStdString(business_id); + connect_info.strText = QString::fromStdString(oss.str()); + kafka_data_list_mutex.lock(); kafka_data_list.append(connect_info); kafka_data_list_mutex.unlock(); diff --git a/include/mmslite/sysincs.h b/include/mmslite/sysincs.h index f24711f..538a233 100644 --- a/include/mmslite/sysincs.h +++ b/include/mmslite/sysincs.h @@ -224,8 +224,19 @@ extern "C" { #include #include #endif +/* #define max(a,b) (((a) > (b)) ? (a) : (b)) #define min(a,b) (((a) < (b)) ? (a) : (b)) +*/ +#ifndef __cplusplus +#ifndef max +#define max(a,b) (((a) > (b)) ? (a) : (b)) +#endif + +#ifndef min +#define min(a,b) (((a) < (b)) ? (a) : (b)) +#endif +#endif #include #include #include diff --git a/include/rocketmq/SimpleProducer.h b/include/rocketmq/SimpleProducer.h index 3b49b69..f76b95c 100644 --- a/include/rocketmq/SimpleProducer.h +++ b/include/rocketmq/SimpleProducer.h @@ -1,21 +1,34 @@ #ifdef __cplusplus #include "../json/mms_json_inter.h" -#include "../rocketmq/CProducer.h" -#include "../rocketmq/CMessage.h" -#include "../rocketmq/CSendResult.h" - -#include "../rocketmq/CPushConsumer.h" +//#include "../rocketmq/CProducer.h" +//#include "../rocketmq/CMessage.h" +//#include "../rocketmq/CSendResult.h" +//#include "../rocketmq/CPushConsumer.h" +#include "../rocketmq/DefaultMQProducer.h" +#include "../rocketmq/MQMessage.h" +#include "../rocketmq/SendResult.h" +#include "../rocketmq/SessionCredentials.h" +#include "../rocketmq/MQMessageExt.h" +#include "../rocketmq/ConsumeType.h" +#include "../rocketmq/MQMessageListener.h" #include +#include +#include +using namespace rocketmq; /*添加测试函数lnk10-10*/ -void producer_send0(); -void StartSendMessage(CProducer* producer,const char* strbody); -void producer_send(const char* strbody); -void rocketmq_producer_send(const char* strbody,const char* topic); -void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic); +//void producer_send0(); +//void StartSendMessage(CProducer* producer,const char* strbody); +//void producer_send(const char* strbody); +//void rocketmq_producer_send(const char* strbody,const char* topic); +//void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic); +void rocketmq_producer_send(const std::string& body, + const std::string& topic, + const std::string& tags, + const std::string& keys); extern "C" { void rocketmq_test_rt(); void rocketmq_test_ud(); @@ -32,24 +45,29 @@ extern void my_rocketmq_send(Ckafka_data_t& data); void InitializeProducer(); void ShutdownAndDestroyProducer(); //////////////////////////////////////////////////////消费者 -void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key); -void ShutdownAndDestroyConsumer(); +typedef ConsumeStatus (*MessageCallBack)( + const MQMessageExt& msg +); struct Subscription { std::string topic; std::string tag; MessageCallBack callback; - - Subscription(const std::string& t, const std::string& tg, MessageCallBack cb) + + Subscription(const std::string& t, + const std::string& tg, + MessageCallBack cb) : topic(t), tag(tg), callback(cb) {} }; +//void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key); +void InitializeConsumer(const std::string& consumerName, + const std::string& nameServer, + const std::vector& subscriptions); +void ShutdownAndDestroyConsumer(); void rocketmq_consumer_receive( const std::string& consumerName, const std::string& nameServer, - //const std::string& topic, - //const std::string& tag, - //MessageCallBack callback); const std::vector& subscriptions); ////////////////////////////////////////////////////// diff --git a/include/sas_system.h b/include/sas_system.h index 8915b43..3707895 100644 --- a/include/sas_system.h +++ b/include/sas_system.h @@ -49,7 +49,7 @@ #ifndef HIBYTE #define HIBYTE(w) ((byte_t)((uint16_t)(w) >> 8)) #endif - +/* #ifndef max #define max(a,b) (((a) > (b)) ? (a) : (b)) #endif @@ -57,7 +57,18 @@ #ifndef min #define min(a,b) (((a) < (b)) ? (a) : (b)) #endif +*/ +#ifndef __cplusplus +#ifndef max +#define max(a,b) (((a) > (b)) ? (a) : (b)) +#endif + +#ifndef min +#define min(a,b) (((a) < (b)) ? (a) : (b)) +#endif + +#endif #ifdef __cplusplus extern "C" { #endif diff --git a/json/create_json.cpp b/json/create_json.cpp index 4faeffd..87eb850 100644 --- a/json/create_json.cpp +++ b/json/create_json.cpp @@ -45,6 +45,8 @@ extern std::string WEB_EVENT; extern std::string WEB_FILEDOWNLOAD; extern std::string G_CONNECT_TOPIC; +extern int RECALL_ONLY_FLAG; + //lnk20250115添加台账锁 extern pthread_mutex_t mtx; @@ -3271,14 +3273,13 @@ void connectlog_pgsql(char* id,char* datetime,int status) return; } - - //使用mq Ckafka_data_t connect_info; connect_info.strTopic = QString::fromStdString(G_CONNECT_TOPIC); + connect_info.mp_id = QString::fromLocal8Bit(id);//这里填装置id,后续作为key connect_info.strText = QString::fromStdString(std::string(jsonString)); - if(g_node_id == STAT_DATA_BASE_NODE_ID){//稳态才上传 + if((g_node_id == STAT_DATA_BASE_NODE_ID && RECALL_ONLY_FLAG == 0) || (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID && RECALL_ONLY_FLAG == 1)){//稳态或者补招才上传 kafka_data_list_mutex.lock(); //加锁 kafka_data_list.append(connect_info); //添加 kafka发送链表 kafka_data_list_mutex.unlock(); //解锁 diff --git a/json/save2json.cpp b/json/save2json.cpp index 6b9d1e1..5946f6f 100644 --- a/json/save2json.cpp +++ b/json/save2json.cpp @@ -34,7 +34,8 @@ using namespace std; #include "../json/save2json.h" #include "../json/mms_json_inter.h" #include "kafka_producer.h" -#include "../rocketmq/CPushConsumer.h" +//#include "../rocketmq/CPushConsumer.h" +#include "../rocketmq/DefaultMQPushConsumer.h" #include #include "../json/cjson.h" //解json #include //创建xml @@ -45,6 +46,9 @@ extern std::string intToString(int number); int StringToInt(const std::string& str); extern pthread_mutex_t mtx;//lnk20250115 +extern bool should_process_after_start(const rocketmq::MQMessageExt& msg); +extern int64_t G_APP_START_MS; + extern int RECALL_ONLY_FLAG; //lnk20260309添加一个全局变量,标志是否只运行补招程序 extern void SendFileWeb(const std::string& strUrl, @@ -156,6 +160,8 @@ extern std::string Topic_Reply_Key; extern std::string WEB_FILEUPLOAD; extern std::string WEB_FILEDOWNLOAD; +extern std::string G_ROCKETMQ_CONSUMER; + bool showinshellflag =false; @@ -363,7 +369,7 @@ void my_rocketmq_send(Ckafka_data_t& data) } //rocketmq_producer_send(const_cast(senddata.c_str()),const_cast(topic.c_str())); - rocketmq_producer_send(senddata.c_str(), topic.c_str());//lnk20250623修复偶发性doublefree + rocketmq_producer_send(senddata, topic,FRONT_INST,key);//lnk20250623修复偶发性doublefree } #if 0 @@ -722,7 +728,7 @@ bool parseJsonMessageRT(const std::string& body, std::string& devSeries, std::st //回复消息 //执行结果直接看实时数据,不需要再回复,1是收到消息 - send_reply_to_kafka(guid,"1","收到三秒数据指令"); + send_reply_to_kafka("11111","1","收到三秒数据指令");//实时数据没有下发guid } else { std::cerr << "Missing expected fields in JSON message." << std::endl; @@ -1471,14 +1477,14 @@ int parse_log(const std::string& json_str) { return 0; } - DIY_INFOLOG_CODE("process",0,LOG_CODE_LOG_REQUEST,"【NORMAL】前置的%s%d号进程处理日志上送消息",get_front_msg_from_subdir(), g_front_seg_index); + DIY_INFOLOG_CODE("process",0,LOG_CODE_LOG_REQUEST,"【NORMAL】前置的%s%d号进程处理日志控制消息",get_front_msg_from_subdir(), g_front_seg_index); //进程号和匹配上 std::cout << "msg index:"<< processNo <<" self index:" << g_front_seg_index << std::endl; std::cout << "msg frontType:"<< frontType <<" self frontType:" << subdir << std::endl; //回复消息 - send_reply_to_kafka(guid,"1","收到实时日志指令"); + send_reply_to_kafka(guid,"1","收到日志控制指令"); if (code_str == "set_log") { //校验数据 @@ -1492,7 +1498,7 @@ int parse_log(const std::string& json_str) { } else if((level == "measurepoint") && (grade == "TRACE") && (!id.empty() && !is_blank(id))){ //数据追踪 //打开监测点数据追踪开关 - process_trace_command(id,5); //5表示追踪次数 + process_trace_command(id,3); //3表示追踪次数 } else{ std::cout << "type doesnt match" <guid); dir_info.strText = QString::fromStdString(jsonString); kafka_data_list_mutex.lock(); @@ -2373,21 +2380,45 @@ char* find_mp_name_from_mp_id(const char* mp_id) } -int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg) +//int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg) +ConsumeStatus myMessageCallbackrtdata( + const rocketmq::MQMessageExt& msg) { if(INITFLAG != 1)return 1;//防止崩溃 - if (msg == NULL) { + if (!should_process_after_start(msg)) { + std::cout << "[MQ] skip old message: " + << "topic=" << msg.getTopic() + << ", queueId=" << msg.getQueueId() + << ", offset=" << msg.getQueueOffset() + << ", bornTs=" << msg.getBornTimestamp() + << ", appStart=" << G_APP_START_MS + << std::endl; + + return rocketmq::CONSUME_SUCCESS; + } + + /*if (msg == NULL) { std::cerr << "Received null message." << std::endl; - return E_RECONSUME_LATER; - } + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; + }*/ + if (msg.getBody().empty()) { + std::cout << "empty msg body" << std::endl; + } - const char* body = GetMessageBody(msg); - const char* key = GetMessageKeys(msg); + //const char* body = GetMessageBody(msg); + //const char* key = GetMessageKeys(msg); + std::string body = msg.getBody(); + std::string key = msg.getKeys(); + std::string tag = msg.getTags(); + std::string topic = msg.getTopic(); - if (body == NULL) { + //if (body == NULL) { + if (body.empty()) { std::cerr << "Message body is NULL." << std::endl; - return E_RECONSUME_LATER; + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; } else{ //记录日志 @@ -2395,7 +2426,8 @@ int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg) // 处理消息(例如,打印消息内容) std::cout << "rt data Callback received message: " << body << std::endl; - if (key) { + //if (key) { + if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { @@ -2411,7 +2443,8 @@ int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg) std::cerr << "Failed to parse the JSON message." << std::endl; //记录日志 DIY_ERRORLOG_CODE("process",0,LOG_CODE_RT_DATA,"【ERROR】前置消费topic:%s_%s的实时触发消息失败,消息的json格式不正确",FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_RT.c_str()); - return E_RECONSUME_LATER; + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; } //mq处理实时数据指令查询台账时添加锁 @@ -2424,42 +2457,70 @@ int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg) if(dev_index == 0 || mp_index == 0){ std::cerr << "dev index or mp index is 0" << std::endl; - return E_RECONSUME_LATER; + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; } // 创建 XML 文件 if (!createXmlFile(dev_index, mp_index, realData, soeData, limit,"new")) { DIY_ERRORLOG_CODE("process",0,LOG_CODE_RT_DATA,"【ERROR】前置无法创建实时数据触发文件"); std::cerr << "Failed to create the XML file." << std::endl; - return E_RECONSUME_LATER; + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; } } // 根据业务逻辑决定返回状态 - return E_CONSUME_SUCCESS; + //return E_CONSUME_SUCCESS; + return rocketmq::CONSUME_SUCCESS; } -int myMessageCallbackupdate(CPushConsumer* consumer, CMessageExt* msg) +//int myMessageCallbackupdate(CPushConsumer* consumer, CMessageExt* msg) +ConsumeStatus myMessageCallbackupdate( + const rocketmq::MQMessageExt& msg) { if(INITFLAG != 1)return 1;//防止崩溃 - if (msg == NULL) { + if (!should_process_after_start(msg)) { + std::cout << "[MQ] skip old message: " + << "topic=" << msg.getTopic() + << ", queueId=" << msg.getQueueId() + << ", offset=" << msg.getQueueOffset() + << ", bornTs=" << msg.getBornTimestamp() + << ", appStart=" << G_APP_START_MS + << std::endl; + + return rocketmq::CONSUME_SUCCESS; + } + + /*if (msg == NULL) { std::cerr << "Received null message." << std::endl; - return E_RECONSUME_LATER; - } + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; + }*/ + if (msg.getBody().empty()) { + std::cout << "empty msg body" << std::endl; + } - const char* body = GetMessageBody(msg); - const char* key = GetMessageKeys(msg); + //const char* body = GetMessageBody(msg); + //const char* key = GetMessageKeys(msg); + std::string body = msg.getBody(); + std::string key = msg.getKeys(); + std::string tag = msg.getTags(); + std::string topic = msg.getTopic(); - if (body == NULL) { - std::cerr << "Message body is NULL." << std::endl; - return E_RECONSUME_LATER; + //if (body == NULL) { + if (body.empty()) { + std::cerr << "Message body is empty." << std::endl; + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; } else{ //处理消费数据 std::cout << "ledger update Callback received message: " << body << std::endl; - if (key) { + //if (key) { + if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { @@ -2481,28 +2542,55 @@ int myMessageCallbackupdate(CPushConsumer* consumer, CMessageExt* msg) } // 根据业务逻辑决定返回状态 - return E_CONSUME_SUCCESS; + //return E_CONSUME_SUCCESS; + return rocketmq::CONSUME_SUCCESS; } -int myMessageCallbackset(CPushConsumer* consumer, CMessageExt* msg) +//int myMessageCallbackset(CPushConsumer* consumer, CMessageExt* msg) +ConsumeStatus myMessageCallbackset( + const rocketmq::MQMessageExt& msg) { if(INITFLAG != 1)return 1;//防止崩溃 - if (msg == NULL) { + + if (!should_process_after_start(msg)) { + std::cout << "[MQ] skip old message: " + << "topic=" << msg.getTopic() + << ", queueId=" << msg.getQueueId() + << ", offset=" << msg.getQueueOffset() + << ", bornTs=" << msg.getBornTimestamp() + << ", appStart=" << G_APP_START_MS + << std::endl; + + return rocketmq::CONSUME_SUCCESS; + } + + /*if (msg == NULL) { std::cerr << "Received null message." << std::endl; - return E_RECONSUME_LATER; - } + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; + }*/ + if (msg.getBody().empty()) { + std::cout << "empty msg body" << std::endl; + } - const char* body = GetMessageBody(msg); - const char* key = GetMessageKeys(msg); + //const char* body = GetMessageBody(msg); + //const char* key = GetMessageKeys(msg); + std::string body = msg.getBody(); + std::string key = msg.getKeys(); + std::string tag = msg.getTags(); + std::string topic = msg.getTopic(); - if (body == NULL) { + //if (body == NULL) { + if (body.empty()) { std::cerr << "Message body is NULL." << std::endl; - return E_RECONSUME_LATER; + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; } else{ //处理消费数据 std::cout << "process Callback received message: " << body << std::endl; - if (key) { + //if (key) { + if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { @@ -2517,28 +2605,55 @@ int myMessageCallbackset(CPushConsumer* consumer, CMessageExt* msg) } // 根据业务逻辑决定返回状态 - return E_CONSUME_SUCCESS; + //return E_CONSUME_SUCCESS; + return rocketmq::CONSUME_SUCCESS; } -int myMessageCallbacklog(CPushConsumer* consumer, CMessageExt* msg) +//int myMessageCallbacklog(CPushConsumer* consumer, CMessageExt* msg) +ConsumeStatus myMessageCallbacklog( + const rocketmq::MQMessageExt& msg) { if(INITFLAG != 1)return 1;//防止崩溃 - if (msg == NULL) { + + if (!should_process_after_start(msg)) { + std::cout << "[MQ] skip old message: " + << "topic=" << msg.getTopic() + << ", queueId=" << msg.getQueueId() + << ", offset=" << msg.getQueueOffset() + << ", bornTs=" << msg.getBornTimestamp() + << ", appStart=" << G_APP_START_MS + << std::endl; + + return rocketmq::CONSUME_SUCCESS; + } + + /*if (msg == NULL) { std::cerr << "Received null message." << std::endl; - return E_RECONSUME_LATER; - } + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; + }*/ + if (msg.getBody().empty()) { + std::cout << "empty msg body" << std::endl; + } - const char* body = GetMessageBody(msg); - const char* key = GetMessageKeys(msg); + //const char* body = GetMessageBody(msg); + //const char* key = GetMessageKeys(msg); + std::string body = msg.getBody(); + std::string key = msg.getKeys(); + std::string tag = msg.getTags(); + std::string topic = msg.getTopic(); - if (body == NULL) { + //if (body == NULL) { + if (body.empty()) { std::cerr << "Message body is NULL." << std::endl; - return E_RECONSUME_LATER; + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; } else{ //处理消费数据 std::cout << "process Callback received message: " << body << std::endl; - if (key) { + //if (key) { + if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { @@ -2553,38 +2668,65 @@ int myMessageCallbacklog(CPushConsumer* consumer, CMessageExt* msg) } // 根据业务逻辑决定返回状态 - return E_CONSUME_SUCCESS; + //return E_CONSUME_SUCCESS; + return rocketmq::CONSUME_SUCCESS; } -int myMessageCallbackrecall(CPushConsumer* consumer, CMessageExt* msg) +//int myMessageCallbackrecall(CPushConsumer* consumer, CMessageExt* msg) +ConsumeStatus myMessageCallbackrecall( + const rocketmq::MQMessageExt& msg) { if(INITFLAG != 1)return 1;//防止崩溃 + + if (!should_process_after_start(msg)) { + std::cout << "[MQ] skip old message: " + << "topic=" << msg.getTopic() + << ", queueId=" << msg.getQueueId() + << ", offset=" << msg.getQueueOffset() + << ", bornTs=" << msg.getBornTimestamp() + << ", appStart=" << G_APP_START_MS + << std::endl; + + return rocketmq::CONSUME_SUCCESS; + } + //调试 std::cout << "myMessageCallbackrecall"<< std::endl; - if (msg == NULL) { + /*if (msg == NULL) { std::cerr << "Received null message." << std::endl; - return E_RECONSUME_LATER; - } + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; + }*/ + if (msg.getBody().empty()) { + std::cout << "empty msg body" << std::endl; + } - const char* body = GetMessageBody(msg); - const char* key = GetMessageKeys(msg); + //const char* body = GetMessageBody(msg); + //const char* key = GetMessageKeys(msg); + std::string body = msg.getBody(); + std::string key = msg.getKeys(); + std::string tag = msg.getTags(); + std::string topic = msg.getTopic(); - if (body == NULL) { + //if (body == NULL) { + if (body.empty()) { std::cerr << "Message body is NULL." << std::endl; - return E_RECONSUME_LATER; + //return E_RECONSUME_LATER; + return rocketmq::RECONSUME_LATER; } else{ // 处理消息(例如,打印消息内容) std::cout << "recall Callback received message: " << body << std::endl; - if (key) { + //if (key) { + if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } //处理消费数据 - std::string result = extractDataJson(body); // 使用 std::string 代替 malloc + std::string result = extractDataJson(body.c_str()); // 使用 std::string 代替 malloc //调试 std::cout << "extractDataJson:"<< result.c_str() < #include +#include + +using namespace rocketmq; /*添加测试函数lnk10-10*/ -void producer_send0(); -void StartSendMessage(CProducer* producer,const char* strbody); -void producer_send(const char* strbody); -void rocketmq_producer_send(const char* strbody,const char* topic); -void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic); +//void producer_send0(); +//void StartSendMessage(CProducer* producer,const char* strbody); +//void producer_send(const char* strbody); +//void rocketmq_producer_send(const char* strbody,const char* topic); +//void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic); +void rocketmq_producer_send(const std::string& body, + const std::string& topic, + const std::string& tags, + const std::string& keys); extern "C" { void rocketmq_test_rt(); void rocketmq_test_ud(); @@ -32,17 +45,25 @@ extern void my_rocketmq_send(Ckafka_data_t& data); void InitializeProducer(); void ShutdownAndDestroyProducer(); //////////////////////////////////////////////////////消费者 -void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key); -void ShutdownAndDestroyConsumer(); +typedef ConsumeStatus (*MessageCallBack)( + const MQMessageExt& msg +); struct Subscription { std::string topic; std::string tag; MessageCallBack callback; - - Subscription(const std::string& t, const std::string& tg, MessageCallBack cb) - : topic(t), tag(tg), callback(cb) {std::cout << "Subscription topic: " << topic << std::endl;} + + Subscription(const std::string& t, + const std::string& tg, + MessageCallBack cb) + : topic(t), tag(tg), callback(cb) {} }; +//void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key); +void InitializeConsumer(const std::string& consumerName, + const std::string& nameServer, + const std::vector& subscriptions); +void ShutdownAndDestroyConsumer(); void rocketmq_consumer_receive( const std::string& consumerName,