From afc079465efd90114f9d81ea32d936414e928efc Mon Sep 17 00:00:00 2001 From: lnk Date: Tue, 5 Aug 2025 20:00:20 +0800 Subject: [PATCH 1/3] add qvvr interface --- LFtid1056/cloudfront/code/cfg_parser.cpp | 157 +++++++++++++++++++---- LFtid1056/cloudfront/code/interface.cpp | 50 ++------ LFtid1056/cloudfront/code/interface.h | 14 +- 3 files changed, 151 insertions(+), 70 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index d3b9a56..9261a13 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -2703,7 +2703,26 @@ bool extract_timestamp_from_cfg_file(const std::string& cfg_path, long long& sta return start_tm > 0 && trig_tm > 0; } +bool compare_qvvr_and_file(const std::string& cfg_path, const std::vector& data_list) { + long long start_tm = 0; + long long trig_tm = 0; + // 提取 .cfg 文件中的时间戳 + if (!extract_timestamp_from_cfg_file(cfg_path, start_tm, trig_tm)) { + std::cerr << "Failed to extract timestamp from cfg file: " << cfg_path << "\n"; + return false; + } + + // 遍历所有暂态事件,查找与 trig_tm 匹配的 + for (const auto& data : data_list) { + long long diff = static_cast(data.QVVR_time) - trig_tm; + if (std::abs(diff) <= 1) { + return true; + } + } + + return false; +} ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////数据转换函数 // DataArrayItem to_json void to_json(nlohmann::json& j, const DataArrayItem& d) { @@ -2844,7 +2863,7 @@ bool assign_qvvr_file_list(const std::string& id, ushort nCpuNo, const std::vect qfile.is_download = false; qfile.is_pair = false; qfile.file_time_count = 0; - qfile.file_start =false; + qfile.used_status =true; // 添加到唯一的 qvvrevent monitor.qvvrevent.qvvrfile.push_back(std::move(qfile)); //记录暂态文件组 @@ -2861,72 +2880,158 @@ bool assign_qvvr_file_list(const std::string& id, ushort nCpuNo, const std::vect } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////下载成功通知 +//提取下载路径的文件名 +std::string extract_filename(const std::string& path) { + size_t pos = path.find_last_of("/\\"); + return (pos != std::string::npos) ? path.substr(pos + 1) : path; +} +//发送匹配的所有录波文件 +bool SendAllQvvrFiles(qvvr_file& qfile, std::string& out_wavepath) { + std::vector wavepaths; + std::string first_wavepath; + bool send_success = true; + + for (const auto& file_localpath : qfile.file_download) { + std::string file_cloudpath = "comtrade/" + file_localpath; + std::string wavepath_result; + + // 发送本地文件到远端,返回 wavepath + SOEFileWeb(const_cast(file_localpath), file_cloudpath, wavepath_result); + + // 如果失败,重发一次 + if (wavepath_result.empty()) { + std::cerr << "[SOEFileWeb] Warning: first send failed for file: " << file_localpath << ", retrying...\n"; + SOEFileWeb(const_cast(file_localpath), file_cloudpath, wavepath_result); + } + + if (wavepath_result.empty()) { + send_success = false; + std::cerr << "[SOEFileWeb] Failed: wavepath empty for file: " << file_localpath << "\n"; + break; + } + + if (wavepaths.empty()) { + first_wavepath = wavepath_result; + } else if (wavepath_result != first_wavepath) { + send_success = false; + std::cerr << "[SOEFileWeb] Mismatch wavepath: " << wavepath_result + << " vs " << first_wavepath << "\n"; + break; + } + + wavepaths.push_back(wavepath_result); + } + + // 检查数量是否一致 + if (!send_success || wavepaths.size() != qfile.file_download.size()) { + std::cerr << "[SOEFileWeb] Failed to send all qvvr files. " + << "Sent: " << wavepaths.size() + << ", Expected: " << qfile.file_download.size() << "\n"; + return false; + } + + out_wavepath = first_wavepath; // 返回统一的 wavepath + + std::cout << "[SOEFileWeb] Success: all files sent for qfile. Wavepath = " + << first_wavepath << "\n"; + return true; +} + +//文件下载结束接口 bool update_qvvr_file_download(const std::string& filename_with_mac, const std::string& terminal_id) { - // 去除 mac 路径前缀 - size_t pos = filename_with_mac.find_last_of("/\\"); - std::string filename = (pos != std::string::npos) ? filename_with_mac.substr(pos + 1) : filename_with_mac; + // 去除 mac 路径前缀,仅保留文件名 + std::string filename = extract_filename(filename_with_mac); // 提取逻辑序号(如 PQM1 → 1) size_t under_pos = filename.find('_'); if (under_pos == std::string::npos) return false; - std::string type_part = filename.substr(0, under_pos); //PQMonitor_PQM1 + std::string type_part = filename.substr(0, under_pos); // PQMonitor_PQM1 size_t num_start = type_part.find_last_not_of("0123456789"); if (num_start == std::string::npos || num_start + 1 >= type_part.size()) return false; std::string seq_str = type_part.substr(num_start + 1); - ushort logical_seq = static_cast(std::stoi(seq_str)); + ushort logical_seq = static_cast(std::stoi(seq_str)); // 逻辑序号 for (auto& dev : terminal_devlist) { if (dev.terminal_id != terminal_id) continue; for (auto& monitor : dev.line) { try { + // 将监测点台账中的 logical_device_seq 转换为数字进行匹配 ushort monitor_seq = static_cast(std::stoi(monitor.logical_device_seq)); if (monitor_seq != logical_seq) continue; } catch (...) { - continue; + continue; // logical_device_seq 非法,跳过 } // 匹配监测点下 qvvrfile 中的 file_name for (auto& qfile : monitor.qvvrevent.qvvrfile) { + // file_name 中是文件名,需与提取的 filename 比较 auto it = std::find(qfile.file_name.begin(), qfile.file_name.end(), filename); if (it != qfile.file_name.end()) { - // 添加到 file_download(去重) - if (std::find(qfile.file_download.begin(), qfile.file_download.end(), filename) == qfile.file_download.end()) { - qfile.file_download.push_back(filename); + // 添加到 file_download(记录完整路径,避免重复) + if (std::find(qfile.file_download.begin(), qfile.file_download.end(), filename_with_mac) == qfile.file_download.end()) { + qfile.file_download.push_back(filename_with_mac); } - qfile.file_time_count = 0; - qfile.file_start = true; //开始下载文件 + qfile.file_time_count = 0; // 文件下载开始后,计时归零 - // 检查 file_download 是否与 file_name 完全一致(集合相同)//每次下载都会对比 + // file_download 中是完整路径,需提取文件名后与 file_name 做集合比较 std::set s_name(qfile.file_name.begin(), qfile.file_name.end()); - std::set s_down(qfile.file_download.begin(), qfile.file_download.end()); + std::set s_down; + for (const auto& path : qfile.file_download) { + s_down.insert(extract_filename(path)); // 提取每个路径中的文件名 + } + + // 检查 file_download 是否与 file_name 完全一致(集合相同) if (s_name == s_down) { - qfile.is_download = true; //全部下载完成 + qfile.is_download = true; // 全部下载完成 // 找到其中的 .cfg 文件进行匹配 - for (const auto& f : qfile.file_download) { - if (f.size() >= 4 && f.substr(f.size() - 4) == ".cfg") { - if (compare_qvvr_and_file(f)) {//提取文件时标和监测点事件的时标匹配 - qfile.is_pair = true; - //发送所有文件 + for (const auto& fpath : qfile.file_download) { + std::string fname = extract_filename(fpath); + if (fname.size() >= 4 && fname.substr(fname.size() - 4) == ".cfg") { + // 提取文件时标和监测点事件的时标匹配 + if (compare_qvvr_and_file(fpath, monitor.qvvrevent.qvvrdata)) { + qfile.is_pair = true; // 文件与事件匹配成功 - //发送暂态事件 - + // 发送所有文件(已下载完成) + std::string wavepath; + if (SendAllQvvrFiles(qfile, wavepath)) { + //文件发送成功后更新事件 + transfer_json_qvvr_data(terminal_id, + logical_seq, + monitor.qvvrevent.qvvrdata.QVVR_Amg, + monitor.qvvrevent.qvvrdata.QVVR_PerTime, + monitor.qvvrevent.qvvrdata.QVVR_time, + monitor.qvvrevent.qvvrdata.QVVR_type, + monitor.qvvrevent.qvvrdata.phase, + wavepath); + + // 清除暂态数据 + monitor.qvvrevent.qvvrfile.clear(); + monitor.qvvrevent.qvvrdata.clear(); + + // 删除上传成功的文件 + for (const auto& uploaded_file : qfile.file_download) { + if (std::remove(uploaded_file.c_str()) != 0) { + std::cerr << "[Cleanup] Failed to delete file: " << uploaded_file << "\n"; + } else { + std::cout << "[Cleanup] Deleted uploaded file: " << uploaded_file << "\n"; + } + } + } } break; // 只处理第一个 cfg 文件 } } } - - return true; + return true; // 当前文件处理成功 } } } } - - return false; + return false; // 未匹配到终端ID或逻辑序号对应的监测点 } diff --git a/LFtid1056/cloudfront/code/interface.cpp b/LFtid1056/cloudfront/code/interface.cpp index edac700..5f35df8 100644 --- a/LFtid1056/cloudfront/code/interface.cpp +++ b/LFtid1056/cloudfront/code/interface.cpp @@ -1149,32 +1149,24 @@ static void scanAndResendOfflineFiles(const std::string& dirPath) } } -int transfer_json_qvvr_data(unsigned int func_type, int monitor_id, - double mag, double dur, long long start_tm, long long end_tm, int dis_kind, - const std::string& uuid_cfg, const std::string& uuid_dat, - const std::string& mp_id, const std::string& Qvvr_rptname, const std::string& devtype) { +int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id, + double mag, double dur, long long start_tm, int dis_kind,int phase, + const std::string& wavepath) { // 监测点日志的 key, lnk20250526 - std::string full_key_m_c = "monitor." + mp_id + ".COM"; - std::string full_key_m_d = "monitor." + mp_id + ".DATA"; + std::string full_key_m_c = "monitor." + std::to_string(monitor_id) + ".COM"; + std::string full_key_m_d = "monitor." + std::to_string(monitor_id) + ".DATA"; // 监测点日志的 key, lnk20250526 - // 获取装置类型的映射配置 - XmlConfig c_xmlcfg; - if (xmlinfo_list.count(devtype)) { - c_xmlcfg = xmlinfo_list[devtype]->xmlcfg; - } else { - c_xmlcfg = xmlcfg; - } - - if (mp_id.empty()) { - std::cout << "mp_id is null" << std::endl; + if (dev_id.empty()) { + std::cout << "dev_id is null" << std::endl; return 0; } // 构造 JSON 对象 json root; - root["monitorId"] = mp_id; + root["devId"] = dev_id; + root["CpuNo"] = monitor_id; root["amplitude"] = mag; root["duration"] = dur; root["eventType"] = dis_kind; @@ -1189,21 +1181,9 @@ int transfer_json_qvvr_data(unsigned int func_type, int monitor_id, std::string start_time_str = start_time_stream.str(); root["startTime"] = start_time_str; - root["wavePath"] = uuid_dat; //接口提供了两个文件名入参,实际上名字一样只用一个,可优化 + root["wavePath"] = wavepath; + root["phase"] = phase; - if (c_xmlcfg.WavePhasicFlag == "1") { //映射配置分相 - if (Qvvr_rptname.find(c_xmlcfg.WavePhasicA) != std::string::npos) { - root["phase"] = "A"; - } else if (Qvvr_rptname.find(c_xmlcfg.WavePhasicB) != std::string::npos) { - root["phase"] = "B"; - } else if (Qvvr_rptname.find(c_xmlcfg.WavePhasicC) != std::string::npos) { - root["phase"] = "C"; - } else { - root["phase"] = "unknow"; - } - } else { - root["phase"] = "unknow"; //不分相 - } std::string json_string = root.dump(4); std::cout << json_string << std::endl; @@ -1212,7 +1192,7 @@ int transfer_json_qvvr_data(unsigned int func_type, int monitor_id, std::string response; SendJsonAPI_web(WEB_EVENT, "", json_string, response); - // ================ 插入新功能 ========================= + // ================ 暂态重发功能 ========================= if (!response.empty()) { try { json j_r = json::parse(response); @@ -1254,13 +1234,7 @@ int transfer_json_qvvr_data(unsigned int func_type, int monitor_id, void qvvr_test() { - char uuid_cfg[] = {"/comtrade/"}; - char uuid_dat[] = {"/comtrade/"}; - char mp_id[] = {"qvvrtest123"}; - char Qvvr_rptname[] = {"unknow"}; - char devtype[] = {"01"}; - transfer_json_qvvr_data(1, 123456789, 220, 180, 1730894400.123, 1730894580, 1210001,uuid_cfg,uuid_dat,mp_id,Qvvr_rptname,devtype); } /////////////////////////////////////////////////////////////////////////////////////////////////////////通用接口响应 diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index e172148..ef894c7 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -52,7 +52,7 @@ public: //录波文件和暂态事件 class qvvr_data { - int used_status; //是否占用 + bool used_status; //是否占用 int QVVR_type; //暂态类型 uint64_t QVVR_time; //暂态开始时间 unsigned longlong double QVVR_PerTime; //暂态持续时间 @@ -62,7 +62,7 @@ class qvvr_data class qvvr_file { - bool file_start; + bool used_status; int file_time_count; //组内文件下载时间计数(第一个文件下载后十分钟内如果其他文件没下载全或者下载全了没匹配事件则将已下载的文件都移到备份区comtrade_bak) bool is_download; //文件是否下载完全,最后一个文件下载成功后对比成功则更新这个标志 bool is_pair; //文件是否和事件匹配,从comtrade/mac/路径下取file_download中的cfg文件提取时间和持续时间来匹配,匹配后接口发送这组file_download全部文件,发送成功后删除这组文件,然后更新事件中的文件列表 @@ -454,14 +454,16 @@ std::string generate_json( //构造装置主动上送数据的报文 const std::vector& dataArray //数据数组。 ); -int transfer_json_qvvr_data(unsigned int func_type, int monitor_id, //暂态事件的接口 - double mag, double dur, long long start_tm, long long end_tm, int dis_kind, - const std::string& uuid_cfg, const std::string& uuid_dat, - const std::string& mp_id, const std::string& Qvvr_rptname, const std::string& devtype); +//暂态事件接口 +int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id, + double mag, double dur, long long start_tm, int dis_kind,int phase, + const std::string& wavepath); //录波文件目录接口 +bool assign_qvvr_file_list(const std::string& id, ushort nCpuNo, const std::vector& file_list_raw); //录波文件下载完成通知接口 +bool update_qvvr_file_download(const std::string& filename_with_mac, const std::string& terminal_id); ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// From 83986c35cbf99a26cc0f56f6807a88119b7d3e36 Mon Sep 17 00:00:00 2001 From: lnk Date: Thu, 7 Aug 2025 18:59:11 +0800 Subject: [PATCH 2/3] add qvvr interface --- LFtid1056/cloudfront/code/cfg_parser.cpp | 57 ++++++++++++++++++------ LFtid1056/cloudfront/code/interface.cpp | 12 ++--- LFtid1056/cloudfront/code/interface.h | 31 +++++++------ LFtid1056/dealMsg.cpp | 11 +++++ 4 files changed, 77 insertions(+), 34 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 9261a13..11e1eb6 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -2703,7 +2703,7 @@ bool extract_timestamp_from_cfg_file(const std::string& cfg_path, long long& sta return start_tm > 0 && trig_tm > 0; } -bool compare_qvvr_and_file(const std::string& cfg_path, const std::vector& data_list) { +bool compare_qvvr_and_file(const std::string& cfg_path, const std::vector& data_list,qvvr_data& matched_data) { long long start_tm = 0; long long trig_tm = 0; @@ -2717,6 +2717,7 @@ bool compare_qvvr_and_file(const std::string& cfg_path, const std::vector(data.QVVR_time) - trig_tm; if (std::abs(diff) <= 1) { + matched_data = data; // 返回匹配到的事件 return true; } } @@ -2940,6 +2941,10 @@ bool SendAllQvvrFiles(qvvr_file& qfile, std::string& out_wavepath) { //文件下载结束接口 bool update_qvvr_file_download(const std::string& filename_with_mac, const std::string& terminal_id) { + + //台账加锁 + std::lock_guard lock(ledgermtx); + // 去除 mac 路径前缀,仅保留文件名 std::string filename = extract_filename(filename_with_mac); @@ -2954,9 +2959,11 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std:: std::string seq_str = type_part.substr(num_start + 1); ushort logical_seq = static_cast(std::stoi(seq_str)); // 逻辑序号 + //找终端 for (auto& dev : terminal_devlist) { if (dev.terminal_id != terminal_id) continue; + //找监测点 for (auto& monitor : dev.line) { try { // 将监测点台账中的 logical_device_seq 转换为数字进行匹配 @@ -2967,16 +2974,19 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std:: } // 匹配监测点下 qvvrfile 中的 file_name - for (auto& qfile : monitor.qvvrevent.qvvrfile) { + for (size_t i = 0; i < monitor.qvvrevent.qvvrfile.size(); ++i) { + auto& qfile = monitor.qvvrevent.qvvrfile[i]; // file_name 中是文件名,需与提取的 filename 比较 auto it = std::find(qfile.file_name.begin(), qfile.file_name.end(), filename); + + //找到匹配文件名 if (it != qfile.file_name.end()) { // 添加到 file_download(记录完整路径,避免重复) if (std::find(qfile.file_download.begin(), qfile.file_download.end(), filename_with_mac) == qfile.file_download.end()) { qfile.file_download.push_back(filename_with_mac); } - qfile.file_time_count = 0; // 文件下载开始后,计时归零 + qfile.file_time_count = 0; // 任一录波文件下载后,计时归零 // file_download 中是完整路径,需提取文件名后与 file_name 做集合比较 std::set s_name(qfile.file_name.begin(), qfile.file_name.end()); @@ -2994,7 +3004,8 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std:: std::string fname = extract_filename(fpath); if (fname.size() >= 4 && fname.substr(fname.size() - 4) == ".cfg") { // 提取文件时标和监测点事件的时标匹配 - if (compare_qvvr_and_file(fpath, monitor.qvvrevent.qvvrdata)) { + qvvr_data matched; + if (compare_qvvr_and_file(fpath, monitor.qvvrevent.qvvrdata,matched)) { qfile.is_pair = true; // 文件与事件匹配成功 // 发送所有文件(已下载完成) @@ -3003,16 +3014,12 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std:: //文件发送成功后更新事件 transfer_json_qvvr_data(terminal_id, logical_seq, - monitor.qvvrevent.qvvrdata.QVVR_Amg, - monitor.qvvrevent.qvvrdata.QVVR_PerTime, - monitor.qvvrevent.qvvrdata.QVVR_time, - monitor.qvvrevent.qvvrdata.QVVR_type, - monitor.qvvrevent.qvvrdata.phase, + matched.QVVR_Amg, + matched.QVVR_PerTime, + matched.QVVR_time, + matched.QVVR_type, + matched.phase, wavepath); - - // 清除暂态数据 - monitor.qvvrevent.qvvrfile.clear(); - monitor.qvvrevent.qvvrdata.clear(); // 删除上传成功的文件 for (const auto& uploaded_file : qfile.file_download) { @@ -3022,15 +3029,37 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std:: std::cout << "[Cleanup] Deleted uploaded file: " << uploaded_file << "\n"; } } + + // 清除已发送的暂态文件 + monitor.qvvrevent.qvvrfile.erase(monitor.qvvrevent.qvvrfile.begin() + i); + + //清除暂态事件 + auto it = std::find_if( + monitor.qvvrevent.qvvrdata.begin(), + monitor.qvvrevent.qvvrdata.end(), + [&](const qvvr_data& d) { + return d.QVVR_time == matched.QVVR_time; + }); + + if (it != monitor.qvvrevent.qvvrdata.end()) { + monitor.qvvrevent.qvvrdata.erase(it); + } } } break; // 只处理第一个 cfg 文件 } } } + else{ + std::cout << "qvvr file still imcomplete!!!" << std::endl; + } + return true; // 当前文件处理成功 - } + } } + + std::cout << "file name doesnt match any file in this monitor!!!" << std::endl; + } } return false; // 未匹配到终端ID或逻辑序号对应的监测点 diff --git a/LFtid1056/cloudfront/code/interface.cpp b/LFtid1056/cloudfront/code/interface.cpp index 5f35df8..bb7f173 100644 --- a/LFtid1056/cloudfront/code/interface.cpp +++ b/LFtid1056/cloudfront/code/interface.cpp @@ -1154,8 +1154,8 @@ int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id, const std::string& wavepath) { // 监测点日志的 key, lnk20250526 - std::string full_key_m_c = "monitor." + std::to_string(monitor_id) + ".COM"; - std::string full_key_m_d = "monitor." + std::to_string(monitor_id) + ".DATA"; + std::string full_key_m_c = "monitor." + dev_id + "." + std::to_string(monitor_id) + ".COM"; + std::string full_key_m_d = "monitor." + dev_id + "." + std::to_string(monitor_id) + ".DATA"; // 监测点日志的 key, lnk20250526 if (dev_id.empty()) { @@ -1199,21 +1199,21 @@ int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id, // 有效响应,略过 } catch (...) { // 响应异常,保存 json - DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口响应异常,无法上送监测点%s的暂态事件", mp_id.c_str()); + DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口响应异常,无法上送监测点%s的暂态事件", monitor_id.c_str()); std::cout << "qvvr send fail ,store in local" << std::endl; std::string qvvrDir = FRONT_PATH + "/dat/qvvr/"; - std::string fileName = qvvrDir + mp_id + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt"; + std::string fileName = qvvrDir + monitor_id + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt"; writeJsonToFile(fileName, json_string); checkAndRemoveOldestIfNeeded(qvvrDir, 10LL * 1024 * 1024); } } else { // 无响应,保存 json - DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口无响应,无法上送监测点%s的暂态事件", mp_id.c_str()); + DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口无响应,无法上送监测点%s的暂态事件", monitor_id.c_str()); std::cout << "qvvr send fail ,store in local" << std::endl; std::string qvvrDir = FRONT_PATH + "/dat/qvvr/"; - std::string fileName = qvvrDir + mp_id + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt"; + std::string fileName = qvvrDir + monitor_id + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt"; writeJsonToFile(fileName, json_string); checkAndRemoveOldestIfNeeded(qvvrDir, 10LL * 1024 * 1024); return 1; diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index ef894c7..14e9889 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -52,28 +52,31 @@ public: //录波文件和暂态事件 class qvvr_data { - bool used_status; //是否占用 - int QVVR_type; //暂态类型 - uint64_t QVVR_time; //暂态开始时间 unsigned longlong - double QVVR_PerTime; //暂态持续时间 - double QVVR_Amg; //暂态幅值 - int phase; //相别(仅瞬态上送)0-A 1-B 2-C 3-AB 4-BC 5-CA 其他-ABC/异常 + public: + bool used_status; //是否占用 + int QVVR_type; //暂态类型 + uint64_t QVVR_time; //暂态开始时间 unsigned longlong + double QVVR_PerTime; //暂态持续时间 + double QVVR_Amg; //暂态幅值 + int phase; //相别(仅瞬态上送)0-A 1-B 2-C 3-AB 4-BC 5-CA 其他-ABC/异常 }; class qvvr_file { - bool used_status; - int file_time_count; //组内文件下载时间计数(第一个文件下载后十分钟内如果其他文件没下载全或者下载全了没匹配事件则将已下载的文件都移到备份区comtrade_bak) - bool is_download; //文件是否下载完全,最后一个文件下载成功后对比成功则更新这个标志 - bool is_pair; //文件是否和事件匹配,从comtrade/mac/路径下取file_download中的cfg文件提取时间和持续时间来匹配,匹配后接口发送这组file_download全部文件,发送成功后删除这组文件,然后更新事件中的文件列表 - std::list file_name; //文件列表(文件列表上送后就记录) - std::list file_download; //文件已下载列表(每次列表上送会有多个文件,多个文件都下载完全则开始匹配,每次更新都去重并对比file_name) + public: + bool used_status; + int file_time_count; //组内文件下载时间计数(第一个文件下载后十分钟内如果其他文件没下载全或者下载全了没匹配事件则将已下载的文件都移到备份区comtrade_bak) + bool is_download; //文件是否下载完全,最后一个文件下载成功后对比成功则更新这个标志 + bool is_pair; //文件是否和事件匹配,从comtrade/mac/路径下取file_download中的cfg文件提取时间和持续时间来匹配,匹配后接口发送这组file_download全部文件,发送成功后删除这组文件,然后更新事件中的文件列表 + std::list file_name; //文件列表(文件列表上送后就记录) + std::list file_download; //文件已下载列表(每次列表上送会有多个文件,多个文件都下载完全则开始匹配,每次更新都去重并对比file_name) }; class qvvr_event { - std::vector qvvrdata; //暂态事件列表 - std::vector qvvrfile; //暂态文件组列表 + public: + std::vector qvvrdata; //暂态事件列表 + std::vector qvvrfile; //暂态文件组列表 }; //监测点台账 diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 058cd82..a1a8612 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -120,6 +120,11 @@ void process_received_message(string mac, string id,const char* data, size_t len << ", ֵ: " << record.fMagntitude << " pu" << ", ʱ: " << record.triggerTimeMs << "ms" << std::endl; + //lnk20250805 ¼ȼ¼¼ļϴٸļ + transfer_json_qvvr_data(id,event.head.name, + record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase, + ""); + } else { // ȡʧܵ @@ -197,6 +202,9 @@ void process_received_message(string mac, string id,const char* data, size_t len } std::cout << std::endl; + //lnk20250805¼ļĿ¼ӿ + assign_qvvr_file_list(id, line_id, fullFilenames); + // ========== Ϊÿļ ========== for (const auto& filename : fullFilenames) { // (֡Ź̶Ϊ1ʼļ) @@ -644,6 +652,9 @@ void process_received_message(string mac, string id,const char* data, size_t len out_file.write(reinterpret_cast(file_data.data()), file_data.size()); std::cout << "File saved: " << file_path << std::endl; + + //lnk20250805ļɹ¼ļͽӿ + update_qvvr_file_download(file_path, id); } else { std::cerr << "Failed to save file: " << file_path From 16ccb567d0170632bba0d6eed922654831e591b3 Mon Sep 17 00:00:00 2001 From: lnk Date: Fri, 8 Aug 2025 11:16:38 +0800 Subject: [PATCH 3/3] runing with ledger interface --- LFtid1056/PQSMsg.cpp | 69 ++++++++++++++++++- LFtid1056/cloudfront/code/cfg_parser.cpp | 28 ++++++-- LFtid1056/cloudfront/code/interface.cpp | 50 +++++++------- LFtid1056/cloudfront/code/interface.h | 7 ++ LFtid1056/cloudfront/code/log4.cpp | 12 ++-- LFtid1056/cloudfront/code/main.cpp | 30 ++++++-- LFtid1056/dealMsg.cpp | 24 ++++--- LFtid1056/main_thread.cpp | 87 ++++++++++++++++++++---- 8 files changed, 236 insertions(+), 71 deletions(-) diff --git a/LFtid1056/PQSMsg.cpp b/LFtid1056/PQSMsg.cpp index 51e94ba..ebf0ce4 100644 --- a/LFtid1056/PQSMsg.cpp +++ b/LFtid1056/PQSMsg.cpp @@ -7,6 +7,8 @@ #include #include #include "client2.h" +#include + // ת float IntToFloat(int num) { return static_cast(num) / 65536.0f; @@ -32,7 +34,62 @@ void ReversalBuff(uint8_t* buff, int start, int length) { } // MACַ䵽 -void GetMAC(const std::string& strMAC, std::vector& packet, size_t startIndex) { +//lnk20250808 +static inline int hex_nibble(char c) { + if (c >= '0' && c <= '9') return c - '0'; + c = static_cast(std::tolower(static_cast(c))); + if (c >= 'a' && c <= 'f') return 10 + (c - 'a'); + return -1; +} + +// ȫ棺쳣 true/false +bool GetMAC(const std::string& strMAC, + std::vector& packet, + size_t startIndex, + std::string* err = nullptr) { + // 1) 淶ȥ '-', ':', ' ' + std::string s; + s.reserve(strMAC.size()); + for (char c : strMAC) { + if (c == '-' || c == ':' || c == ' ') continue; + s.push_back(c); + } + + // 2) У + if (s.size() != 12) { + if (err) *err = "MACȱΪ12ʮַ"; + return false; + } + + // 3) ʮУ + + std::array mac{}; + for (int i = 0; i < 6; ++i) { + int hi = hex_nibble(s[2*i]); + int lo = hex_nibble(s[2*i + 1]); + if (hi < 0 || lo < 0) { + if (err) *err = "MACǷַ(0-9A-F)"; + return false; + } + mac[i] = static_cast((hi << 4) | lo); + } + + // 4) ȷ㹻д64ֽڣMAC 6ֽ + 㣩 + const size_t need = startIndex + 64; + if (packet.size() < need) packet.resize(need, 0); + + // 5) дMAC + for (int i = 0; i < 6; ++i) { + packet[startIndex + i] = mac[i]; + } + + // 6) λò㣨 resize 0֤һ£ + for (size_t i = 6; i < 64; ++i) { + packet[startIndex + i] = 0; + } + + return true; +} +/*void GetMAC(const std::string& strMAC, std::vector& packet, size_t startIndex) { // ƳпոͶ̺ std::string cleanedMAC = strMAC; cleanedMAC.erase(std::remove(cleanedMAC.begin(), cleanedMAC.end(), ' '), cleanedMAC.end()); @@ -70,7 +127,7 @@ void GetMAC(const std::string& strMAC, std::vector& packet, size_ catch (const std::exception& e) { throw std::invalid_argument("ЧMACַ: " + std::string(e.what())); } -} +}*/ // CRC㺯 unsigned char GetCrcSum(const std::vector& Check, int nOffset, int nLen) { @@ -292,7 +349,13 @@ std::vector generate_frontlogin_message(const std::string& strMac // [16-19] 2 (Ϊ0) // MACַ (λ20ʼ64ֽ) - GetMAC(strMac, packet, 20); + //GetMAC(strMac, packet, 20); + //lnk20250808 + std::string err; + if (!GetMAC(strMac, packet, 0, &err)) { + std::cerr << "[GetMAC] parse failed: " << err << "\n"; + // 򷵻 + } // У (ƫ8137) unsigned char checksum = 0; diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 11e1eb6..3ef7ae6 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -2750,6 +2750,7 @@ void to_json(nlohmann::json& j, const MsgObj& m) { // FullObj to_json void to_json(nlohmann::json& j, const FullObj& f) { j = nlohmann::json{ + {"Id", f.mac}, {"Mid", f.Mid}, {"Did", f.Did}, {"Pri", f.Pri}, @@ -2758,6 +2759,7 @@ void to_json(nlohmann::json& j, const FullObj& f) { }; } std::string generate_json( + const std::string mac, int Mid, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” int Did, //设备唯一标识Ldid,填入0代表Ndid。 int Pri, //报文处理的优先级 @@ -2769,6 +2771,7 @@ std::string generate_json( const std::vector& dataArray //数据数组。 ) { FullObj fobj; + fobj.mac = mac; fobj.Mid = Mid; fobj.Did = Did; fobj.Pri = Pri; @@ -2792,7 +2795,7 @@ void upload_data_test(){ arr.push_back({2, 1691741340, 0, 1, "yyyy"}); std::string js = generate_json( - -1, 2, 1, 4866, 1, 0, 2, 1, arr + "123",-1, 1, 1, 4866, 1, 0, 2, 1, arr ); std::cout << js << std::endl; @@ -2814,8 +2817,9 @@ std::vector GenerateDeviceInfoFromLedger(const std::vector GenerateDeviceInfoFromLedger(const std::vector lock(ledgermtx); // 去除 mac 路径前缀,仅保留文件名 - std::string filename = extract_filename(filename_with_mac); + std::string filename = extract_filename1(filename_with_mac); // 提取逻辑序号(如 PQM1 → 1) size_t under_pos = filename.find('_'); @@ -2992,7 +2999,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std:: std::set s_name(qfile.file_name.begin(), qfile.file_name.end()); std::set s_down; for (const auto& path : qfile.file_download) { - s_down.insert(extract_filename(path)); // 提取每个路径中的文件名 + s_down.insert(extract_filename1(path)); // 提取每个路径中的文件名 } // 检查 file_download 是否与 file_name 完全一致(集合相同) @@ -3001,7 +3008,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std:: // 找到其中的 .cfg 文件进行匹配 for (const auto& fpath : qfile.file_download) { - std::string fname = extract_filename(fpath); + std::string fname = extract_filename1(fpath); if (fname.size() >= 4 && fname.substr(fname.size() - 4) == ".cfg") { // 提取文件时标和监测点事件的时标匹配 qvvr_data matched; @@ -3064,3 +3071,10 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std:: } return false; // 未匹配到终端ID或逻辑序号对应的监测点 } + +////////////////////////////////////////////////////////////////////////////////////////提取mac +std::string normalize_mac(const std::string& mac) { + std::string result = mac; + result.erase(std::remove(result.begin(), result.end(), '-'), result.end()); + return result; +} \ No newline at end of file diff --git a/LFtid1056/cloudfront/code/interface.cpp b/LFtid1056/cloudfront/code/interface.cpp index bb7f173..f0a8117 100644 --- a/LFtid1056/cloudfront/code/interface.cpp +++ b/LFtid1056/cloudfront/code/interface.cpp @@ -609,27 +609,27 @@ int terminal_ledger_web(std::map& terminal_dev_map, dev.terminal_id = safe_str(item, "id"); dev.addr_str = safe_str(item, "ip"); dev.terminal_name = safe_str(item, "name"); - dev.org_name = safe_str(item, "org_name"); - dev.maint_name = safe_str(item, "maint_name"); - dev.station_name = safe_str(item, "stationName"); - dev.tmnl_factory = safe_str(item, "manufacturer"); - dev.tmnl_status = safe_str(item, "status"); + //dev.org_name = safe_str(item, "org_name"); + //dev.maint_name = safe_str(item, "maint_name"); + //dev.station_name = safe_str(item, "stationName"); + //dev.tmnl_factory = safe_str(item, "manufacturer"); + //dev.tmnl_status = safe_str(item, "status"); dev.dev_type = safe_str(item, "devType"); - dev.dev_key = safe_str(item, "devKey"); - dev.dev_series = safe_str(item, "series"); - dev.port = safe_str(item, "port"); - dev.timestamp = safe_str(item, "updateTime"); - dev.processNo = safe_str(item, "processNo"); - dev.maxProcessNum = safe_str(item, "maxProcessNum"); + //dev.dev_key = safe_str(item, "devKey"); + //dev.dev_series = safe_str(item, "series"); + //dev.port = safe_str(item, "port"); + //dev.timestamp = safe_str(item, "updateTime"); + dev.processNo = safe_str(item, "node"); + //dev.maxProcessNum = safe_str(item, "maxProcessNum"); - dev.mac = safe_str(item, "mac");//添加mac + //dev.mac = safe_str(item, "mac");//添加mac if (item.contains("monitorData") && item["monitorData"].is_array()) { for (auto& mon : item["monitorData"]) { if (dev.line.size() >= 10) break; ledger_monitor m; m.monitor_id = safe_str(mon, "id"); - m.terminal_id = safe_str(mon, "terminal_id"); + m.terminal_id = safe_str(mon, "deviceId"); m.monitor_name = safe_str(mon, "name"); m.logical_device_seq = safe_str(mon, "lineNo"); m.voltage_level = safe_str(mon, "voltageLevel"); @@ -637,10 +637,10 @@ int terminal_ledger_web(std::map& terminal_dev_map, m.timestamp = safe_str(mon, "updateTime"); m.status = safe_str(mon, "status"); - m.CT1 = mon.value("CT1", 0.0); - m.CT2 = mon.value("CT2", 0.0); - m.PT1 = mon.value("PT1", 0.0); - m.PT2 = mon.value("PT2", 0.0); + m.CT1 = mon.value("ct1", 0.0); + m.CT2 = mon.value("ct2", 0.0); + m.PT1 = mon.value("pt1", 0.0); + m.PT2 = mon.value("pt2", 0.0); dev.line.push_back(m); } @@ -736,9 +736,9 @@ int parse_device_cfg_web() if (IED_COUNT < count_cfg) { std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!" << std::endl; - DIY_WARNLOG("process","【WARN】前置的%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账", g_front_seg_index, IED_COUNT); + //DIY_WARNLOG("process","【WARN】前置的%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账", g_front_seg_index, IED_COUNT); } else { - DIY_INFOLOG("process","【NORMAL】前置的%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间", g_front_seg_index, IED_COUNT); + //DIY_INFOLOG("process","【NORMAL】前置的%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间", g_front_seg_index, IED_COUNT); } ///////////////////////////////////////////////////////////////////////////////用例这里将局部的map拷贝到全局map,后续根据协议台账修改 @@ -1199,21 +1199,21 @@ int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id, // 有效响应,略过 } catch (...) { // 响应异常,保存 json - DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口响应异常,无法上送监测点%s的暂态事件", monitor_id.c_str()); + DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口响应异常,无法上送装置%s监测点%s的暂态事件",dev_id, monitor_id); std::cout << "qvvr send fail ,store in local" << std::endl; std::string qvvrDir = FRONT_PATH + "/dat/qvvr/"; - std::string fileName = qvvrDir + monitor_id + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt"; + std::string fileName = qvvrDir + dev_id + "-" + std::to_string(monitor_id) + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt"; writeJsonToFile(fileName, json_string); checkAndRemoveOldestIfNeeded(qvvrDir, 10LL * 1024 * 1024); } } else { // 无响应,保存 json - DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口无响应,无法上送监测点%s的暂态事件", monitor_id.c_str()); + DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口无响应,无法上送装置%s监测点%s的暂态事件",dev_id, monitor_id); std::cout << "qvvr send fail ,store in local" << std::endl; std::string qvvrDir = FRONT_PATH + "/dat/qvvr/"; - std::string fileName = qvvrDir + monitor_id + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt"; + std::string fileName = qvvrDir + dev_id + "-" + std::to_string(monitor_id) + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt"; writeJsonToFile(fileName, json_string); checkAndRemoveOldestIfNeeded(qvvrDir, 10LL * 1024 * 1024); return 1; @@ -1234,7 +1234,9 @@ int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id, void qvvr_test() { - + transfer_json_qvvr_data("qvvrtest123", 6, + 10.98, 1234, 1754566628692, 1,1, + "testwavepath"); } /////////////////////////////////////////////////////////////////////////////////////////////////////////通用接口响应 diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 14e9889..3f4008a 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -429,6 +429,7 @@ struct MsgObj { // 整体 struct FullObj { + std::string mac; int Mid; int Did; int Pri; @@ -446,6 +447,7 @@ void to_json(nlohmann::json& j, const FullObj& f); std::vector GenerateDeviceInfoFromLedger(const std::vector& terminal_devlist);//接口读取台账后,再调用这个将台账拷贝过来 std::string generate_json( //构造装置主动上送数据的报文 + const std::string mac, int Mid, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” int Did, //设备唯一标识Ldid,填入0代表Ndid。 int Pri, //报文处理的优先级 @@ -461,6 +463,8 @@ std::string generate_json( //构造装置主动上送数据的报文 int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id, double mag, double dur, long long start_tm, int dis_kind,int phase, const std::string& wavepath); +//录波文件上传接口 +void SOEFileWeb(std::string& localpath,std::string& cloudpath, std::string& wavepath); //录波文件目录接口 bool assign_qvvr_file_list(const std::string& id, ushort nCpuNo, const std::vector& file_list_raw); @@ -468,6 +472,9 @@ bool assign_qvvr_file_list(const std::string& id, ushort nCpuNo, const std::vect //录波文件下载完成通知接口 bool update_qvvr_file_download(const std::string& filename_with_mac, const std::string& terminal_id); +//提取mac +std::string normalize_mac(const std::string& mac); + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #endif diff --git a/LFtid1056/cloudfront/code/log4.cpp b/LFtid1056/cloudfront/code/log4.cpp index 0d4ed55..544e14d 100644 --- a/LFtid1056/cloudfront/code/log4.cpp +++ b/LFtid1056/cloudfront/code/log4.cpp @@ -334,8 +334,8 @@ void init_loggers_bydevid(const std::string& dev_id) const ledger_monitor& monitor = term.line[j]; if (!monitor.monitor_id.empty()) { std::ostringstream mon_key_c, mon_key_d, mon_path, mon_name; - mon_key_c << "monitor." << monitor.monitor_id << ".COM"; - mon_key_d << "monitor." << monitor.monitor_id << ".DATA"; + mon_key_c << "monitor." << term.terminal_id << "." << monitor.logical_device_seq << ".COM"; + mon_key_d << "monitor." << term.terminal_id << "." << monitor.logical_device_seq << ".DATA"; mon_path << device_dir << "/monitor" << j; mon_name << monitor.monitor_id; @@ -355,7 +355,7 @@ void init_loggers_bydevid(const std::string& dev_id) logger_map[mon_key_c.str()] = TypedLogger(mon_logger_c, LOGTYPE_COM); logger_map[mon_key_d.str()] = TypedLogger(mon_logger_d, LOGTYPE_DATA); - DIY_WARNLOG(mon_key_d.str().c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", monitor.monitor_name.c_str(), monitor.monitor_id.c_str()); + DIY_WARNLOG(mon_key_d.str().c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str()); } } } @@ -406,8 +406,8 @@ void init_loggers() if (!monitor.monitor_id.empty()) { std::ostringstream mon_key_c, mon_key_d, mon_path, mon_name; - mon_key_c << "monitor." << monitor.monitor_id << ".COM"; - mon_key_d << "monitor." << monitor.monitor_id << ".DATA"; + mon_key_c << "monitor." << term.terminal_id << "." << monitor.logical_device_seq << ".COM"; + mon_key_d << "monitor." << term.terminal_id << "." << monitor.logical_device_seq << ".DATA"; mon_path << device_dir << "/monitor" << i; // 用monitor+序号作为目录 mon_name << monitor.monitor_id; @@ -425,7 +425,7 @@ void init_loggers() logger_map[mon_key_d.str()] = TypedLogger(mon_logger_d, LOGTYPE_DATA); DIY_WARNLOG(mon_key_d.str().c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", - monitor.monitor_name.c_str(), monitor.monitor_id.c_str()); + monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str()); } } } diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index 074555a..c5f9599 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -215,7 +215,7 @@ std::string get_parent_directory() { init_loggers(); //读取模型,下载模板文件 - parse_model_cfg_web(); + //parse_model_cfg_web(); //解析模板文件 //Set_xml_nodeinfo(); @@ -518,16 +518,35 @@ void Front::mqproducerThread() extern thread_info_t thread_info[THREAD_CONNECTIONS]; +void cleanup_args(ThreadArgs* args) { + for (int i = 0; i < args->argc; ++i) { + free(args->argv[i]); // strdup 分配的 + } + delete[] args->argv; + delete args; +} + void* cloudfrontthread(void* arg) { /////////////////////////////////////// ThreadArgs* args = static_cast(arg); int argc = args->argc; char **argv = args->argv; - printf("argc = %d, argv[0] = %s\n", argc, argv[0]); + printf("[cloudfrontthread] argc = %d\n", argc); + for (int i = 0; i < argc; ++i) { + printf(" argv[%d] = %s\n", i, argv[i]); + } - //添加线程处理 - int index = *(int*)argv[0]; + // 动态解析线程 index + int index = 0; + if (argc > 0 && argv[0]) { + try { + index = std::stoi(argv[0]); + } catch (...) { + std::cerr << "[cloudfrontthread] Failed to parse index from argv[0]: " << argv[0] << "\n"; + return nullptr; + } + } // 更新线程状态为运行中 pthread_mutex_lock(&thread_info[index].lock); @@ -539,11 +558,12 @@ void* cloudfrontthread(void* arg) { // 解析命令行参数 if(!parse_param(argc,argv)){ std::cerr << "process param error,exit" << std::endl; + cleanup_args(args); return nullptr; } // 线程使用完后清理参数 - delete args; + cleanup_args(args); //路径获取 FRONT_PATH = get_parent_directory(); diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index a1a8612..10f3599 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -354,22 +354,23 @@ void process_received_message(string mac, string id,const char* data, size_t len std::vector arr; arr.push_back({1, // -1-ޣ 0-Rt,1-Max,2-Min,3-Avg,4-Cp95 data_time, //תʱ䣬ʱ꣬1970룬Ч롰-1 - -1, //ʱ꣬΢ӣЧ롰-1 + 0, //ʱ꣬΢ӣЧ롰-1 0, //ݱʶ1-ʶ쳣 max_base64Str}); - arr.push_back({2, data_time, -1, 0, min_base64Str}); - arr.push_back({3, data_time, -1, 0, avg_base64Str}); - arr.push_back({4, data_time, -1, 0, cp95_base64Str}); + arr.push_back({2, data_time, 0, 0, min_base64Str}); + arr.push_back({3, data_time, 0, 0, avg_base64Str}); + arr.push_back({4, data_time, 0, 0, cp95_base64Str}); std::string js = generate_json( + normalize_mac(mac), -1, //ӦıĶյԴIDӦӦ롰-1 - 123456, //豸ΨһʶLdid0Ndid,id - 3, //Ĵȼ1 I/Ӧ 2 II/Ӧ 3 ͨ/Ӧ 4 㲥 + 1, //豸ΨһʶLdid0Ndid,id + 1, //Ĵȼ1 I/Ӧ 2 II/Ӧ 3 ͨ/Ӧ 4 㲥 0x1302, //豸͵ avg_data.name, //߼豸ID0-߼豸-1 0x04, //͹̶Ϊ 2, //ԣޡ0ʵʱ1ͳơ2 - -1, //ݼţݼʽͣ-1 + 1, //ݼţݼʽͣ-1 arr // ); @@ -543,18 +544,19 @@ void process_received_message(string mac, string id,const char* data, size_t len std::vector arr; arr.push_back({1, // -1-ޣ 0-Rt,1-Max,2-Min,3-Avg,4-Cp95 data_time, //תʱ䣬ʱ꣬1970룬Ч롰-1 - -1, //ʱ꣬΢ӣЧ롰-1 + 0, //ʱ꣬΢ӣЧ롰-1 0, //ݱʶ1-ʶ쳣 base64}); std::string js = generate_json( + normalize_mac(mac), -1, //ӦıĶյԴIDӦӦ롰-1 - 123456, //豸ΨһʶLdid0Ndid,id - 3, //Ĵȼ1 I/Ӧ 2 II/Ӧ 3 ͨ/Ӧ 4 㲥 + 1, //豸ΨһʶLdid0Ndid,id + 1, //Ĵȼ1 I/Ӧ 2 II/Ӧ 3 ͨ/Ӧ 4 㲥 0x1302, //豸͵ cid, //߼豸ID0-߼豸-1 0x04, //͹̶Ϊ 1, //ԣޡ0ʵʱ1ͳơ2 - -1, //ݼţݼʽͣ-1 + 1, //ݼţݼʽͣ-1 arr // ); //std::cout << js << std::en diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index f3c17c1..2ca393c 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -8,6 +8,8 @@ #include "cloudfront/code/interface.h" #include +#include +#include using namespace std; #if 0 @@ -32,6 +34,9 @@ typedef struct { } thread_info_t; #endif +extern int INITFLAG;//̨˵ȳʼɱ־ +extern void cleanup_args(ThreadArgs* args); + void init_daemon(void) { int pid; @@ -108,6 +113,32 @@ std::vector generate_test_devices(int count) { return devices; } +void PrintDevices(const std::vector& devices) { + std::cout << "==== Devices List (" << devices.size() << ") ====\n"; + for (const auto& dev : devices) { + std::cout << "Device ID : " << dev.device_id << "\n"; + std::cout << "Name : " << dev.name << "\n"; + std::cout << "Model : " << dev.model << "\n"; + std::cout << "MAC : " << dev.mac << "\n"; + std::cout << "Status : " << dev.status << "\n"; + std::cout << "Points (" << dev.points.size() << "):\n"; + for (const auto& pt : dev.points) { + std::cout << " Point ID : " << pt.point_id << "\n"; + std::cout << " Name : " << pt.name << "\n"; + std::cout << " Device ID : " << pt.device_id << "\n"; + std::cout << " Cpu No : " << pt.nCpuNo << "\n"; + std::cout << " PT1 : " << pt.PT1 << "\n"; + std::cout << " PT2 : " << pt.PT2 << "\n"; + std::cout << " CT1 : " << pt.CT1 << "\n"; + std::cout << " CT2 : " << pt.CT2 << "\n"; + std::cout << " Scale : " << pt.strScale << "\n"; + std::cout << " PTType : " << pt.nPTType << "\n"; + std::cout << "----------------------\n"; + } + std::cout << "==========================\n"; + } +} + /* ̹߳ 0߳*/ /* ͻӹ̺߳*/ void* client_manager_thread(void* arg) { @@ -123,17 +154,17 @@ void* client_manager_thread(void* arg) { printf("Started client connections\n"); // - std::vector points1 = { + /*std::vector points1 = { {"P001", "Main Voltage", "D001",1 ,1, 1, 1, 1,"0.38k",0}, {"P002", "Backup Voltage", "D001",2 ,1, 1, 1, 1,"0.38k",0} }; std::vector points2 = { {"P101", "Generator Output", "D002",1 ,1, 1, 1, 1,"0.38k",0} - }; + };*/ //00B78DA800D6 00-B7-8D-01-79-06 // װб - std::vector devices = { + /*std::vector devices = { { "D001", "Primary Device", "Model-X", "00-B7-8D-01-79-06", 1, points1 @@ -142,12 +173,16 @@ void* client_manager_thread(void* arg) { "D002", "Backup Device", "Model-Y", "00-B7-8D-A8-00-D6", 1, points2 } - }; + };*/ // 100װ - std::vector test_devices = generate_test_devices(100); + //std::vector test_devices = generate_test_devices(100); - //std::vector devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk + //lnk̨˶ȡ豸 + std::vector devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk + + //̨˴ӡ + PrintDevices(devices); // ͻ start_client_connect(devices); @@ -215,7 +250,7 @@ void restart_thread(int index) { int* new_index = (int*)malloc(sizeof(int)); *new_index = index; - if (index == 0) { + if (index == 1) { // ͻ˹߳ if (pthread_create(&thread_info[index].tid, NULL, client_manager_thread, new_index) != 0) { pthread_mutex_lock(&global_lock); @@ -225,7 +260,7 @@ void restart_thread(int index) { free(new_index); } } - else if (index == 1) { + else if (index == 2) { // Ϣ߳ if (pthread_create(&thread_info[index].tid, NULL, message_processor_thread, new_index) != 0) { pthread_mutex_lock(&global_lock); @@ -235,7 +270,7 @@ void restart_thread(int index) { free(new_index); } } - else if (index == 2) { + else if (index == 0) { // ӿڣmq char* argv[] = { (char*)new_index };//Ҫ̺Ų ThreadArgs* args = new ThreadArgs{1, argv}; @@ -259,6 +294,16 @@ int is_thread_alive(pthread_t tid) { return pthread_tryjoin_np(tid, NULL) == EBUSY; // EBUSYʾ߳ } +//lnk +ThreadArgs* make_thread_args_from_strs(const std::vector& args_vec) { + char** argv = new char*[args_vec.size() + 1]; // һ nullptr β + for (size_t i = 0; i < args_vec.size(); ++i) { + argv[i] = strdup(args_vec[i].c_str()); // strdup malloc + } + argv[args_vec.size()] = nullptr; + return new ThreadArgs{static_cast(args_vec.size()), argv}; +} + /* */ int main(int argc ,char** argv) {//Ӳ if(!parse_param(argc,argv)){ @@ -275,34 +320,46 @@ int main(int argc ,char** argv) {// pthread_mutex_init(&thread_info[i].lock, NULL); // ʼÿ̵߳ } + //ӿںmq + ThreadArgs* args = make_thread_args_from_strs({ "0" }); + if (pthread_create(&thread_info[0].tid, NULL, cloudfrontthread, args) != 0) { + printf("Failed to create message processor thread 0\n"); + cleanup_args(args); + } + + while(!INITFLAG){ + std::this_thread::sleep_for(std::chrono::seconds(3)); + std::cout << "waiting cloudfront initialize ..." << std::endl; + } + // ʼ߳ - for (int i = 0; i < THREAD_CONNECTIONS; i++) { + for (int i = 1; i < THREAD_CONNECTIONS; i++) { int* index = (int*)malloc(sizeof(int)); *index = i; - if (i == 0) { + if (i == 1) { // ͻ˹߳ if (pthread_create(&thread_info[i].tid, NULL, client_manager_thread, index) != 0) { printf("Failed to create client manager thread %d\n", i); free(index); } } - else if (i == 1) { + else if (i == 2) { // Ϣ߳ if (pthread_create(&thread_info[i].tid, NULL, message_processor_thread, index) != 0) { printf("Failed to create message processor thread %d\n", i); free(index); } } - else if (i == 2){ - //ӿںmq + else if (i == 3){ + /*//ӿںmq char* argv[] = { (char*)index };//Ҫ̺Ų ThreadArgs* args = new ThreadArgs{1, argv}; if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) { printf("Failed to create message processor thread %d\n", i); delete args; // ߳ûɹֶͷ free(index); - } + }*/ } else { // ߳