fix log realdata send funtion

This commit is contained in:
lnk
2025-02-26 16:39:10 +08:00
parent c597ee5b9b
commit f167d705a9
9 changed files with 126 additions and 38 deletions

View File

@@ -136,6 +136,7 @@
"simpleproducer.h": "c",
"stdbool.h": "c",
"node.h": "c",
"save2json.h": "c"
"save2json.h": "c",
"custom_printf.h": "c"
}
}

View File

@@ -799,6 +799,21 @@ void rocketmq_test_rc()
my_rocketmq_send(data);
}
extern std::string G_MQCONSUMER_TOPIC_LOG;
void rocketmq_test_log()
{
Ckafka_data_t data;
data.monitor_id = 123123;
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_LOG);
std::ifstream file("log_test.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = QString::fromStdString(buffer.str());
data.mp_id = 123123;
my_rocketmq_send(data);
}
}

View File

@@ -1043,6 +1043,7 @@ void init_config() {
MULTIPLE_NODE_FLAG = 0;
std::cout << "<EFBFBD><EFBFBD>ǰ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ǵ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>:" << std::endl;
}
//20250109lnk<6E><6B><EFBFBD>ӽ<EFBFBD><D3BD>̲<EFBFBD><CCB2>Դ<EFBFBD>ӡ<EFBFBD>˿<EFBFBD>
if (g_node_id == STAT_DATA_BASE_NODE_ID)//ͳ<>Ʋɼ<C6B2>
TEST_PORT = TEST_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index;
@@ -1055,7 +1056,7 @@ void init_config() {
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//<2F><>̬¼<CCAC><C2BC>
TEST_PORT = TEST_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index;
}
}
// CZY <20><><EFBFBD><EFBFBD> ping IP
@@ -3446,7 +3447,9 @@ int GetServerIndexFromDB() //
if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0) {
ifc.ifc_len = sizeof buf;
ifc.ifc_buf = (caddr_t)buf;
if (!ioctl(fd, SIOCGIFCONF, (char*)&ifc)) {
interface = ifc.ifc_len / sizeof(struct ifreq);
printf("\ninterface num is interface= %d\n", interface);
@@ -15218,8 +15221,10 @@ void redirectWarnOutput(bool enabled) {
if (enabled) {
static RedirectStreamBuf warnBuf(warnList, warnListMutex);
std::clog.rdbuf(&warnBuf);
std::cerr.rdbuf(&warnBuf);
} else {
std::clog.rdbuf(nullptr); // <20>ָ<EFBFBD><D6B8><EFBFBD><EFBFBD><EFBFBD>׼<EFBFBD><EFBFBD><E6BEAF>
std::cerr.rdbuf(nullptr); // <20>ָ<EFBFBD><D6B8><EFBFBD><EFBFBD><EFBFBD>׼<EFBFBD><D7BC><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
}
}
@@ -15229,8 +15234,12 @@ void redirectNormalOutput(bool enabled) {
if (enabled) {
static RedirectStreamBuf normalBuf(normalList, normalListMutex);
std::cout.rdbuf(&normalBuf);
std::clog.rdbuf(&normalBuf);
std::cerr.rdbuf(&normalBuf);
} else {
std::cout.rdbuf(nullptr); // <20>ָ<EFBFBD><D6B8><EFBFBD><EFBFBD><EFBFBD>׼<EFBFBD><D7BC><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
std::clog.rdbuf(nullptr); // <20>ָ<EFBFBD><D6B8><EFBFBD><EFBFBD><EFBFBD>׼<EFBFBD><EFBFBD><E6BEAF>
std::cerr.rdbuf(nullptr); // <20>ָ<EFBFBD><D6B8><EFBFBD><EFBFBD><EFBFBD>׼<EFBFBD><D7BC><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
}
}
@@ -15240,35 +15249,44 @@ int customPrintf(const char* format, ...) {
va_list args;
va_start(args, format);
char buffer[1024];
vsnprintf(buffer, sizeof(buffer), format, args);
// <20>ȸ<EFBFBD>ʽ<EFBFBD><CABD><EFBFBD>ַ<EFBFBD><D6B7><EFBFBD><EFBFBD><EFBFBD> buffer
int written = vsnprintf(buffer, sizeof(buffer), format, args);
// <20><><EFBFBD><EFBFBD>ԭʼ va_list
va_end(args);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>п<EFBFBD><EFBFBD>ض<EFBFBD>û<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʹ<EFBFBD><EFBFBD>ԭ<EFBFBD><EFBFBD> printf <EFBFBD><EFBFBD><EFBFBD><EFBFBD>
if (!errorOutputEnabled && !warnOutputEnabled && !normalOutputEnabled) {
vprintf(format, args);
return 0; // <20><><EFBFBD><EFBFBD>ֵΪ<D6B5>Ѵ<EFBFBD>ӡ<EFBFBD>ַ<EFBFBD><D6B7><EFBFBD>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʽ<EFBFBD><EFBFBD>ʧ<EFBFBD>ܣ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>ش<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
if (written < 0) {
return -1;
}
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>õĿ<C3B5><C4BF>ؽ<EFBFBD><D8BD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӵ<EFBFBD><D3B5><EFBFBD>Ӧ<EFBFBD><D3A6><EFBFBD>б<EFBFBD><D0B1><EFBFBD>
if (errorOutputEnabled) {
pthread_mutex_lock(&errorListMutex); // <20><><EFBFBD><EFBFBD> errorList
pthread_mutex_lock(&errorListMutex);
errorList.push_back(buffer);
pthread_mutex_unlock(&errorListMutex); // <20><><EFBFBD><EFBFBD> errorList
pthread_mutex_unlock(&errorListMutex);
}
if (warnOutputEnabled) {
pthread_mutex_lock(&warnListMutex); // <20><><EFBFBD><EFBFBD> warnList
pthread_mutex_lock(&warnListMutex);
warnList.push_back(buffer);
pthread_mutex_unlock(&warnListMutex); // <20><><EFBFBD><EFBFBD> warnList
pthread_mutex_unlock(&warnListMutex);
}
if (normalOutputEnabled) {
pthread_mutex_lock(&normalListMutex); // <20><><EFBFBD><EFBFBD> normalList
pthread_mutex_lock(&normalListMutex);
normalList.push_back(buffer);
pthread_mutex_unlock(&normalListMutex); // <20><><EFBFBD><EFBFBD> normalList
pthread_mutex_unlock(&normalListMutex);
}
return 0; // <20><><EFBFBD><EFBFBD>ֵΪ<EFBFBD>Ѵ<EFBFBD>ӡ<EFBFBD>ַ<EFBFBD><EFBFBD><EFBFBD>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>п<EFBFBD><EFBFBD>ض<EFBFBD>û<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʹ<EFBFBD><EFBFBD>ԭ<EFBFBD><EFBFBD> printf <20><EFBFBD><EFBFBD><EFBFBD>
if (!errorOutputEnabled && !warnOutputEnabled && !normalOutputEnabled) {
// ֱ<><D6B1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ն<EFBFBD>
std::cout << buffer << std::endl; // ʹ<><CAB9> std::cout <20><><EFBFBD><EFBFBD> printf <20><><EFBFBD>ij<EFBFBD>ͻ
}
return written; // <20><><EFBFBD><EFBFBD><EFBFBD>Ѵ<EFBFBD>ӡ<EFBFBD>ַ<EFBFBD><D6B7><EFBFBD>
}
///////////////////////////////////////////////////////////////////////////////

View File

@@ -4,9 +4,11 @@
#include <stdio.h>
#include <stdarg.h>
#ifdef __cplusplus
#include <list>
#include <string>
// 假设这些是你管理输出的列表
extern std::list<std::string> errorList;
extern std::list<std::string> warnList;
@@ -17,8 +19,19 @@ extern bool errorOutputEnabled;
extern bool warnOutputEnabled;
extern bool normalOutputEnabled;
void redirectErrorOutput(bool enabled);
void redirectWarnOutput(bool enabled);
void redirectNormalOutput(bool enabled);
extern "C"
{
#endif
// 自定义的 printf 函数
int customPrintf(const char* format, ...);
#ifdef __cplusplus
}
#endif
// 使用宏将 printf 替换为 customPrintf
#define printf customPrintf

View File

@@ -20,6 +20,7 @@ extern "C" {
void rocketmq_test_rt();
void rocketmq_test_ud();
void rocketmq_test_rc();
void rocketmq_test_log();
void rocketmq_test_set();
void rocketmq_test_only();
void rocketmq_test_300(int mpnum,int front_index);

View File

@@ -118,9 +118,16 @@ extern std::string G_MQCONSUMER_KEY_RC;//key
extern std::string G_MQCONSUMER_TOPIC_SET;//topie_recall
extern std::string G_MQCONSUMER_TAG_SET;//tag
extern std::string G_MQCONSUMER_KEY_SET;//key
extern std::string G_MQCONSUMER_TOPIC_LOGSET;//topie_log
extern std::string G_MQCONSUMER_TAG_LOGSET;//tag
extern std::string G_MQCONSUMER_KEY_LOGSET;//key
extern std::string G_MQCONSUMER_TOPIC_LOG;//topie_log
extern std::string G_MQCONSUMER_TAG_LOG;//tag
extern std::string G_MQCONSUMER_KEY_LOG;//key
extern std::string G_LOG_TOPIC;//topie
extern std::string G_LOG_TAG;//tag
extern std::string G_LOG_KEY;//key
extern pthread_mutex_t errorListMutex;
extern pthread_mutex_t warnListMutex;
extern pthread_mutex_t normalListMutex;
#define APRTIME_8H (28800000000ULL)
#define APRTIME_1H (3600000000ULL)
@@ -587,18 +594,48 @@ void KafkaSendThread::run()
//lnk20250225<32><35><EFBFBD><EFBFBD><EFBFBD><EFBFBD>־<EFBFBD><D6BE><EFBFBD><EFBFBD>
Ckafka_data_t log_send;
log_send.strTopic
log_send.strTopic = QString::fromStdString(G_LOG_TOPIC);
bool log_gotten;
log_gotten = false;
pthread_mutex_lock(&targetMutex);
if (!kafka_data_list.isEmpty()) {
data_gotten = true;
log_send = kafka_data_list.takeFirst();
}
kafka_data_list_mutex.unlock();
if (log_gotten && ) {
if (normalOutputEnabled) {
// <20><><EFBFBD><EFBFBD> normalOutputEnabled Ϊ 1<><31><EFBFBD><EFBFBD><EFBFBD>ȴ<EFBFBD> normalList <20><>ȡ<EFBFBD><C8A1><EFBFBD><EFBFBD>
// <20><><EFBFBD><EFBFBD> normalList <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
pthread_mutex_lock(&normalListMutex);
if (!normalList.empty()) {
data_gotten = true;
log_send.strText = QString::fromStdString(normalList.front());
normalList.pop_front();
}
pthread_mutex_unlock(&normalListMutex);
} else if (warnOutputEnabled) {
// <20><><EFBFBD><EFBFBD> normalOutputEnabled Ϊ 0<><30><EFBFBD><EFBFBD> warnOutputEnabled Ϊ 1<><31><EFBFBD><EFBFBD><EFBFBD>ȴ<EFBFBD> warnList <20><>ȡ<EFBFBD><C8A1><EFBFBD><EFBFBD>
// <20><><EFBFBD><EFBFBD> warnList <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
pthread_mutex_lock(&warnListMutex);
if (!warnList.empty()) {
data_gotten = true;
log_send.strText = QString::fromStdString(warnList.front());
warnList.pop_front();
}
pthread_mutex_unlock(&warnListMutex);
} else if (errorOutputEnabled) {
// <20><><EFBFBD><EFBFBD> normalOutputEnabled <20><> warnOutputEnabled <20><>Ϊ 0<><30><EFBFBD><EFBFBD> errorOutputEnabled Ϊ 1<><31>ȡ errorList <20><><EFBFBD><EFBFBD>
// <20><><EFBFBD><EFBFBD> errorList <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
pthread_mutex_lock(&errorListMutex);
if (!errorList.empty()) {
data_gotten = true;
log_send.strText = QString::fromStdString(errorList.front());
errorList.pop_front();
}
pthread_mutex_unlock(&errorListMutex);
}
if (log_gotten) {
static uint32_t count = 0;
printf("BEGIN current log send no.%i -------->>>>>>>>>>>> %s \n", count,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
my_rocketmq_send(log_send);
}
/*if (data_gotten) {
LD_info_t* LD_info = find_LD_info_only_from_mp_id(data.mp_id.toAscii().data());
@@ -1374,7 +1411,7 @@ int StringToInt(const std::string& str) {
}
// <20><><EFBFBD><EFBFBD> JSON <20>ַ<EFBFBD><D6B7><EFBFBD><EFBFBD><EFBFBD>ִ<EFBFBD><D6B4><EFBFBD><EFBFBD>Ӧ<EFBFBD><D3A6><EFBFBD><EFBFBD>
void parse_log(const std::string& json_str, const std::string& output_dir) {
void parse_log(const std::string& json_str) {
// <20><><EFBFBD><EFBFBD> JSON <20>ַ<EFBFBD><D6B7><EFBFBD>
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
@@ -1453,37 +1490,37 @@ void parse_log(const std::string& json_str, const std::string& output_dir) {
//У<><D0A3><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
if(frontType == subdir){
if(fun == "open"){
if (code_str == "ERROR"){
if (level == "ERROR"){
// <20><><EFBFBD>ô<EFBFBD><C3B4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
redirectErrorOutput(true);
}
else if (code_str == "WARN"){
else if (level == "WARN"){
// <20><><EFBFBD>ø澯<C3B8><E6BEAF><EFBFBD><EFBFBD>
redirectWarnOutput(true);
}
else if (code_str == "NORMAL"){
else if (level == "NORMAL"){
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͨ<EFBFBD><CDA8><EFBFBD><EFBFBD>
redirectNormalOutput(true);
}
else{
std::cout << "code_str error" <<std::endl;
std::cout << "level error" <<std::endl;
}
}
else{
if (code_str == "ERROR"){
if (level == "ERROR"){
// <20>رմ<D8B1><D5B4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
redirectErrorOutput(false);
}
else if (code_str == "WARN"){
else if (level == "WARN"){
// <20><><EFBFBD>ø澯<C3B8><E6BEAF><EFBFBD><EFBFBD>
redirectWarnOutput(false);
}
else if (code_str == "NORMAL"){
else if (level == "NORMAL"){
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͨ<EFBFBD><CDA8><EFBFBD><EFBFBD>
redirectNormalOutput(false);
}
else{
std::cout << "code_str error" <<std::endl;
std::cout << "level error" <<std::endl;
}
}
}
@@ -1493,6 +1530,7 @@ void parse_log(const std::string& json_str, const std::string& output_dir) {
}
std::cout << "this msg should only execute once" <<std::endl;
}
}
// <20>ͷ<EFBFBD> JSON <20><><EFBFBD><EFBFBD>
cJSON_Delete(root);

View File

@@ -17,7 +17,7 @@
#include "apr_time.h"
#include <stdbool.h>//lnk20241022
#include "../cfgparse/custom_printf.h"//lnk20250225
#include "../cfg_parse/custom_printf.h"//lnk20250225
#define LOG_IDX (0)
#define RPT_IDX (1)

View File

@@ -2,6 +2,7 @@
#ifndef VER_CONF_H_KHCYDOPFRUYDIYFIHUIVUGUGG
#define VER_CONF_H_KHCYDOPFRUYDIYFIHUIVUGUGG
#include "stdio.h"
#include "../cfg_parse/custom_printf.h"//lnk20250225
const char* PROGRAM_VERSION = "1.0.2.7";
const char* PROGRAM_CREATE_TIME="2024-09-18";

View File

@@ -109,7 +109,8 @@ HEADERS += source/mms/db_interface.h \
source/json/rdkafkacpp.h \
source/json/kafka_producer.h \
source/json/cjson.h \
source/include/rocketmq/SimpleProducer.h
source/include/rocketmq/SimpleProducer.h \
source/cfg_parse/custom_printf.h
SOURCES += source/mms/main.c \
source/mms/clntobj.c \
source/mms/logcfgx.c \