add log realdata send funtion

This commit is contained in:
lnk
2025-02-25 16:33:11 +08:00
parent d1fa47e3f1
commit c597ee5b9b
5 changed files with 345 additions and 4 deletions

View File

@@ -118,6 +118,9 @@ extern std::string G_MQCONSUMER_KEY_RC;//key
extern std::string G_MQCONSUMER_TOPIC_SET;//topie_recall
extern std::string G_MQCONSUMER_TAG_SET;//tag
extern std::string G_MQCONSUMER_KEY_SET;//key
extern std::string G_MQCONSUMER_TOPIC_LOGSET;//topie_log
extern std::string G_MQCONSUMER_TAG_LOGSET;//tag
extern std::string G_MQCONSUMER_KEY_LOGSET;//key
#define APRTIME_8H (28800000000ULL)
#define APRTIME_1H (3600000000ULL)
@@ -127,11 +130,8 @@ const int MAX_LIST_SIZE = 16;
static QMap<int, QList<long long> > real_data_report_map;
static QMap<QString, json_block_data*> json_data_map;//CZY 2023-08-17 ww 2023<32><33>3<EFBFBD><33>13<31><33>17:23:17<31><37>չMap<61><70><EFBFBD><EFBFBD><EFBFBD>ڱ<EFBFBD><DAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>·<EFBFBD><C2B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
static QMap<QString, json_block_data*> json_flicker_data_map;//CZY 2023-09-11 չMap<61><70><EFBFBD><EFBFBD><EFBFBD>ڱ<EFBFBD><DAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>·<EFBFBD><C2B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
static QMap<QString, json_block_data*> json_pst_data_map;//CZY 2023-09-11 չMap<61><70><EFBFBD><EFBFBD><EFBFBD>ڱ<EFBFBD><DAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>·<EFBFBD><C2B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
int urcbRealDataHasReceived(int dev_index, LD_info_t* LD_info, long long Time)
{
QList<long long>& ts_list = real_data_report_map[LD_info->line_id];
@@ -584,6 +584,21 @@ void KafkaSendThread::run()
printf("END my_kafka_send no.%i -------->>>>>>>>>>>> %s \n\n", count++,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
}
//lnk20250225<32><35><EFBFBD><EFBFBD><EFBFBD><EFBFBD>־<EFBFBD><D6BE><EFBFBD><EFBFBD>
Ckafka_data_t log_send;
log_send.strTopic
bool log_gotten;
log_gotten = false;
pthread_mutex_lock(&targetMutex);
if (!kafka_data_list.isEmpty()) {
data_gotten = true;
log_send = kafka_data_list.takeFirst();
}
kafka_data_list_mutex.unlock();
if (log_gotten && ) {
/*if (data_gotten) {
LD_info_t* LD_info = find_LD_info_only_from_mp_id(data.mp_id.toAscii().data());
@@ -1358,6 +1373,131 @@ int StringToInt(const std::string& str) {
return number;
}
// <20><><EFBFBD><EFBFBD> JSON <20>ַ<EFBFBD><D6B7><EFBFBD><EFBFBD><EFBFBD>ִ<EFBFBD><D6B4><EFBFBD><EFBFBD>Ӧ<EFBFBD><D3A6><EFBFBD><EFBFBD>
void parse_log(const std::string& json_str, const std::string& output_dir) {
// <20><><EFBFBD><EFBFBD> JSON <20>ַ<EFBFBD><D6B7><EFBFBD>
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
return;
}
// <20><>ȡ "messageBody" <20><><EFBFBD><EFBFBD>
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return ;
}
// <20><><EFBFBD><EFBFBD> messageBody <20>е<EFBFBD> JSON <20>ַ<EFBFBD><D6B7><EFBFBD>
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // <20><><EFBFBD><EFBFBD> messageBody <20>ַ<EFBFBD><D6B7><EFBFBD>
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return ;
}
// <20><>ȡ code <20>ֶ<EFBFBD>
cJSON* code = cJSON_GetObjectItem(messageBody, "code");
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
return;
}
cJSON* index = cJSON_GetObjectItem(messageBody, "index");
if (index == nullptr) {
std::cout << "Missing 'index' in JSON." << std::endl;
cJSON_Delete(root);
return;
}
//<2F>ж<EFBFBD><D0B6>Dz<EFBFBD><C7B2><EFBFBD><EFBFBD>Լ<EFBFBD><D4BC><EFBFBD><EFBFBD>̺ţ<CCBA>
int index_value = index->valueint;
//string index_value_str = index->valuestring;
//int index_value = StringToInt(index_value_str);
//<2F><><EFBFBD>̺<EFBFBD>Ϊ0<CEAA>Ľ<EFBFBD><C4BD>̴<EFBFBD><CCB4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̨<EFBFBD>˸<EFBFBD><CBB8><EFBFBD><EFBFBD><EFBFBD>Ϣ
if (index_value != g_front_seg_index) {
std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
return;
}
//<2F><><EFBFBD>̺<EFBFBD>ƥ<EFBFBD><C6A5><EFBFBD><EFBFBD>
std::cout << "msg index:"<< index_value <<" self index:" << g_front_seg_index << std::endl;
// <20><><EFBFBD><EFBFBD> code <20>ֶ<EFBFBD>ִֵ<D6B5>в<EFBFBD>ͬ<EFBFBD>Ľ<EFBFBD><C4BD><EFBFBD><EFBFBD>߼<EFBFBD>
std::string code_str = code->valuestring;
if (code_str == "set_log") {
// <20><><EFBFBD><EFBFBD> set_process
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
if (data != nullptr && data->type == cJSON_Array) {
int data_size = cJSON_GetArraySize(data);
for (int i = 0; i < data_size; i++) {
cJSON* item = cJSON_GetArrayItem(data, i);
std::string fun = cJSON_GetObjectItem(item, "fun")->valuestring;
std::string level = cJSON_GetObjectItem(item, "level")->valuestring;
std::string frontType = cJSON_GetObjectItem(item, "frontType")->valuestring;
//У<><D0A3><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
if(frontType == subdir){
if(fun == "open"){
if (code_str == "ERROR"){
// <20><><EFBFBD>ô<EFBFBD><C3B4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
redirectErrorOutput(true);
}
else if (code_str == "WARN"){
// <20><><EFBFBD>ø澯<C3B8><E6BEAF><EFBFBD><EFBFBD>
redirectWarnOutput(true);
}
else if (code_str == "NORMAL"){
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͨ<EFBFBD><CDA8><EFBFBD><EFBFBD>
redirectNormalOutput(true);
}
else{
std::cout << "code_str error" <<std::endl;
}
}
else{
if (code_str == "ERROR"){
// <20>رմ<D8B1><D5B4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
redirectErrorOutput(false);
}
else if (code_str == "WARN"){
// <20><><EFBFBD>ø澯<C3B8><E6BEAF><EFBFBD><EFBFBD>
redirectWarnOutput(false);
}
else if (code_str == "NORMAL"){
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͨ<EFBFBD><CDA8><EFBFBD><EFBFBD>
redirectNormalOutput(false);
}
else{
std::cout << "code_str error" <<std::endl;
}
}
}
else{
std::cout << "front type doesnt match" <<std::endl;
}
}
std::cout << "this msg should only execute once" <<std::endl;
}
// <20>ͷ<EFBFBD> JSON <20><><EFBFBD><EFBFBD>
cJSON_Delete(root);
}
// <20><><EFBFBD><EFBFBD> JSON <20>ַ<EFBFBD><D6B7><EFBFBD><EFBFBD><EFBFBD>ִ<EFBFBD><D6B4><EFBFBD><EFBFBD>Ӧ<EFBFBD><D3A6><EFBFBD><EFBFBD>
void parse_control(const std::string& json_str, const std::string& output_dir) {
// <20><><EFBFBD><EFBFBD> JSON <20>ַ<EFBFBD><D6B7><EFBFBD>
@@ -1794,6 +1934,40 @@ int myMessageCallbackset(CPushConsumer* consumer, CMessageExt* msg)
return E_CONSUME_SUCCESS;
}
int myMessageCallbacklog(CPushConsumer* consumer, CMessageExt* msg)
{
if (msg == NULL) {
std::cerr << "Received null message." << std::endl;
return E_RECONSUME_LATER;
}
const char* body = GetMessageBody(msg);
const char* key = GetMessageKeys(msg);
if (body == NULL) {
std::cerr << "Message body is NULL." << std::endl;
return E_RECONSUME_LATER;
}
else{
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><E7A3AC>ӡ<EFBFBD><D3A1>Ϣ<EFBFBD><CFA2><EFBFBD>ݣ<EFBFBD>
std::cout << "process Callback received message: " << body << std::endl;
if (key) {
std::cout << "Message Key: " << key << std::endl;
}
else {
std::cout << "Message Key: N/A" << std::endl;
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD≯<EFBFBD><CCB8><EFBFBD><EFBFBD><EFBFBD>Ϣ
parse_log(body);
}
// <20><><EFBFBD><EFBFBD>ҵ<EFBFBD><D2B5><EFBFBD>߼<EFBFBD><DFBC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>״̬
return E_CONSUME_SUCCESS;
}
int myMessageCallbackrecall(CPushConsumer* consumer, CMessageExt* msg)
{
//<2F><><EFBFBD><EFBFBD>
@@ -1843,6 +2017,7 @@ int myMessageCallbackrecall(CPushConsumer* consumer, CMessageExt* msg)
return E_CONSUME_SUCCESS;
}
void mqconsumerThread::run()
{
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߲<EFBFBD><DFB2><EFBFBD>
@@ -1870,6 +2045,9 @@ void mqconsumerThread::run()
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset));
}
// <20><>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>5 //<2F><><EFBFBD>н<EFBFBD><D0BD>̶<EFBFBD><CCB6><EFBFBD><E1B6A9><EFBFBD><EFBFBD>־<EFBFBD><D6BE><EFBFBD><EFBFBD>topic<69><63><EFBFBD><EFBFBD>ͬ<EFBFBD><CDAC><EFBFBD>ܽ<EFBFBD><DCBD>̵<EFBFBD><CCB5><EFBFBD>־<EFBFBD><D6BE><EFBFBD>Ͳ<EFBFBD><CDB2>ܻ<EFBFBD><DCBB><EFBFBD>Ӱ<EFBFBD><D3B0>
subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog));
try {
rocketmq_consumer_receive(consumerName, nameServer, subscriptions);
}