From 093e8e5dd64e446658e83306fe2f4c318b6b1d83 Mon Sep 17 00:00:00 2001 From: lnk Date: Wed, 14 May 2025 16:42:29 +0800 Subject: [PATCH] local ledger funtion finish --- cfg_parse/cfg_parser.cpp | 303 +++++++++++++++++++++++++++++++++++--- json/save2json.cpp | 4 +- mms/main.c | 3 - mms/mms_process.c | 15 +- mms/rdb_client.c | 4 +- rocketmq/SimpleProducer.h | 4 - 6 files changed, 303 insertions(+), 30 deletions(-) diff --git a/cfg_parse/cfg_parser.cpp b/cfg_parse/cfg_parser.cpp index 38fed94..7de6f2a 100644 --- a/cfg_parse/cfg_parser.cpp +++ b/cfg_parse/cfg_parser.cpp @@ -144,6 +144,7 @@ public: //lnk20250210添加进程号 char processNo[64]; + char maxProcessNum[64]; ledger_monitor line[10]; @@ -334,6 +335,11 @@ std::string intToString(int number); //lnk20250512 void send_heartbeat_to_kafka(const std::string& status); +int get_max_stat_data_index(const char* filepath); +extern void execute_bash(string fun,int process_num,string type); +extern void close_listening_socket(); +void save_ledger_json(const char* ptr); +void read_latest_ledger_file(char** out); ////////////////////////////////////////////////////////////////////////// extern int server_socket; //Web Socket服务端实例 @@ -1945,7 +1951,7 @@ int parse_3s_xml(trigger_3s_xml_t* trigger_3s_xml) //这个文件是用来记录正在进行中的实时触发 QString cfg_dir = QString("../")/*+QString::fromAscii(subdir)*/ + QString("etc/"); - load_3s_data_from_xml(trigger_3s_xml, (cfg_dir + THREE_SECS_CONFIG_FN)); //加载/Feproject/etc/Trigger3S.xml + //load_3s_data_from_xml(trigger_3s_xml, (cfg_dir + THREE_SECS_CONFIG_FN)); //加载/Feproject/etc/Trigger3S.xml QString the_webservice_xml_fn = get_3s_trig_fn();// ../etc/trigger3s/目录下的最新的xml文件,这个文件是用来打开实时触发的开关 @@ -3050,6 +3056,7 @@ void printTerminalDevMap(const QMap& terminal_dev_map) { << ", Device Key:" << QString(dev->dev_key) << ", Device Series:" << QString(dev->dev_series) << ", Device processNo:" << QString(dev->processNo) + << ", Device maxProcessNum:" << QString(dev->maxProcessNum) << ", Address:" << QString(dev->addr_str) << ", Port:" << QString(dev->port) << ", Timestamp:" << QString(dev->timestamp); @@ -3466,8 +3473,12 @@ int terminal_ledger_web(QMap* terminal_dev_map, char* ptr = NULL; + int retry = 0; + + cJSON* root = NULL; + // 发送 API 请求 - SendJsonAPI_web(WEB_DEVICE, "",parm.c_str(), &ptr); + /*SendJsonAPI_web(WEB_DEVICE, "",parm.c_str(), &ptr); if (ptr == NULL) { std::cerr << "Error: Received NULL response from SendJsonAPI_web." << std::endl; @@ -3496,18 +3507,105 @@ int terminal_ledger_web(QMap* terminal_dev_map, SendJsonAPI_web(WEB_DEVICE, "",parm.c_str(), &ptr); if(ptr == NULL){ - retry++;if(retry>3)break; - continue; + std::cerr << "Error: Received NULL response from SendJsonAPI_web in retry" << std::endl; + return 1; } root = cJSON_Parse(ptr); - retry++;if(retry>3)break; + retry++; + if(retry > 3){ + break; + } } // 如果重试后仍然失败,确保退出前释放任何已分配的内存 if (root == NULL) { - printf("web error %s\n", cJSON_GetErrorPtr()); - return 1; // 根据需要返回适当的错误码 + printf("web error after 3 retry\n"); + //return 1; // 根据需要返回适当的错误码 + //三次重试过后尝试读取本地台账文件 + char* ledger = NULL; + read_latest_ledger_file(&ledger); + if (ledger != NULL) { + root = cJSON_Parse(ledger); + free(ledger); + } + + //记录上送日志 + + //读取台账文件也失败则启用定时器5分钟等待下一次读取台账和文件,不要退出死掉 + if (root == NULL) { + printf("read local ledger failed, wait 5 min to retry...\n"); + apr_sleep(apr_time_from_sec(300)); // 睡眠 5 分钟 + + // 重新请求 + if (ptr != NULL) { + free(ptr); + ptr = NULL; + } + SendJsonAPI_web(WEB_DEVICE, "", parm.c_str(), &ptr); + if (ptr != NULL) { + root = cJSON_Parse(ptr); + if (root == NULL) { + printf("still failed after sleep retry\n"); + return 1; + } + } else { + printf("no response after sleep retry\n"); + return 1; + } + } + //记录上送日志 } + }*/ + while (1) { + // 请求接口 + SendJsonAPI_web(WEB_DEVICE, "", parm.c_str(), &ptr); + + + + if (ptr != NULL) { + + // 调试用 + printf("ptr:%s\n", ptr); + + root = cJSON_Parse(ptr); //json格式序列化 + if (root != NULL) { + break; // 解析成功,跳出循环 + } + } + + // 解析失败,尝试重试 + printf("web error %s\n", cJSON_GetErrorPtr()); + retry++; + + if (retry > 3) { + printf("web error after 3 retry\n"); + + // 释放之前的 ptr + if (ptr) { + free(ptr); + ptr = NULL; + } + + // 读取本地台账文件 + char* ledger = NULL; + read_latest_ledger_file(&ledger); + if (ledger != NULL) { + root = cJSON_Parse(ledger); + if (root != NULL) { + free(ledger); + break; // 本地台账成功解析 + } + + free(ledger); // 释放内容 + } + + // 本地文件解析仍失败,等待 5 分钟后再重试 + printf("still failed after sleep retry, wait 5 min and retry...\n"); + apr_sleep(apr_time_from_sec(300)); // 5 分钟 + + retry = 0; // 重置 retry 重新开始循环 + continue; + } } // 获取 "code" 和 "msg" @@ -3606,6 +3704,11 @@ int terminal_ledger_web(QMap* terminal_dev_map, if (processNo && processNo->type == cJSON_Number) snprintf(dev->processNo, sizeof(dev->processNo), "%d", processNo->valueint); else strncpy(dev->processNo, "N/A", sizeof(dev->processNo) - 1); + //20250513进程数量 + cJSON* maxProcessNum = cJSON_GetObjectItem(item, "maxProcessNum"); // maxProcessNum转为字符串 + if (maxProcessNum && maxProcessNum->type == cJSON_Number) snprintf(dev->maxProcessNum, sizeof(dev->maxProcessNum), "%d", maxProcessNum->valueint); + else strncpy(dev->maxProcessNum, "N/A", sizeof(dev->maxProcessNum) - 1); + cJSON* port = cJSON_GetObjectItem(item, "port"); // port if (port && port->type == cJSON_String) strncpy(dev->port, port->valuestring, sizeof(dev->port) - 1); else strncpy(dev->port, "N/A", sizeof(dev->port) - 1); @@ -3686,6 +3789,13 @@ int terminal_ledger_web(QMap* terminal_dev_map, } + //读取台账有效,保存或更新到台账文件20250513lnk + if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){ + save_ledger_json(ptr); + //普通日志,更新本地台账 + } + + // 释放资源 cJSON_Delete(root); free(ptr); @@ -3723,17 +3833,47 @@ int parse_device_cfg_web() //调试用 //printTerminalDevMap(terminal_dev_map); + //稳态进程1添加功能:判断看门狗配置是否正确 + if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) + { + int max_index; + int max_process_num; + + //读取配置文件的最大进程号 + max_index = get_max_stat_data_index("/FeProject/etc/runtime.cf"); + printf("max_index = %d\n", max_index); + + //读取第一条数据的最大进程号 + QMap::iterator it = terminal_dev_map.begin(); + if (it != terminal_dev_map.end() && it.value()) { + terminal_dev* dev = it.value(); + max_process_num = atoi(dev->maxProcessNum); // 转为 int + printf("maxProcessNum = %d\n", max_process_num); + } else { + printf("terminal_dev_map is empty or contains null entry.\n"); + } + + //判断是否相等 + if(max_process_num != max_index){ + // 调用执行脚本函数 + close_listening_socket(); + execute_bash("reset", max_process_num, "all"); + } + } count_cfg = terminal_dev_map.size();//容器的数量就是台账的数量 std::cout << "terminal_ledger_num:" << count_cfg << std::endl; + //如果当前进程获取的台账为0,按照配置数量申请空间,台账内容为空 g_node->n_clients = count_cfg; + //这里开辟的ied的空间由配置文件中的终端台账数量决定lnk20250121 if(IED_COUNT < count_cfg){ //申请数至少是初始化能读取到的台账数,防止设置失误导致的崩溃。 //如果是多进程,IED_COUNT应该设置为大于平均数, //如果是单进程则应该设置为大于终端总数, //单进程多进程同时存在则单进程应该大于终端总数,否则添加台账时单进程部分就无法同步添加。 + g_node->clients = (ied_t**)apr_pcalloc(g_cfg_pool, count_cfg * sizeof(ied_t*)); //添加提示 std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!"<< std::endl; @@ -3749,10 +3889,9 @@ int parse_device_cfg_web() //调试用 std::cout << "!!!!!!!!!!gnodeindex:" << k << std::endl; - g_node->clients[k] = (ied_t*)apr_pcalloc(g_cfg_pool, sizeof(ied_t));}//每个 g_node->clients[k] 指向的内存块是独立的(每个 ied_t 结构体占用的内存块)这是指向内存块的指针 + g_node->clients[k] = (ied_t*)apr_pcalloc(g_cfg_pool, sizeof(ied_t)); + }//每个 g_node->clients[k] 指向的内存块是独立的(每个 ied_t 结构体占用的内存块)这是指向内存块的指针 -//读取终端台账表替换为web接口 -////////////////////////////////////////////////////////////////////////////////////////////////// //读数据 try { char terminal_id[64]; @@ -4242,10 +4381,8 @@ int parse_model_cfg_web() try { char model_id[64]; char tmnl_type[64]; - //char tmnl_factory[64]; char file_name[128]; char file_path[128]; - //char timestamp[64]; otl_datetime timestamp; // 遍历终端台账容器 @@ -4260,16 +4397,12 @@ int parse_model_cfg_web() strncpy(tmnl_type, value->tmnl_type, sizeof(tmnl_type) - 1); strncpy(file_path, value->file_path, sizeof(file_path) - 1); strncpy(file_name, value->file_name, sizeof(file_name) - 1); - //strncpy(timestamp, value->timestamp, sizeof(timestamp) - 1); std::cout << "model_id" << model_id << std::endl; std::cout << "tmnl_type" << tmnl_type << std::endl; std::cout << "filepath" << file_path << std::endl; std::cout << "filename" << file_name << std::endl; - //lnk20241125测试用 - //strncpy(tmnl_type, "PS_NET", sizeof(tmnl_type) - 1); - Set_xml_databaseinfo(model_id, tmnl_type, file_path, file_name, timestamp.year, timestamp.month, timestamp.day, timestamp.hour, timestamp.minute, timestamp.second); } } @@ -4336,7 +4469,7 @@ char* parse_model_cfg_web_one(ied_t* ied, char* out_model) out_model[63] = '\0'; } - return model_id; + return out_model; } } @@ -6334,6 +6467,9 @@ void send_reply_to_kafka(const std::string& guid, const std::string& step, const << "\"guid\":\"" << guid << "\"," << "\"step\":\"" << step << "\"," << "\"result\":\"" << result << "\"" + << "\"processNo\":\"" << g_front_seg_index << "\"" + << "\"frontType\":\"" << get_front_type_from_subdir() << "\"" + << "\"nodeId\":\"" << FRONT_INST << "\"" << "}"; std::string jsonString = oss.str(); @@ -6355,7 +6491,7 @@ void send_heartbeat_to_kafka(const std::string& status) { oss << "{" << "\"nodeId\":\"" << FRONT_INST << "\"," << "\"frontType\":\"" << get_front_type_from_subdir() << "\"," - << "\"processNum\":\"" << g_front_seg_index << "\"," + << "\"processNo\":\"" << g_front_seg_index << "\"," << "\"status\":\"" << status << "\"" << "}"; @@ -6370,4 +6506,135 @@ void send_heartbeat_to_kafka(const std::string& status) { kafka_data_list_mutex.lock(); kafka_data_list.append(connect_info); kafka_data_list_mutex.unlock(); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//找看门狗中 +int get_max_stat_data_index(const char* filepath) { + std::ifstream file(filepath); + if (!file.is_open()) { + std::cerr << "Failed to open file: " << filepath << std::endl; + return -1; + } + + std::string line; + int max_value = -1; + + while (std::getline(file, line)) { + // 查找符合要求的行 + if (line.find("pt61850netd_pqfe -d cfg_stat_data -s") != std::string::npos) { + // 找到 -s 参数位置 + std::size_t pos = line.find("-s"); + if (pos != std::string::npos) { + std::istringstream iss(line.substr(pos + 2)); + std::string token; + iss >> token; + + // 格式应该是 1_2 或类似格式 + std::size_t under_pos = token.find('_'); + if (under_pos != std::string::npos) { + std::string first = token.substr(0, under_pos); + std::string second = token.substr(under_pos + 1); + + int val1 = std::atoi(first.c_str()); + int val2 = std::atoi(second.c_str()); + + if (val1 > max_value) max_value = val1; + if (val2 > max_value) max_value = val2; + } + } + } + } + + file.close(); + return max_value; +} +//////////////////////////////////////////////////////////////////////////////////////////// +//保存/更新台账 +void save_ledger_json(const char* ptr) { + if (!ptr) return; + + const char* dir = "/FeProject/dat/ledger"; + + // 确保目录存在 + struct stat st; + if (stat(dir, &st) != 0) { + mkdir("/FeProject", 0755); // 可选,确保上级目录存在 + mkdir("/FeProject/dat", 0755); + mkdir(dir, 0755); + } + + // 删除已有 *_ledger.txt 文件 + DIR* dp = opendir(dir); + if (dp) { + struct dirent* entry; + while ((entry = readdir(dp)) != NULL) { + if (strstr(entry->d_name, "_ledger.txt")) { + char old_file_path[256]; + snprintf(old_file_path, sizeof(old_file_path), "%s/%s", dir, entry->d_name); + remove(old_file_path); // 删除旧文件 + } + } + closedir(dp); + } + + // 当前时间格式化为 yyyyMMddHHmmss + char time_buf[32]; + time_t now = time(NULL); + struct tm* tm_info = localtime(&now); + strftime(time_buf, sizeof(time_buf), "%Y%m%d%H%M%S", tm_info); + + // 构造文件路径 + char filepath[256]; + snprintf(filepath, sizeof(filepath), "%s/%s_ledger.txt", dir, time_buf); + + // 写入文件 + FILE* fp = fopen(filepath, "w"); + if (fp) { + fputs(ptr, fp); + fclose(fp); + } else { + perror("fopen ledger file failed"); + } +} + +void read_latest_ledger_file(char** out) { + *out = NULL; + const char* dir = "/FeProject/dat/ledger"; + DIR* dp = opendir(dir); + if (!dp) return; + + struct dirent* entry; + char latest_file[256] = {0}; + time_t latest_time = 0; + + while ((entry = readdir(dp)) != NULL) { + if (strstr(entry->d_name, "_ledger.txt")) { + char filepath[256]; + snprintf(filepath, sizeof(filepath), "%s/%s", dir, entry->d_name); + struct stat st; + if (stat(filepath, &st) == 0) { + if (st.st_mtime > latest_time) { + latest_time = st.st_mtime; + strncpy(latest_file, filepath, sizeof(latest_file) - 1); + } + } + } + } + closedir(dp); + + if (latest_file[0] != '\0') { + FILE* fp = fopen(latest_file, "r"); + if (fp) { + fseek(fp, 0, SEEK_END); + long size = ftell(fp); + fseek(fp, 0, SEEK_SET); + *out = (char*)malloc(size + 1); + if (*out) { + fread(*out, 1, size, fp); + (*out)[size] = '\0'; + } + fclose(fp); + } + } } \ No newline at end of file diff --git a/json/save2json.cpp b/json/save2json.cpp index 49d10c6..3f5affe 100644 --- a/json/save2json.cpp +++ b/json/save2json.cpp @@ -956,7 +956,7 @@ void parse_set(const std::string& json_str) { std::string frontType = front->valuestring; - if (index_value != g_front_seg_index) { + if (index_value != g_front_seg_index && g_front_seg_index != 0) { std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl; cJSON_Delete(root); return; @@ -1012,7 +1012,7 @@ void parse_set(const std::string& json_str) { //上送日志 apr_sleep(apr_time_from_sec(10)); - exit(-1039); + ::_exit(-1039); //进程退出 } else{ std::cout << "param is not executable" <chnl_counts == 0)return; + //lnk20250514如果进程启动没有台账则不往下执行,等待台账更新 + + //一次访问一个终端 do { chnl_usr = g_pt61850app->chnl_usr[chnl_sequence_no]; @@ -1349,6 +1356,10 @@ void CheckNextNotConnectedChannel() static uint32_t chnl_total_no = 0; chnl_usr_t *chnl_usr; + //lnk20250514如果进程启动没有台账则不往下执行,等待台账更新 + if(g_pt61850app->chnl_counts == 0)return; + //lnk20250514如果进程启动没有台账则不往下执行,等待台账更新 + do { chnl_usr = g_pt61850app->chnl_usr[chnl_total_no]; chnl_total_no = (chnl_total_no+1) % g_pt61850app->chnl_counts; diff --git a/mms/rdb_client.c b/mms/rdb_client.c index 27e9c86..70296d2 100644 --- a/mms/rdb_client.c +++ b/mms/rdb_client.c @@ -16,6 +16,8 @@ #include "node.h" #include //lnk20250114给台账添加互斥锁 +#include "../log4cplus/log4.h"//lnk添加log4 + /*lnk10-10 *///////////////////////////////// extern int HTTP_PORT; extern int SOCKET_PORT; @@ -192,8 +194,8 @@ apr_status_t init_rdb() } //台账读取过后初始化各级的日志 + init_loggers(); - rv = parse_model_cfg_web(); if (rv != APR_SUCCESS) { echo_errg("Parsed model with error,try to run! \n"); diff --git a/rocketmq/SimpleProducer.h b/rocketmq/SimpleProducer.h index 8f3b11e..7339aff 100644 --- a/rocketmq/SimpleProducer.h +++ b/rocketmq/SimpleProducer.h @@ -25,7 +25,6 @@ void rocketmq_test_set(); void rocketmq_test_only(); void rocketmq_test_300(int mpnum,int front_index); } -//void rocketmq_test_300(int mpnum,int front_index);//20241202lnk extern void my_rocketmq_send(Ckafka_data_t& data); @@ -48,9 +47,6 @@ struct Subscription { void rocketmq_consumer_receive( const std::string& consumerName, const std::string& nameServer, - //const std::string& topic, - //const std::string& tag, - //MessageCallBack callback); const std::vector& subscriptions); //////////////////////////////////////////////////////