//////////////////////////////////////////////////////////////////////////////////////////////////////////////// #include #include #include #include #include #include #include #include // for std::move #include // for std::free #include #include #include #include #include #include #include #include #include //////////////////////////////////////////////////////////////////////////////////////////////////////////////// #include "curl/curl.h" #include "interface.h" #include "rocketmq.h" #include "nlohmann/json.hpp" #include "log4.h" //////////////////////////////////////////////////////////////////////////////////////////////////////////////// using json = nlohmann::json; ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// std::map xmlinfo_list;//保存所有型号对应的icd映射文件解析数据 XmlConfig xmlcfg;//星形接线xml节点解析的数据-默认映射文件解析数据 std::list topicList; //队列发送主题链表 XmlConfig xmlcfg2;//角型接线xml节点解析的数据-默认映射文件解析数据 std::list topicList2; //角型接线发送主题链表 std::map xmlinfo_list2;//保存所有型号角形接线对应的icd映射文件解析数据 //台账list std::vector terminal_devlist; //台账锁 std::mutex ledgermtx; ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// extern int g_front_seg_num; extern uint32_t g_node_id; //筛选的终端状态:数组【0,1】筛选运行和在运 extern std::string TERMINAL_STATUS; //筛选的icd范围:1根据台账获取0获取全部 extern std::string ICD_FLAG; //五个接口 extern std::string WEB_FILEUPLOAD; extern std::string WEB_FILEDOWNLOAD; extern std::string WEB_ICD; extern std::string WEB_DEVICE; extern std::string WEB_EVENT; //角型接线标志 extern int isdelta_flag; //单个进程的台账数量 extern int IED_COUNT; /////////////////////////////////////////////////////////////////////////////////////////////////////// void handleCommentResponse(const std::string& response); //////////////////////////////////////////////////////////////////////////////////////////////////////// extern void execute_bash(std::string fun,int process_num,std::string type); ////////////////////////////////////////////////////////////////////////////////////////////////////////通用curl请求接口 size_t req_reply_web(void* ptr, size_t size, size_t nmemb, void* stream) { std::string* str = (std::string*)stream; (*str).append((char*)ptr, size * nmemb); return size * nmemb; } void SendJsonAPI_web(const std::string& strUrl, //接口路径 const std::string& code, //上传文件用 const std::string& json, //请求json std::string& responseStr) //响应 { CURL* curl = curl_easy_init(); CURLcode res; responseStr.clear(); if (curl) { // 使用 std::string 拼接完整 URL std::string fullUrl = strUrl + "?" + code; std::cout << ">>>json " << fullUrl << std::endl; curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, req_reply_web); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &responseStr); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10); if (!json.empty()) { curl_easy_setopt(curl, CURLOPT_POST, 1L); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json.c_str()); } struct curl_slist* headers = nullptr; headers = curl_slist_append(headers, "Content-Type: application/json"); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); res = curl_easy_perform(curl); if (res != CURLE_OK) { std::cerr << "web failed, res code: " << curl_easy_strerror(res) << std::endl; } else { std::cout << ">>> web return str: " << responseStr << std::endl; } curl_slist_free_all(headers); curl_easy_cleanup(curl); } else { std::cerr << ">>> web curl init failed" << std::endl; } } ////////////////////////////////////////////////////////////////////////////////////////////////////////上传文件接口 //处理文件上传响应 void handleUploadResponse(const std::string& response, std::string& wavepath) { using nlohmann::json; //把 nlohmann::json 这个名字带到当前作用域 // 解析 JSON 响应 json json_data; try { json_data = json::parse(response); } catch (const json::parse_error& e) { std::cerr << "Error parsing response: " << e.what() << std::endl; DIY_ERRORLOG("process", "【ERROR】前置上传暂态录波文件失败,web返回的消息不是json格式"); return; } // 提取字段 if (!json_data.contains("code") || !json_data.contains("data")) { std::cerr << "Error: Missing expected fields in JSON response." << std::endl; DIY_ERRORLOG("process", "【ERROR】前置上传暂态录波文件失败,web返回的消息没有远端文件名"); return; } std::string code = json_data["code"].get(); std::cout << "Response Code: " << code << std::endl; std::string msg = json_data.value("msg", std::string{"not found"}); std::cout << "Message: " << msg << std::endl; auto& data = json_data["data"]; if (!data.contains("name") || !data.contains("fileName") || !data.contains("url")) { std::cerr << "Error: Missing expected fields in JSON data object." << std::endl; DIY_ERRORLOG("process", "【ERROR】前置上传暂态录波文件失败,web返回的消息没有远端文件名"); return; } std::string name = data["name"].get(); std::string fileName = data["fileName"].get(); std::string url = data["url"].get(); // 输出信息 std::cout << "File Path: " << name << std::endl; std::cout << "Uploaded File Name: " << fileName << std::endl; std::cout << "File URL: " << url << std::endl; // 找到最后一个 '.' size_t pos = name.find_last_of('.'); std::string nameWithoutExt; if (pos != std::string::npos) { // 截取去掉后缀的部分 nameWithoutExt = name.substr(0, pos); } else { // 如果没有后缀,直接使用原文件名 nameWithoutExt = name; } // 拷贝到 wavepath wavepath = nameWithoutExt; std::cout << "wavepath: " << wavepath << std::endl; DIY_INFOLOG("process", "【NORMAL】前置上传暂态录波文件成功,远端文件名:%s", wavepath.c_str()); } //上传文件 void SendFileWeb(const std::string& strUrl, const std::string& localpath, const std::string& cloudpath, std::string& wavepath) { // 基本存在性检查 if (access(localpath.c_str(), F_OK) != 0) { std::cerr << "Local file does not exist: " << localpath << std::endl; return; } // ★新增:stat 打印大小,便于快速确认读源 struct stat st {}; if (stat(localpath.c_str(), &st) != 0) { perror("stat"); } else { std::cout << "[debug] upload file: " << localpath << ", size=" << static_cast(st.st_size) << " bytes\n"; } // 初始化 curl CURL* curl = curl_easy_init(); if (curl) { // ★新增:错误缓冲,便于看到更具体的错误 char errbuf[CURL_ERROR_SIZE] = {0}; curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errbuf); // ★新增 // 设置请求 URL 和 POST 请求 curl_easy_setopt(curl, CURLOPT_URL, strUrl.c_str()); curl_easy_setopt(curl, CURLOPT_POST, 1); // 创建表单数据 curl_httppost* formpost = nullptr; curl_httppost* lastptr = nullptr; // 添加文件字段,直接从本地路径读取文件内容 curl_formadd(&formpost, &lastptr, CURLFORM_COPYNAME, "file", CURLFORM_FILE, localpath.c_str(), CURLFORM_END); // 添加 `path` 字段 curl_formadd(&formpost, &lastptr, CURLFORM_COPYNAME, "path", CURLFORM_COPYCONTENTS, cloudpath.c_str(), CURLFORM_END); // 添加 `isReserveName` 字段 curl_formadd(&formpost, &lastptr, CURLFORM_COPYNAME, "isReserveName", CURLFORM_COPYCONTENTS, "true", CURLFORM_END); // 设置表单数据到请求 curl_easy_setopt(curl, CURLOPT_HTTPPOST, formpost); // 设置头信息 //struct curl_slist* header_list = nullptr; //header_list = curl_slist_append(header_list, "Content-Type: multipart/form-data"); //curl_easy_setopt(curl, CURLOPT_HTTPHEADER, header_list); // 设置超时时间 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10); curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L); // ★新增:避免多线程环境下超时触发信号 // 设置写入响应数据的函数 std::string resPost0; curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void*)&resPost0); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, req_reply_web); // 执行请求 CURLcode res = curl_easy_perform(curl); if (res != CURLE_OK) { const char* em = errbuf[0] ? errbuf : curl_easy_strerror(res); std::cerr << "http web failed: " << em << std::endl; DIY_ERRORLOG("process","【ERROR】前置上传暂态录波文件 %s 失败,请检查文件上传接口配置",localpath); } else { std::cout << "http web success, response: " << resPost0 << std::endl; handleUploadResponse(resPost0, wavepath); // 处理响应 } // 清理 curl_formfree(formpost); // 释放表单数据 //curl_slist_free_all(header_list); // 释放头部列表 curl_easy_cleanup(curl); } else { std::cerr << ">>> curl init failed" << std::endl; } } //上传暂态文件 void SOEFileWeb(std::string& localpath,std::string& cloudpath, std::string& wavepath) { SendFileWeb(WEB_FILEUPLOAD,localpath,cloudpath,wavepath); } //上传文件测试函数 void Fileupload_test() { std::string localpath = FRONT_PATH + "/bin/file_test.txt"; std::string cloudpath = "/fileuploadtest/"; std::string wavepath; SOEFileWeb(localpath,cloudpath,wavepath); std::cout << "wavepath:" << wavepath << std::endl; } //////////////////////////////////////////////////////////////////////////////////////////////////////映射文件下载接口 void download_xml_for_icd(const std::string& MODEL_ID, const std::string& TMNL_TYPE, const std::string& FILE_PATH, const std::string& FILE_NAME, const std::string& time) { if (MODEL_ID.empty() || TMNL_TYPE.empty() || FILE_PATH.empty() || FILE_NAME.empty()) { std::cerr << "Function download_xml_for_icd Error, param empty!" << std::endl; return; } std::cout << "Function download_xml_for_icd Start!" << std::endl; std::string id(MODEL_ID); std::string type(TMNL_TYPE); std::string filepath(FILE_PATH); std::string name(FILE_NAME); std::string currentTime(time); // 调试用 std::cout << "terminal type:" << TMNL_TYPE << std::endl; auto it = xmlinfo_list.find(type); if (it == xmlinfo_list.end()) // 在终端类型列表中没查到 { Xmldata* config = new Xmldata(); // 没找到就插个新的终端类型到列表中 xmlinfo_list[type] = config; //在这里将 config 转成一个Xmldata 的右值引用,触发 unique_ptr 的移动赋值 // 调试 std::cout << "xmlinfo_list insert type:" << type << std::endl; } else // 查到就更新覆盖 { // 调试 std::cout << "xmlinfo_list type contain:" << type << std::endl; if (xmlinfo_list[type]->xmlbase.updatetime == currentTime) { // 终端型号更新标志,如果新增的型号错误,导致实际用的映射文件不一样,或者覆盖了原来的映射文件这里可能出问题。数据库在录入型号和映射文件时要注意 xmlinfo_list[type]->updataflag = false; // 时间值一样说明是没有更新,当前业务中不包含时间值,所以每次都会更新 } else { xmlinfo_list[type]->updataflag = true; } } xmlinfo_list[type]->xmlbase.MODEL_ID = id; xmlinfo_list[type]->xmlbase.TMNL_TYPE = type; xmlinfo_list[type]->xmlbase.FILE_PATH = filepath; xmlinfo_list[type]->xmlbase.FILE_NAME = name; xmlinfo_list[type]->xmlbase.updatetime = currentTime; //调试 std::cout << "##################################isdelta_flag is " << isdelta_flag << std::endl; if (isdelta_flag) { std::cout << "xmllist2 create" << std::endl; auto it2 = xmlinfo_list2.find(type); if (it2 == xmlinfo_list2.end()) { Xmldata* config = new Xmldata(); xmlinfo_list2[type] = config; } else { if (xmlinfo_list2[type]->xmlbase.updatetime == currentTime) { xmlinfo_list2[type]->updataflag = false; } else { xmlinfo_list2[type]->updataflag = true; } } xmlinfo_list2[type]->xmlbase.MODEL_ID = id; xmlinfo_list2[type]->xmlbase.TMNL_TYPE = type; xmlinfo_list2[type]->xmlbase.FILE_PATH = filepath; xmlinfo_list2[type]->xmlbase.FILE_NAME = name; xmlinfo_list2[type]->xmlbase.updatetime = currentTime; } //下载文件 std::string remote_file_name = name; std::string save_name = FRONT_PATH + "/dat/" + id + ".xml"; // 本地保存路径 std::cout << "remote file name:" << remote_file_name << "local save name:" << save_name << std::endl; // mq日志 DIY_WARNLOG("process","【WARN】前置获取到终端类型%s,该终端类型对应的映射文件为%s,映射文件将下载并保存在本地为%s",TMNL_TYPE,FILE_PATH,save_name); std::string fileContent; std::string fullPath = std::string("filePath=") + filepath; //填写远端路径作为入参 // 调试 std::cout << "input fullpath" << fullPath << std::endl; SendJsonAPI_web(WEB_FILEDOWNLOAD, fullPath.c_str(), "", fileContent); if (!fileContent.empty()) { // 创建并打开文件 std::ofstream outFile(save_name, std::ios::out); // 文本模式打开 if (outFile.is_open()) { // 将文件流写入文件 outFile.write(fileContent.c_str(), fileContent.size()); outFile.close(); std::cout << "File saved successfully!" << std::endl; DIY_WARNLOG("process","【WARN】前置下载映射文件%s成功",save_name); } else { std::cerr << "Error: Unable to open file for writing." << std::endl; DIY_ERRORLOG("process","【ERROR】前置写入本地映射文件%s失败",save_name); } } else { std::cerr << "Error: Unable to download file." << std::endl; DIY_ERRORLOG("process","【ERROR】前置调用文件下载接口下载远端文件文件%s失败",FILE_PATH); } } ////////////////////////////////////////////////////////////////////////////////////////////////////台账接口 //读最新本地台账 std::string read_latest_ledger_file() { const char* dir = std::string(FRONT_PATH + "/dat/ledger").c_str(); DIR* dp = opendir(dir); if (!dp) return ""; struct dirent* entry; std::string latest_file; time_t latest_time = 0; while ((entry = readdir(dp)) != nullptr) { if (strstr(entry->d_name, "_ledger.txt")) { std::string filepath = std::string(dir) + "/" + entry->d_name; std::cout << "localledger filepath" << filepath << std::endl; struct stat st; if (stat(filepath.c_str(), &st) == 0 && st.st_mtime > latest_time) { latest_time = st.st_mtime; latest_file = filepath; } } } closedir(dp); if (!latest_file.empty()) { std::ifstream inFile(latest_file); if (inFile) { std::string content((std::istreambuf_iterator(inFile)), std::istreambuf_iterator()); return content; } } return ""; } //保存台账到本地 void save_ledger_json(const std::string& content) { if (content.empty()) return; const std::string dir = FRONT_PATH + "/dat/ledger"; // 确保目录存在 struct stat st; if (stat(dir.c_str(), &st) != 0) { mkdir(FRONT_PATH.c_str(), 0755); // 确保上级目录存在(可选) mkdir(std::string(FRONT_PATH + "/dat").c_str(), 0755); mkdir(dir.c_str(), 0755); } // 删除已有 *_ledger.txt 文件 DIR* dp = opendir(dir.c_str()); if (dp) { struct dirent* entry; while ((entry = readdir(dp)) != nullptr) { if (strstr(entry->d_name, "_ledger.txt")) { std::string old_file_path = dir + "/" + entry->d_name; std::remove(old_file_path.c_str()); } } closedir(dp); } // 当前时间格式化为 yyyyMMddHHmmss char time_buf[32]; time_t now = time(nullptr); struct tm* tm_info = localtime(&now); strftime(time_buf, sizeof(time_buf), "%Y%m%d%H%M%S", tm_info); // 构造文件路径 std::string filepath = dir + "/" + time_buf + "_ledger.txt"; // 写入文件 std::ofstream outFile(filepath); if (outFile) { outFile << content; outFile.close(); } else { std::cerr << "Error: Unable to open ledger file for writing: " << filepath << std::endl; } } //找看门狗中最大的进程号 int get_max_stat_data_index(const std::string& filepath) { std::ifstream file(filepath.c_str()); if (!file.is_open()) { std::cerr << "Failed to open file: " << filepath << std::endl; return -1; } std::string line; int max_value = -1; while (std::getline(file, line)) { // 查找符合要求的行 if (line.find("pt61850netd_pqfe -d cfg_stat_data -s") != std::string::npos) { // 找到 -s 参数位置 std::size_t pos = line.find("-s"); if (pos != std::string::npos) { std::istringstream iss(line.substr(pos + 2)); std::string token; iss >> token; // 格式应该是 1_2 或类似格式 std::size_t under_pos = token.find('_'); if (under_pos != std::string::npos) { std::string first = token.substr(0, under_pos); std::string second = token.substr(under_pos + 1); int val1 = std::atoi(first.c_str()); int val2 = std::atoi(second.c_str()); if (val1 > max_value) max_value = val1; if (val2 > max_value) max_value = val2; } } } } file.close(); return max_value; } //台账信息获取 int terminal_ledger_web(std::map& terminal_dev_map, const std::string& inputstring) { if (inputstring.empty()) { std::cerr << "Error: inputstring is empty\n"; DIY_ERRORLOG("process","【ERROR】前置的%d号进程调用web台账接口的入参为空", g_front_seg_index); return 1; } std::string inputparm = inputstring; std::string responseStr; int retry = 0; nlohmann::json json_data; // 1. 拉取并解析 JSON,最多 3 次 while (true) { SendJsonAPI_web(WEB_DEVICE, "", inputparm, responseStr); if (!responseStr.empty()) { try { json_data = nlohmann::json::parse(responseStr); if (json_data.contains("data") && json_data["data"].is_array() && !json_data["data"].empty()) { break; } std::cerr << "data 无效或为空数组,重试\n"; DIY_ERRORLOG("process","【ERROR】前置从web接口中获取的台账信息为空或者无效信息无法解析,请核对前置使用的入参信息:%s",inputparm.c_str()); } catch (const nlohmann::json::parse_error& e) { std::cerr << "parse error: " << e.what() << ", retrying...\n"; } } else { std::cerr << "HTTP response NULL, retrying...\n"; } if (++retry > 3) { std::cerr << "web error after 3 retry, fallback to local file\n"; std::string ledger = read_latest_ledger_file(); if (!ledger.empty()) { try { json_data = nlohmann::json::parse(ledger); if (json_data.contains("data") && json_data["data"].is_array() && !json_data["data"].empty()) { break; } DIY_ERRORLOG("process", "【ERROR】前置从本地台账中获取的台账信息为空或者无效信息无法解析,请核对前置使用的入参信息:%s",inputparm.c_str()); } catch (const nlohmann::json::parse_error& e) { std::cerr << "local parse error: " << e.what() << "\n"; } } std::cerr << "still failed, sleep 5 min then retry...\n"; std::this_thread::sleep_for(std::chrono::minutes(5)); retry = 0; continue; } } // 2. 安全读取 code/msg std::string code = json_data.value("code", "not found"); std::string msg = json_data.value("msg", "not found"); std::cout << "code: " << code << "\n"; std::cout << "msg : " << msg << "\n"; // 3. 逐条解析 data const auto& data = json_data["data"]; for (size_t i = 0; i < data.size(); ++i) { const auto& item = data[i]; if (!item.is_object()) { std::cerr << "Warning: Invalid item at index " << i << "\n"; continue; } terminal_dev dev; // 不用 new auto safe_str = [](const nlohmann::json& j, const char* key) -> std::string { if (!j.contains(key) || j[key].is_null()) return "N/A"; if (j[key].is_string()) return j[key].get(); if (j[key].is_number_integer()) return std::to_string(j[key].get()); if (j[key].is_number_unsigned()) return std::to_string(j[key].get()); if (j[key].is_number_float()) return std::to_string(j[key].get()); return "N/A"; }; dev.terminal_id = safe_str(item, "id"); dev.addr_str = safe_str(item, "ip"); dev.terminal_name = safe_str(item, "name"); //dev.org_name = safe_str(item, "org_name"); //dev.maint_name = safe_str(item, "maint_name"); //dev.station_name = safe_str(item, "stationName"); //dev.tmnl_factory = safe_str(item, "manufacturer"); //dev.tmnl_status = safe_str(item, "status"); dev.dev_type = safe_str(item, "devType"); //dev.dev_key = safe_str(item, "devKey"); //dev.dev_series = safe_str(item, "series"); //dev.port = safe_str(item, "port"); //dev.timestamp = safe_str(item, "updateTime"); dev.processNo = safe_str(item, "node"); //dev.maxProcessNum = safe_str(item, "maxProcessNum"); //dev.mac = safe_str(item, "mac");//添加mac if (item.contains("monitorData") && item["monitorData"].is_array()) { for (auto& mon : item["monitorData"]) { if (dev.line.size() >= 10) break; ledger_monitor m; m.monitor_id = safe_str(mon, "id"); m.terminal_id = safe_str(mon, "deviceId"); m.monitor_name = safe_str(mon, "name"); m.logical_device_seq = safe_str(mon, "lineNo"); m.voltage_level = safe_str(mon, "voltageLevel"); m.terminal_connect = safe_str(mon, "ptType"); m.timestamp = safe_str(mon, "updateTime"); m.status = safe_str(mon, "status"); m.CT1 = mon.value("ct1", 0.0); m.CT2 = mon.value("ct2", 0.0); m.PT1 = mon.value("pt1", 0.0); m.PT2 = mon.value("pt2", 0.0); dev.line.push_back(m); } } // 插入 map(去重 + 仅本进程号匹配时插入) // dev 已经是对象类型 std::string key = dev.terminal_id; auto it = terminal_dev_map.find(key); bool match = false; try { match = (std::stoi(dev.processNo) == g_front_seg_index || g_front_seg_index == 0); } catch (...) { std::cerr << "processNo parse error for terminal: " << dev.terminal_id << "\n"; } if (it != terminal_dev_map.end()) { std::cerr << "Duplicate terminal_id: " << key << std::endl; terminal_dev_map.erase(it); // 覆盖前先提示并擦除 if (match) { std::cout << "remove duplicate terminal ledger and insert lastest terminal ledger id:" << key << std::endl; terminal_dev_map[key] = dev; } } else { if (match) { std::cout << "process num match, terminal ledger insert id:" << key << std::endl; terminal_dev_map[key] = dev; } } } // 5. 主进程保存台账 //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { if (g_front_seg_index == 1) { save_ledger_json(responseStr); } return 0; } //台账信息写入全局 int parse_device_cfg_web() { std::cout << "parse_device_cfg_web" << std::endl; // 1. 构造入参 JSON std::string input_jstr = "{"; input_jstr += "\"id\":\"" + FRONT_INST + "\","; input_jstr += "\"runFlag\":" + TERMINAL_STATUS; input_jstr += "}"; std::cout << "input_jstr: " << input_jstr << std::endl; DIY_DEBUGLOG("process","【DEBUG】前置的%d号进程调用web接口获取台账使用的请求输入为:%s", g_front_seg_index, input_jstr.c_str()); // 2. 调用接口 std::map terminal_dev_map; if (terminal_ledger_web(terminal_dev_map, input_jstr)) { return 1; // 入参为空或接口失败 } // 3. 调试打印 printTerminalDevMap(terminal_dev_map); // 4. 看门狗配置校验(仅主进程) //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { if (g_front_seg_index == 1) { int max_index = get_max_stat_data_index(FRONT_PATH + "/etc/runtime.cf"); std::cout << "max_index = " << max_index << std::endl; int max_process_num = 0; auto it = terminal_dev_map.begin(); if (it != terminal_dev_map.end()) { const terminal_dev& dev = it->second; max_process_num = std::atoi(dev.maxProcessNum.c_str()); std::cout << "maxProcessNum = " << max_process_num << std::endl; } else { std::cout << "terminal_dev_map is empty." << std::endl; } if (max_process_num != max_index) { if (max_process_num >= 1 && max_process_num <= 9) { DIY_WARNLOG("process", "【WARN】前置比对台账获取的进程数:%d和本地配置的进程数:%d,不匹配,按照台账进程数重置前置的进程数量",max_process_num, max_index); execute_bash("reset", max_process_num, "all"); } else { DIY_ERRORLOG("process","【ERROR】前置从台账获取的进程数:%d不符合范围1~9,按照本地配置进程数启动进程",max_process_num); } } } // 5. 台账数量与配置比对 int count_cfg = static_cast(terminal_dev_map.size()); std::cout << "terminal_ledger_num: " << count_cfg << std::endl; DIY_DEBUGLOG("process", "【DEBUG】前置的%d号进程调用获取到的台账的数量为:%d", g_front_seg_index, count_cfg); if (IED_COUNT < count_cfg) { std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!" << std::endl; //DIY_WARNLOG("process","【WARN】前置的%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账", g_front_seg_index, IED_COUNT); } else { //DIY_INFOLOG("process","【NORMAL】前置的%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间", g_front_seg_index, IED_COUNT); } ///////////////////////////////////////////////////////////////////////////////用例这里将局部的map拷贝到全局map,后续根据协议台账修改 // 先清空全局 container,再逐个拷贝 map 中的 terminal_dev std::lock_guard lock(ledgermtx); terminal_devlist.clear(); for (const auto& kv : terminal_dev_map) { terminal_dev dev = kv.second; // kv.second 是对象,不用判断指针 // ======= [新增] 对 terminal_dev 中 web 未返回/未设置字段做统一初始化,避免脏值 ======= dev.guid.clear(); // [新增] 业务 guid 初始为空 dev.busytype = 0; // [新增] 业务类型(状态机)默认 0 dev.isbusy = 0; // [新增] 未进行业务 dev.busytimecount = 0; // [新增] 业务计时清零 dev.internal_values.clear(); // [新增] 内部定值清空,等后续业务真实填充 dev.dz_internal_info_list.clear(); // [新增] 内部定值信息清空,等后续业务真实填充 // ------------------------------------------------------------------------------------ // ======= [新增] 对每个监测点做必要的内部结构初始化 ======= for (auto &mon : dev.line) { // 暂态事件容器显式清理(虽然默认构造已空,但这里确保无脏数据) mon.qvvrevent.qvvrdata.clear(); // [新增] mon.qvvrevent.qvvrfile.clear(); // [新增] // 定值列表清理,等待后续配置/采集填充 mon.set_values.clear(); // [新增] mon.dz_info_list.clear(); // [新增] } // ------------------------------------------------------------------------------------ terminal_devlist.push_back(dev); } // 判断监测点接线类型 for (auto& dev : terminal_devlist) { for (auto& mon : dev.line) { if (!mon.terminal_connect.empty() && mon.terminal_connect != "0") { isdelta_flag = 1; std::cout << "monitor_id " << mon.monitor_id<< " v_wiring_type: " << mon.terminal_connect << " is delta wiring: " << isdelta_flag << std::endl; DIY_WARNLOG("process","【WARN】前置连接的监测点%s是角形接线,对应终端为%s 终端类型是%s",mon.monitor_id.c_str(),dev.terminal_id.c_str(),dev.dev_type.c_str()); } } } terminal_dev_map.clear(); return 0; } ////////////////////////////////////////////////////////////////////////////////////////////////////icd接口 int parse_model_web(std::map* icd_model_map, const std::string& inputstr) { // 1. 清理调用方 map 中的旧数据 for (auto& kv : *icd_model_map) { delete kv.second; } icd_model_map->clear(); // 2. 准备请求参数 std::string inputparm = inputstr; // 装置型号列表,格式 ["型号1","型号2",...] 或 [] // 3. 调用接口并重试解析 JSON(最多 3 次) std::string responseStr; nlohmann::json root; bool parsed = false; const int maxRetry = 3; for (int attempt = 1; attempt <= maxRetry; ++attempt) { // 发起 HTTP 请求 SendJsonAPI_web(WEB_ICD, "", inputparm, responseStr); if (responseStr.empty()) { std::cerr << "Attempt " << attempt << ": received NULL response, retrying...\n"; continue; } std::cout << "Attempt " << attempt << " icd responseStr: " << responseStr << "\n"; // 尝试解析 try { root = nlohmann::json::parse(responseStr); parsed = true; break; } catch (const nlohmann::json::parse_error& e) { std::cerr << "Attempt " << attempt << " parse error: " << e.what() << ", retrying...\n"; } } // 4. 如果解析仍失败,返回错误 if (!parsed) { std::cerr << "Error: failed to retrieve or parse JSON after " << maxRetry << " attempts\n"; return 1; } // 5. 打印 code / msg std::string code = root.value("code", "not found"); std::string msg = root.value("msg", "not found"); std::cout << "code: " << code << "\n"; std::cout << "msg : " << msg << "\n"; // 6. 处理 data 数组,将每个条目填充到调用方的 map if (root.contains("data") && root["data"].is_array()) { for (auto& item : root["data"]) { auto model = new icd_model(); if (item.contains("id") && item["id"].is_string()) model->model_id = item["id"].get(); if (item.contains("devType") && item["devType"].is_string()) model->tmnl_type = item["devType"].get(); if (item.contains("devTypeId") && item["devTypeId"].is_string()) model->tmnl_type_id = item["devTypeId"].get(); if (item.contains("devFactory") && item["devFactory"].is_string()) model->tmnl_factory = item["devFactory"].get(); if (item.contains("fileName") && item["fileName"].is_string()) model->file_name = item["fileName"].get(); if (item.contains("filePath") && item["filePath"].is_string()) model->file_path = item["filePath"].get(); if (item.contains("updateTime") && item["updateTime"].is_string()) model->updatetime = item["updateTime"].get(); // 只有当 model_id 不为空时才插入 if (!model->model_id.empty()) { (*icd_model_map)[model->model_id] = model; } else { delete model; } } } return 0; } int parse_model_cfg_web() { // 1. 根据全局终端列表去重 std::set devTypes; for (auto& dev : terminal_devlist) { if (!dev.dev_type.empty()) { devTypes.insert(dev.dev_type); } } std::cout << "终端总数: " << terminal_devlist.size() << "\n"; std::cout << "不同的 dev_type 个数: " << devTypes.size() << "\n"; for (auto& t : devTypes) { std::cout << " - " << t << "\n"; } // 2. 构造 JSON 参数 std::string input_jstr; if (ICD_FLAG == "1") { input_jstr = "["; bool first = true; for (auto& t : devTypes) { if (!first) input_jstr += ","; first = false; input_jstr += "\"" + t + "\""; } input_jstr += "]"; } else { input_jstr = "[]"; } std::cout << "input_jstr: " << input_jstr << "\n"; // 3. 调用接口 std::map icd_model_map; if (parse_model_web(&icd_model_map, input_jstr)) { DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置", g_front_seg_index); // 确保释放 map for (auto& kv : icd_model_map) delete kv.second; return 0; } // 4. 遍历并下载 try { for (auto& kv : icd_model_map) { icd_model* mdl = kv.second; if (!mdl) continue; std::cout << "model_id : " << mdl->model_id << "\n"; std::cout << "tmnl_type : " << mdl->tmnl_type << "\n"; std::cout << "file_path : " << mdl->file_path << "\n"; std::cout << "file_name : " << mdl->file_name << "\n"; std::cout << "timestamp : " << mdl->updatetime << "\n"; download_xml_for_icd( mdl->model_id, mdl->tmnl_type, mdl->file_path, mdl->file_name, mdl->updatetime ); } } catch (const std::exception& e) { std::cout << "icd model error, ERROR code=" << e.what() << std::endl; // 释放 map for (auto& kv : icd_model_map) delete kv.second; return 1; } // 5. 释放所有 icd_model 对象 for (auto& kv : icd_model_map) { delete kv.second; } return 0; } ////////////////////////////////////////////////////////////////////////////////////////////////////icd接口台账更新 std::string parse_model_cfg_web_one(const std::string& terminal_type) { // 本地 map,用完后我们会把里面的指针 delete std::map icd_model_map; // 1. 构造 JSON 数组:["terminal_type"] std::string input_jstr = "[\"" + terminal_type + "\"]"; std::cout << "input_jstr: " << input_jstr << std::endl; // 2. 拉取并解析 if (parse_model_web(&icd_model_map, input_jstr) != 0) { std::cerr << "parse_model_web failed for type: " << terminal_type << std::endl; DIY_ERRORLOG("process","【ERROR】前置的%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置", g_front_seg_index); // 清理(即使 map 为空,也安全) for (auto& kv : icd_model_map) delete kv.second; return ""; } std::string ret_model_id; // 3. 找到第一个模型,打印并下载 if (icd_model_map.empty()) { std::cerr << "Warning: no ICD model returned for type: " << terminal_type << std::endl; } else { auto& kv = *icd_model_map.begin(); icd_model* mdl = kv.second; if (mdl) { ret_model_id = mdl->model_id; std::cout << "model_id : " << mdl->model_id << std::endl; std::cout << "tmnl_type : " << mdl->tmnl_type << std::endl; std::cout << "file_path : " << mdl->file_path << std::endl; std::cout << "file_name : " << mdl->file_name << std::endl; std::cout << "updatetime : " << mdl->updatetime << std::endl; try { download_xml_for_icd( mdl->model_id, mdl->tmnl_type, mdl->file_path, mdl->file_name, mdl->updatetime ); } catch (const std::exception& e) { std::cerr << "download_xml_for_icd exception: " << e.what() << std::endl; } } } // 4. 释放所有 new 出来的 icd_model for (auto& kv : icd_model_map) { delete kv.second; } icd_model_map.clear(); return ret_model_id; } ///////////////////////////////////////////////////////////////////////////////////////////////////////暂态接口 std::string FormatTimeForFilename(const std::string& timeStr) { std::string result; for (char c : timeStr) { if (isdigit(c)) { result += c; } } return result; } // 将 JSON 字符串写入指定文件(C++11 版本,使用 std::string) static void writeJsonToFile(const std::string& filePath, const std::string& jsonString) { FILE* fp = fopen(filePath.c_str(), "w"); if (!fp) { DIY_ERRORLOG("process", "【ERROR】无法将暂态事件写入本地缓存"); std::cerr << "Failed to write in file : " << filePath << std::endl; return; } fprintf(fp, "%s", jsonString.c_str()); fclose(fp); } // 获取指定目录下所有文件的信息(文件名、修改时间、大小),以便后续做删除或判断文件总大小 struct FileInfo { std::string fileName; time_t modTime; // 上次修改时间 long long fileSize; // 文件大小 }; // 扫描目录,获取该目录下所有普通文件的信息 static void getDirectoryFilesInfo(const std::string& dirPath, std::vector& fileList) { DIR* dp = opendir(dirPath.c_str()); if (!dp) { std::cerr << "Failed to open directory: " << dirPath << " - " << strerror(errno) << std::endl; return; } struct dirent* entry = nullptr; while ((entry = readdir(dp)) != nullptr) { // 跳过 . 和 .. if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) { continue; } // 拼接完整路径 std::string fullPath = dirPath; if (fullPath.back() != '/' && fullPath.back() != '\\') { fullPath += '/'; } fullPath += entry->d_name; // 获取文件信息 struct stat st; if (stat(fullPath.c_str(), &st) == 0) { if (S_ISREG(st.st_mode)) { FileInfo fi; fi.fileName = fullPath; fi.modTime = st.st_mtime; fi.fileSize = static_cast(st.st_size); fileList.push_back(fi); } } else { std::cerr << "Failed to stat file: " << fullPath << " - " << strerror(errno) << std::endl; } } closedir(dp); } // 检查 qvvr 目录下文件总大小,若超过 10M 则删除最老的一个文件 // 注意:该逻辑严格按照你需求“只删一个最老的文件”来实现,而不是循环删到小于10M static void checkAndRemoveOldestIfNeeded(const std::string& dirPath, long long maxBytes) { // 1) 判断目录是否存在,不存在则尝试创建 struct stat st; if (stat(dirPath.c_str(), &st) == -1) { if (errno == ENOENT) { // 目录不存在,尝试创建(只创建最后一级,不递归) if (mkdir(dirPath.c_str(), 0777) != 0) { std::cerr << "Failed to create directory: " << dirPath << std::endl; return; } } else { std::cerr << "stat error: " << strerror(errno) << std::endl; return; } } else if (!S_ISDIR(st.st_mode)) { std::cerr << dirPath << " exists but is not a directory." << std::endl; return; } // 2) 获取目录下所有文件信息 std::vector fileList; getDirectoryFilesInfo(dirPath, fileList); // 3) 计算总大小 long long totalSize = 0; for (const auto& file : fileList) { totalSize += file.fileSize; } // 4) 如果超过阈值,则删除最老的文件 if (totalSize > maxBytes && !fileList.empty()) { std::sort(fileList.begin(), fileList.end(), [](const FileInfo& a, const FileInfo& b) { return a.modTime < b.modTime; }); std::remove(fileList[0].fileName.c_str()); } } // 扫描目录下的离线文件,依次读取并发送;若发送成功则删除该文件,发送不成功则保留 static void scanAndResendOfflineFiles(const std::string& dirPath) { // 获取目录下所有文件信息 std::vector fileList; std::cout << "getDirectoryFilesInfo" << std::endl; getDirectoryFilesInfo(dirPath, fileList); std::cout << "send every file" << std::endl; for (const auto& file : fileList) { // 读取 JSON 文件内容 std::ifstream inFile(file.fileName.c_str()); if (!inFile) { DIY_ERRORLOG("process", "【ERROR】无法打开本地缓存的暂态事件"); std::cerr << "fail to open existing file: " << file.fileName << std::endl; continue; } std::string jsonContent((std::istreambuf_iterator(inFile)), std::istreambuf_iterator()); inFile.close(); std::cout << "send jsonContent: " << jsonContent << std::endl; // 尝试发送 std::string response; SendJsonAPI_web(WEB_EVENT, "", jsonContent, response); if (!response.empty()) { try { json j_r = json::parse(response); DIY_WARNLOG("process", "【WARN】前置重发暂态事件成功"); std::cout << "old file send success, remove it" << std::endl; std::remove(file.fileName.c_str()); } catch (...) { std::cout << "old file send fail (response parse failed)" << std::endl; DIY_WARNLOG("process", "【WARN】前置重发暂态事件失败"); handleCommentResponse(response); // 仍然处理文本响应 } } else { std::cout << "old file send fail (no response)" << std::endl; // 不删除文件,等待下次重发 } } } // 0->"A", 1->"B", 2->"C", 3->"AB", 4->"BC", 5->"CA", 6/其它->"ABC" inline std::string phase_to_text(int phase) { switch (phase) { case 0: return "A"; case 1: return "B"; case 2: return "C"; case 3: return "AB"; case 4: return "BC"; case 5: return "CA"; default: return "ABC"; // 包含 6 和其他任何值 } } int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id, double mag, double dur, long long start_tm, int dis_kind,int phase, const std::string& wavepath) { if (dev_id.empty()) { std::cout << "qvvr send error ,dev_id is null" << std::endl; return 1; } // 构造 JSON 对象 json root; //找监测点id std::string mpid; get_monitor_id_by_dev_and_seq(dev_id, monitor_id, mpid); if(mpid.empty()) { std::cout << "qvvr send error ,monitorId is null" << std::endl; return 1; } root["monitorId"] = mpid; root["devId"] = dev_id; root["CpuNo"] = monitor_id; root["amplitude"] = mag; root["duration"] = dur; root["eventType"] = dis_kind; // 时间处理 time_t start_sec = start_tm / 1000; //毫秒级取秒 struct tm* time_info = localtime(&start_sec); char time_buf[32]; strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", time_info); std::ostringstream start_time_stream; start_time_stream << time_buf << "." << std::setfill('0') << std::setw(3) << (start_tm % 1000);//构造成年月日时分秒.毫秒 std::string start_time_str = start_time_stream.str(); root["startTime"] = start_time_str; root["wavePath"] = wavepath; root["phase"] = phase_to_text(phase); std::string json_string = root.dump(4); std::cout << json_string << std::endl; // 发送到暂态接口 std::string response; SendJsonAPI_web(WEB_EVENT, "", json_string, response); // ================ 暂态重发功能 ========================= if (!response.empty()) { try { json j_r = json::parse(response); // 有效响应,略过 } catch (...) { // 响应异常,保存 json DIY_ERRORLOG(mpid.c_str(), "【ERROR】暂态接口响应异常,无法上送装置%s监测点%s的暂态事件",dev_id, monitor_id); std::cout << "qvvr send fail ,store in local" << std::endl; std::string qvvrDir = FRONT_PATH + "/dat/qvvr/"; std::string fileName = qvvrDir + dev_id + "-" + std::to_string(monitor_id) + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt"; writeJsonToFile(fileName, json_string); checkAndRemoveOldestIfNeeded(qvvrDir, 10LL * 1024 * 1024); } } else { // 无响应,保存 json DIY_ERRORLOG(mpid.c_str(), "【ERROR】暂态接口无响应,无法上送装置%s监测点%s的暂态事件",dev_id, monitor_id); std::cout << "qvvr send fail ,store in local" << std::endl; std::string qvvrDir = FRONT_PATH + "/dat/qvvr/"; std::string fileName = qvvrDir + dev_id + "-" + std::to_string(monitor_id) + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt"; writeJsonToFile(fileName, json_string); checkAndRemoveOldestIfNeeded(qvvrDir, 10LL * 1024 * 1024); return 1; } // 离线重发机制 { std::string qvvrDir = FRONT_PATH + "/dat/qvvr/"; scanAndResendOfflineFiles(qvvrDir); } // 响应处理 std::cout << "current qvvr handle response" << std::endl; handleCommentResponse(response); return 0; } void qvvr_test() { transfer_json_qvvr_data("qvvrtest123", 6, 10.98, 1234, 1754566628692, 1,1, "testwavepath"); } /////////////////////////////////////////////////////////////////////////////////////////////////////////通用接口响应 void handleCommentResponse(const std::string& response) { try { auto json_data = json::parse(response); // 或 json::parse(response); std::string code = "not found"; std::string msg = "not found"; if (json_data.contains("code")) { if (json_data["code"].is_string()) { code = json_data["code"]; } else if (json_data["code"].is_number_integer() || json_data["code"].is_number()) { code = std::to_string(json_data["code"].get()); } } if (json_data.contains("msg")) { if (json_data["msg"].is_string()) { msg = json_data["msg"]; } } if (code != "not found") { std::cout << "Response Code: " << code << std::endl; std::cout << "Message: " << msg << std::endl; } else { std::cerr << "Error: Missing expected fields in JSON response." << std::endl; } } catch (const json::parse_error& e) { std::cerr << "Error parsing response: " << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "Unexpected exception: " << e.what() << std::endl; } } /////////////////////////////////////////////////////////////////////////////////////////////////////找监测点id bool get_monitor_id_by_dev_and_seq(const std::string& terminal_id, unsigned short logical_seq, std::string& out_monitor_id) { std::lock_guard lk(ledgermtx); for (const auto& dev : terminal_devlist) { if (dev.terminal_id != terminal_id) continue; // 命中该装置后,仅遍历它的监测点 for (const auto& mon : dev.line) { try { // logical_device_seq 存在台账里是字符串,转成数值再比对 unsigned long v = std::stoul(mon.logical_device_seq); unsigned short seq = static_cast(v); if (seq == logical_seq) { out_monitor_id = mon.monitor_id; return true; } } catch (...) { // 非法数值(空/非数字等)直接跳过 continue; } } break; // 已找到对应装置,没必要再看其它装置 } return false; }