From f167d705a9a23b729a5a3da24a3a2e21625ad9d1 Mon Sep 17 00:00:00 2001 From: lnk Date: Wed, 26 Feb 2025 16:39:10 +0800 Subject: [PATCH] fix log realdata send funtion --- .vscode/settings.json | 3 +- cfg_parse/SimpleProducer.cpp | 15 ++++++ cfg_parse/cfg_parser.cpp | 44 ++++++++++++----- cfg_parse/custom_printf.h | 15 +++++- include/rocketmq/SimpleProducer.h | 1 + json/save2json.cpp | 80 +++++++++++++++++++++++-------- mms/db_interface.h | 2 +- mms/ver_conf.h | 1 + pt61850netd_pqfe.pro | 3 +- 9 files changed, 126 insertions(+), 38 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index d3be971..ee40160 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -136,6 +136,7 @@ "simpleproducer.h": "c", "stdbool.h": "c", "node.h": "c", - "save2json.h": "c" + "save2json.h": "c", + "custom_printf.h": "c" } } \ No newline at end of file diff --git a/cfg_parse/SimpleProducer.cpp b/cfg_parse/SimpleProducer.cpp index b9d89ba..9920c13 100644 --- a/cfg_parse/SimpleProducer.cpp +++ b/cfg_parse/SimpleProducer.cpp @@ -799,6 +799,21 @@ void rocketmq_test_rc() my_rocketmq_send(data); } +extern std::string G_MQCONSUMER_TOPIC_LOG; +void rocketmq_test_log() +{ + Ckafka_data_t data; + data.monitor_id = 123123; + data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_LOG); + std::ifstream file("log_test.txt"); // 文件中存储长字符串 + std::stringstream buffer; + buffer << file.rdbuf(); // 读取整个文件内容 + + data.strText = QString::fromStdString(buffer.str()); + data.mp_id = 123123; + my_rocketmq_send(data); +} + } diff --git a/cfg_parse/cfg_parser.cpp b/cfg_parse/cfg_parser.cpp index 6e74cce..0ff0540 100644 --- a/cfg_parse/cfg_parser.cpp +++ b/cfg_parse/cfg_parser.cpp @@ -1043,6 +1043,7 @@ void init_config() { MULTIPLE_NODE_FLAG = 0; std::cout << "ǰǵ:" << std::endl; } + //20250109lnkӽ̲Դӡ˿ if (g_node_id == STAT_DATA_BASE_NODE_ID)//ͳƲɼ TEST_PORT = TEST_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index; @@ -1055,7 +1056,7 @@ void init_config() { else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//̬¼ TEST_PORT = TEST_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index; } - + } // CZY ping IP @@ -3446,7 +3447,9 @@ int GetServerIndexFromDB() // if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0) { ifc.ifc_len = sizeof buf; ifc.ifc_buf = (caddr_t)buf; + if (!ioctl(fd, SIOCGIFCONF, (char*)&ifc)) { + interface = ifc.ifc_len / sizeof(struct ifreq); printf("\ninterface num is interface= %d\n", interface); @@ -15218,8 +15221,10 @@ void redirectWarnOutput(bool enabled) { if (enabled) { static RedirectStreamBuf warnBuf(warnList, warnListMutex); std::clog.rdbuf(&warnBuf); + std::cerr.rdbuf(&warnBuf); } else { std::clog.rdbuf(nullptr); // ָ׼澯 + std::cerr.rdbuf(nullptr); // ָ׼ } } @@ -15229,8 +15234,12 @@ void redirectNormalOutput(bool enabled) { if (enabled) { static RedirectStreamBuf normalBuf(normalList, normalListMutex); std::cout.rdbuf(&normalBuf); + std::clog.rdbuf(&normalBuf); + std::cerr.rdbuf(&normalBuf); } else { std::cout.rdbuf(nullptr); // ָ׼ + std::clog.rdbuf(nullptr); // ָ׼澯 + std::cerr.rdbuf(nullptr); // ָ׼ } } @@ -15240,35 +15249,44 @@ int customPrintf(const char* format, ...) { va_list args; va_start(args, format); char buffer[1024]; - vsnprintf(buffer, sizeof(buffer), format, args); + + // ȸʽַ buffer + int written = vsnprintf(buffer, sizeof(buffer), format, args); + + // ԭʼ va_list va_end(args); - // пضûʹԭ printf - if (!errorOutputEnabled && !warnOutputEnabled && !normalOutputEnabled) { - vprintf(format, args); - return 0; // ֵΪѴӡַ + // ʽʧܣش + if (written < 0) { + return -1; } // õĿؽӵӦб if (errorOutputEnabled) { - pthread_mutex_lock(&errorListMutex); // errorList + pthread_mutex_lock(&errorListMutex); errorList.push_back(buffer); - pthread_mutex_unlock(&errorListMutex); // errorList + pthread_mutex_unlock(&errorListMutex); } if (warnOutputEnabled) { - pthread_mutex_lock(&warnListMutex); // warnList + pthread_mutex_lock(&warnListMutex); warnList.push_back(buffer); - pthread_mutex_unlock(&warnListMutex); // warnList + pthread_mutex_unlock(&warnListMutex); } if (normalOutputEnabled) { - pthread_mutex_lock(&normalListMutex); // normalList + pthread_mutex_lock(&normalListMutex); normalList.push_back(buffer); - pthread_mutex_unlock(&normalListMutex); // normalList + pthread_mutex_unlock(&normalListMutex); } - return 0; // ֵΪѴӡַ + // пضûʹԭ printf + if (!errorOutputEnabled && !warnOutputEnabled && !normalOutputEnabled) { + // ֱն + std::cout << buffer << std::endl; // ʹ std::cout printf ijͻ + } + + return written; // Ѵӡַ } /////////////////////////////////////////////////////////////////////////////// diff --git a/cfg_parse/custom_printf.h b/cfg_parse/custom_printf.h index 26862e0..e58350c 100644 --- a/cfg_parse/custom_printf.h +++ b/cfg_parse/custom_printf.h @@ -4,9 +4,11 @@ #include #include + + +#ifdef __cplusplus #include #include - // 假设这些是你管理输出的列表 extern std::list errorList; extern std::list warnList; @@ -17,8 +19,19 @@ extern bool errorOutputEnabled; extern bool warnOutputEnabled; extern bool normalOutputEnabled; +void redirectErrorOutput(bool enabled); +void redirectWarnOutput(bool enabled); +void redirectNormalOutput(bool enabled); + + +extern "C" +{ +#endif // 自定义的 printf 函数 int customPrintf(const char* format, ...); +#ifdef __cplusplus +} +#endif // 使用宏将 printf 替换为 customPrintf #define printf customPrintf diff --git a/include/rocketmq/SimpleProducer.h b/include/rocketmq/SimpleProducer.h index f35e5dc..ff6a7df 100644 --- a/include/rocketmq/SimpleProducer.h +++ b/include/rocketmq/SimpleProducer.h @@ -20,6 +20,7 @@ extern "C" { void rocketmq_test_rt(); void rocketmq_test_ud(); void rocketmq_test_rc(); +void rocketmq_test_log(); void rocketmq_test_set(); void rocketmq_test_only(); void rocketmq_test_300(int mpnum,int front_index); diff --git a/json/save2json.cpp b/json/save2json.cpp index 1d37414..4204a91 100644 --- a/json/save2json.cpp +++ b/json/save2json.cpp @@ -118,9 +118,16 @@ extern std::string G_MQCONSUMER_KEY_RC;//key extern std::string G_MQCONSUMER_TOPIC_SET;//topie_recall extern std::string G_MQCONSUMER_TAG_SET;//tag extern std::string G_MQCONSUMER_KEY_SET;//key -extern std::string G_MQCONSUMER_TOPIC_LOGSET;//topie_log -extern std::string G_MQCONSUMER_TAG_LOGSET;//tag -extern std::string G_MQCONSUMER_KEY_LOGSET;//key + +extern std::string G_MQCONSUMER_TOPIC_LOG;//topie_log +extern std::string G_MQCONSUMER_TAG_LOG;//tag +extern std::string G_MQCONSUMER_KEY_LOG;//key +extern std::string G_LOG_TOPIC;//topie +extern std::string G_LOG_TAG;//tag +extern std::string G_LOG_KEY;//key +extern pthread_mutex_t errorListMutex; +extern pthread_mutex_t warnListMutex; +extern pthread_mutex_t normalListMutex; #define APRTIME_8H (28800000000ULL) #define APRTIME_1H (3600000000ULL) @@ -587,18 +594,48 @@ void KafkaSendThread::run() //lnk20250225־ Ckafka_data_t log_send; - log_send.strTopic + log_send.strTopic = QString::fromStdString(G_LOG_TOPIC); bool log_gotten; - log_gotten = false; - pthread_mutex_lock(&targetMutex); - if (!kafka_data_list.isEmpty()) { - data_gotten = true; - log_send = kafka_data_list.takeFirst(); - } - kafka_data_list_mutex.unlock(); - if (log_gotten && ) { + if (normalOutputEnabled) { + // normalOutputEnabled Ϊ 1ȴ normalList ȡ + // normalList + pthread_mutex_lock(&normalListMutex); + if (!normalList.empty()) { + data_gotten = true; + log_send.strText = QString::fromStdString(normalList.front()); + normalList.pop_front(); + } + pthread_mutex_unlock(&normalListMutex); + } else if (warnOutputEnabled) { + // normalOutputEnabled Ϊ 0 warnOutputEnabled Ϊ 1ȴ warnList ȡ + // warnList + pthread_mutex_lock(&warnListMutex); + if (!warnList.empty()) { + data_gotten = true; + log_send.strText = QString::fromStdString(warnList.front()); + warnList.pop_front(); + } + pthread_mutex_unlock(&warnListMutex); + } else if (errorOutputEnabled) { + // normalOutputEnabled warnOutputEnabled Ϊ 0 errorOutputEnabled Ϊ 1ȡ errorList + // errorList + pthread_mutex_lock(&errorListMutex); + if (!errorList.empty()) { + data_gotten = true; + log_send.strText = QString::fromStdString(errorList.front()); + errorList.pop_front(); + } + pthread_mutex_unlock(&errorListMutex); + } + + if (log_gotten) { + static uint32_t count = 0; + printf("BEGIN current log send no.%i -------->>>>>>>>>>>> %s \n", count, + QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data()); + my_rocketmq_send(log_send); + } /*if (data_gotten) { LD_info_t* LD_info = find_LD_info_only_from_mp_id(data.mp_id.toAscii().data()); @@ -1374,7 +1411,7 @@ int StringToInt(const std::string& str) { } // JSON ִַӦ -void parse_log(const std::string& json_str, const std::string& output_dir) { +void parse_log(const std::string& json_str) { // JSON ַ cJSON* root = cJSON_Parse(json_str.c_str()); if (root == nullptr) { @@ -1453,37 +1490,37 @@ void parse_log(const std::string& json_str, const std::string& output_dir) { //У if(frontType == subdir){ if(fun == "open"){ - if (code_str == "ERROR"){ + if (level == "ERROR"){ // ô redirectErrorOutput(true); } - else if (code_str == "WARN"){ + else if (level == "WARN"){ // ø澯 redirectWarnOutput(true); } - else if (code_str == "NORMAL"){ + else if (level == "NORMAL"){ // ͨ redirectNormalOutput(true); } else{ - std::cout << "code_str error" <//lnk20241022 -#include "../cfgparse/custom_printf.h"//lnk20250225 +#include "../cfg_parse/custom_printf.h"//lnk20250225 #define LOG_IDX (0) #define RPT_IDX (1) diff --git a/mms/ver_conf.h b/mms/ver_conf.h index 0125228..c4f37ab 100644 --- a/mms/ver_conf.h +++ b/mms/ver_conf.h @@ -2,6 +2,7 @@ #ifndef VER_CONF_H_KHCYDOPFRUYDIYFIHUIVUGUGG #define VER_CONF_H_KHCYDOPFRUYDIYFIHUIVUGUGG #include "stdio.h" +#include "../cfg_parse/custom_printf.h"//lnk20250225 const char* PROGRAM_VERSION = "1.0.2.7"; const char* PROGRAM_CREATE_TIME="2024-09-18"; diff --git a/pt61850netd_pqfe.pro b/pt61850netd_pqfe.pro index 0d89544..68eb441 100644 --- a/pt61850netd_pqfe.pro +++ b/pt61850netd_pqfe.pro @@ -109,7 +109,8 @@ HEADERS += source/mms/db_interface.h \ source/json/rdkafkacpp.h \ source/json/kafka_producer.h \ source/json/cjson.h \ - source/include/rocketmq/SimpleProducer.h + source/include/rocketmq/SimpleProducer.h \ + source/cfg_parse/custom_printf.h SOURCES += source/mms/main.c \ source/mms/clntobj.c \ source/mms/logcfgx.c \