local ledger funtion finish

This commit is contained in:
lnk
2025-05-14 16:42:29 +08:00
parent a6685ca801
commit 093e8e5dd6
6 changed files with 303 additions and 30 deletions

View File

@@ -144,6 +144,7 @@ public:
//lnk20250210添加进程号
char processNo[64];
char maxProcessNum[64];
ledger_monitor line[10];
@@ -334,6 +335,11 @@ std::string intToString(int number);
//lnk20250512
void send_heartbeat_to_kafka(const std::string& status);
int get_max_stat_data_index(const char* filepath);
extern void execute_bash(string fun,int process_num,string type);
extern void close_listening_socket();
void save_ledger_json(const char* ptr);
void read_latest_ledger_file(char** out);
//////////////////////////////////////////////////////////////////////////
extern int server_socket; //Web Socket服务端实例
@@ -1945,7 +1951,7 @@ int parse_3s_xml(trigger_3s_xml_t* trigger_3s_xml)
//这个文件是用来记录正在进行中的实时触发
QString cfg_dir = QString("../")/*+QString::fromAscii(subdir)*/ + QString("etc/");
load_3s_data_from_xml(trigger_3s_xml, (cfg_dir + THREE_SECS_CONFIG_FN)); //加载/Feproject/etc/Trigger3S.xml
//load_3s_data_from_xml(trigger_3s_xml, (cfg_dir + THREE_SECS_CONFIG_FN)); //加载/Feproject/etc/Trigger3S.xml
QString the_webservice_xml_fn = get_3s_trig_fn();// ../etc/trigger3s/目录下的最新的xml文件这个文件是用来打开实时触发的开关
@@ -3050,6 +3056,7 @@ void printTerminalDevMap(const QMap<QString, terminal_dev*>& terminal_dev_map) {
<< ", Device Key:" << QString(dev->dev_key)
<< ", Device Series:" << QString(dev->dev_series)
<< ", Device processNo:" << QString(dev->processNo)
<< ", Device maxProcessNum:" << QString(dev->maxProcessNum)
<< ", Address:" << QString(dev->addr_str)
<< ", Port:" << QString(dev->port)
<< ", Timestamp:" << QString(dev->timestamp);
@@ -3466,8 +3473,12 @@ int terminal_ledger_web(QMap<QString, terminal_dev*>* terminal_dev_map,
char* ptr = NULL;
int retry = 0;
cJSON* root = NULL;
// 发送 API 请求
SendJsonAPI_web(WEB_DEVICE, "",parm.c_str(), &ptr);
/*SendJsonAPI_web(WEB_DEVICE, "",parm.c_str(), &ptr);
if (ptr == NULL) {
std::cerr << "Error: Received NULL response from SendJsonAPI_web." << std::endl;
@@ -3496,18 +3507,105 @@ int terminal_ledger_web(QMap<QString, terminal_dev*>* terminal_dev_map,
SendJsonAPI_web(WEB_DEVICE, "",parm.c_str(), &ptr);
if(ptr == NULL){
retry++;if(retry>3)break;
continue;
std::cerr << "Error: Received NULL response from SendJsonAPI_web in retry" << std::endl;
return 1;
}
root = cJSON_Parse(ptr);
retry++;if(retry>3)break;
retry++;
if(retry > 3){
break;
}
}
// 如果重试后仍然失败,确保退出前释放任何已分配的内存
if (root == NULL) {
printf("web error %s\n", cJSON_GetErrorPtr());
return 1; // 根据需要返回适当的错误码
printf("web error after 3 retry\n");
//return 1; // 根据需要返回适当的错误码
//三次重试过后尝试读取本地台账文件
char* ledger = NULL;
read_latest_ledger_file(&ledger);
if (ledger != NULL) {
root = cJSON_Parse(ledger);
free(ledger);
}
//记录上送日志
//读取台账文件也失败则启用定时器5分钟等待下一次读取台账和文件不要退出死掉
if (root == NULL) {
printf("read local ledger failed, wait 5 min to retry...\n");
apr_sleep(apr_time_from_sec(300)); // 睡眠 5 分钟
// 重新请求
if (ptr != NULL) {
free(ptr);
ptr = NULL;
}
SendJsonAPI_web(WEB_DEVICE, "", parm.c_str(), &ptr);
if (ptr != NULL) {
root = cJSON_Parse(ptr);
if (root == NULL) {
printf("still failed after sleep retry\n");
return 1;
}
} else {
printf("no response after sleep retry\n");
return 1;
}
}
//记录上送日志
}
}*/
while (1) {
// 请求接口
SendJsonAPI_web(WEB_DEVICE, "", parm.c_str(), &ptr);
if (ptr != NULL) {
// 调试用
printf("ptr:%s\n", ptr);
root = cJSON_Parse(ptr); //json格式序列化
if (root != NULL) {
break; // 解析成功,跳出循环
}
}
// 解析失败,尝试重试
printf("web error %s\n", cJSON_GetErrorPtr());
retry++;
if (retry > 3) {
printf("web error after 3 retry\n");
// 释放之前的 ptr
if (ptr) {
free(ptr);
ptr = NULL;
}
// 读取本地台账文件
char* ledger = NULL;
read_latest_ledger_file(&ledger);
if (ledger != NULL) {
root = cJSON_Parse(ledger);
if (root != NULL) {
free(ledger);
break; // 本地台账成功解析
}
free(ledger); // 释放内容
}
// 本地文件解析仍失败,等待 5 分钟后再重试
printf("still failed after sleep retry, wait 5 min and retry...\n");
apr_sleep(apr_time_from_sec(300)); // 5 分钟
retry = 0; // 重置 retry 重新开始循环
continue;
}
}
// 获取 "code" 和 "msg"
@@ -3606,6 +3704,11 @@ int terminal_ledger_web(QMap<QString, terminal_dev*>* terminal_dev_map,
if (processNo && processNo->type == cJSON_Number) snprintf(dev->processNo, sizeof(dev->processNo), "%d", processNo->valueint);
else strncpy(dev->processNo, "N/A", sizeof(dev->processNo) - 1);
//20250513进程数量
cJSON* maxProcessNum = cJSON_GetObjectItem(item, "maxProcessNum"); // maxProcessNum转为字符串
if (maxProcessNum && maxProcessNum->type == cJSON_Number) snprintf(dev->maxProcessNum, sizeof(dev->maxProcessNum), "%d", maxProcessNum->valueint);
else strncpy(dev->maxProcessNum, "N/A", sizeof(dev->maxProcessNum) - 1);
cJSON* port = cJSON_GetObjectItem(item, "port"); // port
if (port && port->type == cJSON_String) strncpy(dev->port, port->valuestring, sizeof(dev->port) - 1);
else strncpy(dev->port, "N/A", sizeof(dev->port) - 1);
@@ -3686,6 +3789,13 @@ int terminal_ledger_web(QMap<QString, terminal_dev*>* terminal_dev_map,
}
//读取台账有效保存或更新到台账文件20250513lnk
if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){
save_ledger_json(ptr);
//普通日志,更新本地台账
}
// 释放资源
cJSON_Delete(root);
free(ptr);
@@ -3723,17 +3833,47 @@ int parse_device_cfg_web()
//调试用
//printTerminalDevMap(terminal_dev_map);
//稳态进程1添加功能判断看门狗配置是否正确
if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1)
{
int max_index;
int max_process_num;
//读取配置文件的最大进程号
max_index = get_max_stat_data_index("/FeProject/etc/runtime.cf");
printf("max_index = %d\n", max_index);
//读取第一条数据的最大进程号
QMap<QString, terminal_dev*>::iterator it = terminal_dev_map.begin();
if (it != terminal_dev_map.end() && it.value()) {
terminal_dev* dev = it.value();
max_process_num = atoi(dev->maxProcessNum); // 转为 int
printf("maxProcessNum = %d\n", max_process_num);
} else {
printf("terminal_dev_map is empty or contains null entry.\n");
}
//判断是否相等
if(max_process_num != max_index){
// 调用执行脚本函数
close_listening_socket();
execute_bash("reset", max_process_num, "all");
}
}
count_cfg = terminal_dev_map.size();//容器的数量就是台账的数量
std::cout << "terminal_ledger_num:" << count_cfg << std::endl;
//如果当前进程获取的台账为0按照配置数量申请空间台账内容为空
g_node->n_clients = count_cfg;
//这里开辟的ied的空间由配置文件中的终端台账数量决定lnk20250121
if(IED_COUNT < count_cfg){ //申请数至少是初始化能读取到的台账数,防止设置失误导致的崩溃。
//如果是多进程IED_COUNT应该设置为大于平均数
//如果是单进程则应该设置为大于终端总数,
//单进程多进程同时存在则单进程应该大于终端总数,否则添加台账时单进程部分就无法同步添加。
g_node->clients = (ied_t**)apr_pcalloc(g_cfg_pool, count_cfg * sizeof(ied_t*));
//添加提示
std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!"<< std::endl;
@@ -3749,10 +3889,9 @@ int parse_device_cfg_web()
//调试用
std::cout << "!!!!!!!!!!gnodeindex:" << k << std::endl;
g_node->clients[k] = (ied_t*)apr_pcalloc(g_cfg_pool, sizeof(ied_t));}//每个 g_node->clients[k] 指向的内存块是独立的(每个 ied_t 结构体占用的内存块)这是指向内存块的指针
g_node->clients[k] = (ied_t*)apr_pcalloc(g_cfg_pool, sizeof(ied_t));
}//每个 g_node->clients[k] 指向的内存块是独立的(每个 ied_t 结构体占用的内存块)这是指向内存块的指针
//读取终端台账表替换为web接口
//////////////////////////////////////////////////////////////////////////////////////////////////
//读数据
try {
char terminal_id[64];
@@ -4242,10 +4381,8 @@ int parse_model_cfg_web()
try {
char model_id[64];
char tmnl_type[64];
//char tmnl_factory[64];
char file_name[128];
char file_path[128];
//char timestamp[64];
otl_datetime timestamp;
// 遍历终端台账容器
@@ -4260,16 +4397,12 @@ int parse_model_cfg_web()
strncpy(tmnl_type, value->tmnl_type, sizeof(tmnl_type) - 1);
strncpy(file_path, value->file_path, sizeof(file_path) - 1);
strncpy(file_name, value->file_name, sizeof(file_name) - 1);
//strncpy(timestamp, value->timestamp, sizeof(timestamp) - 1);
std::cout << "model_id" << model_id << std::endl;
std::cout << "tmnl_type" << tmnl_type << std::endl;
std::cout << "filepath" << file_path << std::endl;
std::cout << "filename" << file_name << std::endl;
//lnk20241125测试用
//strncpy(tmnl_type, "PS_NET", sizeof(tmnl_type) - 1);
Set_xml_databaseinfo(model_id, tmnl_type, file_path, file_name, timestamp.year, timestamp.month, timestamp.day, timestamp.hour, timestamp.minute, timestamp.second);
}
}
@@ -4336,7 +4469,7 @@ char* parse_model_cfg_web_one(ied_t* ied, char* out_model)
out_model[63] = '\0';
}
return model_id;
return out_model;
}
}
@@ -6334,6 +6467,9 @@ void send_reply_to_kafka(const std::string& guid, const std::string& step, const
<< "\"guid\":\"" << guid << "\","
<< "\"step\":\"" << step << "\","
<< "\"result\":\"" << result << "\""
<< "\"processNo\":\"" << g_front_seg_index << "\""
<< "\"frontType\":\"" << get_front_type_from_subdir() << "\""
<< "\"nodeId\":\"" << FRONT_INST << "\""
<< "}";
std::string jsonString = oss.str();
@@ -6355,7 +6491,7 @@ void send_heartbeat_to_kafka(const std::string& status) {
oss << "{"
<< "\"nodeId\":\"" << FRONT_INST << "\","
<< "\"frontType\":\"" << get_front_type_from_subdir() << "\","
<< "\"processNum\":\"" << g_front_seg_index << "\","
<< "\"processNo\":\"" << g_front_seg_index << "\","
<< "\"status\":\"" << status << "\""
<< "}";
@@ -6370,4 +6506,135 @@ void send_heartbeat_to_kafka(const std::string& status) {
kafka_data_list_mutex.lock();
kafka_data_list.append(connect_info);
kafka_data_list_mutex.unlock();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//找看门狗中
int get_max_stat_data_index(const char* filepath) {
std::ifstream file(filepath);
if (!file.is_open()) {
std::cerr << "Failed to open file: " << filepath << std::endl;
return -1;
}
std::string line;
int max_value = -1;
while (std::getline(file, line)) {
// 查找符合要求的行
if (line.find("pt61850netd_pqfe -d cfg_stat_data -s") != std::string::npos) {
// 找到 -s 参数位置
std::size_t pos = line.find("-s");
if (pos != std::string::npos) {
std::istringstream iss(line.substr(pos + 2));
std::string token;
iss >> token;
// 格式应该是 1_2 或类似格式
std::size_t under_pos = token.find('_');
if (under_pos != std::string::npos) {
std::string first = token.substr(0, under_pos);
std::string second = token.substr(under_pos + 1);
int val1 = std::atoi(first.c_str());
int val2 = std::atoi(second.c_str());
if (val1 > max_value) max_value = val1;
if (val2 > max_value) max_value = val2;
}
}
}
}
file.close();
return max_value;
}
////////////////////////////////////////////////////////////////////////////////////////////
//保存/更新台账
void save_ledger_json(const char* ptr) {
if (!ptr) return;
const char* dir = "/FeProject/dat/ledger";
// 确保目录存在
struct stat st;
if (stat(dir, &st) != 0) {
mkdir("/FeProject", 0755); // 可选,确保上级目录存在
mkdir("/FeProject/dat", 0755);
mkdir(dir, 0755);
}
// 删除已有 *_ledger.txt 文件
DIR* dp = opendir(dir);
if (dp) {
struct dirent* entry;
while ((entry = readdir(dp)) != NULL) {
if (strstr(entry->d_name, "_ledger.txt")) {
char old_file_path[256];
snprintf(old_file_path, sizeof(old_file_path), "%s/%s", dir, entry->d_name);
remove(old_file_path); // 删除旧文件
}
}
closedir(dp);
}
// 当前时间格式化为 yyyyMMddHHmmss
char time_buf[32];
time_t now = time(NULL);
struct tm* tm_info = localtime(&now);
strftime(time_buf, sizeof(time_buf), "%Y%m%d%H%M%S", tm_info);
// 构造文件路径
char filepath[256];
snprintf(filepath, sizeof(filepath), "%s/%s_ledger.txt", dir, time_buf);
// 写入文件
FILE* fp = fopen(filepath, "w");
if (fp) {
fputs(ptr, fp);
fclose(fp);
} else {
perror("fopen ledger file failed");
}
}
void read_latest_ledger_file(char** out) {
*out = NULL;
const char* dir = "/FeProject/dat/ledger";
DIR* dp = opendir(dir);
if (!dp) return;
struct dirent* entry;
char latest_file[256] = {0};
time_t latest_time = 0;
while ((entry = readdir(dp)) != NULL) {
if (strstr(entry->d_name, "_ledger.txt")) {
char filepath[256];
snprintf(filepath, sizeof(filepath), "%s/%s", dir, entry->d_name);
struct stat st;
if (stat(filepath, &st) == 0) {
if (st.st_mtime > latest_time) {
latest_time = st.st_mtime;
strncpy(latest_file, filepath, sizeof(latest_file) - 1);
}
}
}
}
closedir(dp);
if (latest_file[0] != '\0') {
FILE* fp = fopen(latest_file, "r");
if (fp) {
fseek(fp, 0, SEEK_END);
long size = ftell(fp);
fseek(fp, 0, SEEK_SET);
*out = (char*)malloc(size + 1);
if (*out) {
fread(*out, 1, size, fp);
(*out)[size] = '\0';
}
fclose(fp);
}
}
}