From 671fc6702e791db365948d4280c10043bc32771c Mon Sep 17 00:00:00 2001 From: lnk Date: Thu, 28 May 2026 14:42:32 +0800 Subject: [PATCH] recall stat --- LFtid1056/cloudfront/code/cfg_parser.cpp | 593 ++++++++++++++++------- LFtid1056/cloudfront/code/interface.h | 51 +- LFtid1056/cloudfront/code/main.cpp | 22 +- LFtid1056/cloudfront/code/worker.cpp | 26 +- LFtid1056/pqdif_thread_processor.cpp | 149 ++++++ 5 files changed, 636 insertions(+), 205 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 606058f..ea090f3 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -22,6 +22,8 @@ #include #include #include +#include +#include ///////////////////////////////////////////////////////////////////////////////////////////////// @@ -657,6 +659,9 @@ void init_config() { MULTIPLE_NODE_FLAG = 0; std::cout << "this is single process" << std::endl; } + else if(g_front_seg_num > 0 && g_front_seg_index == 0){ + std::cout << "this is pqdif process" << std::endl; + } else{ DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号参数异常,当前进程退出"); exit(-1039); @@ -1406,7 +1411,7 @@ int recall_json_handle_from_mq(const std::string& body) // 根据 monitorId 和提取的数字初始化补招记录 init_recall_record_file(guid, terminalId, monId, "", ""); - //根据时间戳单独补招事件 + //根据时间戳单独补招事件,暂不使用 if(0)// ★新增(dt==2 同步生成“按小时”的事件补招到 recall_list,与 dt==1 逻辑一致)——开始 { std::lock_guard lock2(ledgermtx); // 复用与 dt==1 相同的加锁粒度 @@ -1490,34 +1495,43 @@ int recall_json_handle_from_mq(const std::string& body) } if (!lm) { std::cout << "monitorId未在terminal内找到: " << monId << " @ " << terminalId << std::endl; continue; } + // 稳态文件一个 guid + monitor 只生成一条 RecallFile + RecallFile rm_all; + bool has_steady_range = false; + + if (stat == 1) { + rm_all.recall_guid = guid; + rm_all.recall_status = 0; + rm_all.STEADY = std::to_string(stat); + rm_all.VOLTAGE = std::to_string(voltage); + rm_all.file_type = RecallFileType::STEADY_FILE; + } + for (const auto& ti : rec["timeInterval"]) { if (!ti.is_string()) continue; + std::string s = ti.get(); + std::string::size_type pos = s.find('~'); if (pos == std::string::npos) { std::cout << "timeInterval格式错误: " << s << std::endl; continue; } + std::string start = s.substr(0, pos); std::string end = s.substr(pos + 1); - RecallFile rm_all; - rm_all.recall_guid = guid; - rm_all.recall_status = 0; - rm_all.StartTime = start; - rm_all.EndTime = end; - rm_all.STEADY = std::to_string(stat); - rm_all.VOLTAGE = std::to_string(voltage); + //lnk 20251027xuyang request:生成文件记录单个测点单个时间段的补招记录文件,补招结束后使用这个文件信息来响应 init_recall_record_file(guid, terminalId, monId, start, end); if (voltage == 1) { std::vector recallinfo_list_hour; - Get_Recall_Time_Char(start, end, recallinfo_list_hour); + Get_Recall_Time_Char(start, end, recallinfo_list_hour); //暂态将整个时间段划分为多个一小时的时间段 for (size_t i = 0; i < recallinfo_list_hour.size(); ++i) { const RecallInfo& info = recallinfo_list_hour[i]; RecallMonitor rm; rm.recall_guid = guid; rm.recall_status = 0; - rm.StartTime = epoch_to_datetime_str(info.starttime); + rm.StartTime = epoch_to_datetime_str(info.starttime); //每个记录是一个小时的时间段,张文暂态事件接口是一小时一条 rm.EndTime = epoch_to_datetime_str(info.endtime); rm.STEADY = std::to_string(stat); rm.VOLTAGE = std::to_string(voltage); @@ -1525,10 +1539,62 @@ int recall_json_handle_from_mq(const std::string& body) } } if (stat == 1) { - lm->recall_list_static.push_back(rm_all); + long long beg = parse_time_to_epoch(start); //转为整数时间 + long long ed = parse_time_to_epoch(end); + + if (beg < 0 || ed < 0 || beg > ed) { + std::cout << "[recall_file] invalid steady time range guid=" + << guid + << " terminal=" << terminalId + << " monitor=" << monId + << " range=" << start << "~" << end + << std::endl; + continue; + } + + rm_all.recall_ranges.push_back(std::make_pair(beg, ed));//每个时间段都记录到补招稳态时间列表里 + + if (!has_steady_range) { + rm_all.StartTime = start; //记录多个时间段的第一条开始和结束 + rm_all.EndTime = end; + has_steady_range = true; + } else { + long long old_beg = parse_time_to_epoch(rm_all.StartTime); + long long old_end = parse_time_to_epoch(rm_all.EndTime); + + if (old_beg < 0 || beg < old_beg) { + rm_all.StartTime = start; //对比多个时间段更新最早和最晚 + } + if (old_end < 0 || ed > old_end) { + rm_all.EndTime = end; + } + } + + std::cout << "[recall_file] add steady range guid=" + << guid + << " terminal=" << terminalId + << " monitor=" << monId + << " range=" << start << "~" << end + << " total_ranges=" << rm_all.recall_ranges.size() + << std::endl; } + if (stat == 0 && voltage == 0) return 10003; } + + // 所有 timeInterval 遍历完后,稳态 RecallFile 只入队一次 + if (stat == 1 && has_steady_range) { //一次补招指令的处理结束 + lm->recall_list_static.push_back(rm_all);//记录稳态补招 + + std::cout << "[recall_file] push steady recall once guid=" + << guid + << " terminal=" << terminalId + << " monitor=" << monId + << " ranges=" << rm_all.recall_ranges.size() + << " StartTime=" << rm_all.StartTime + << " EndTime=" << rm_all.EndTime + << std::endl; + } } } else { // 未知 dataType,忽略 @@ -4592,71 +4658,133 @@ void check_recall_event() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑统计数据 -// ====== 从文件名中提取“第二段下划线分隔字段”并转换为 epoch 秒 ====== -static bool extract_epoch_from_filename(const std::string& name, - long long& out_epoch, - int logical_device_seq) +static std::string replace_all_copy(std::string s, + const std::string& from, + const std::string& to) { - // 拆分 - std::vector parts; - parts.reserve(8); - size_t start = 0, pos; - while ((pos = name.find('_', start)) != std::string::npos) { - parts.emplace_back(name.substr(start, pos - start)); - start = pos + 1; + size_t pos = 0; + while ((pos = s.find(from, pos)) != std::string::npos) { + s.replace(pos, from.length(), to); + pos += to.length(); } - parts.emplace_back(name.substr(start)); // 最后一段(含扩展名) - - if (parts.size() < 4) return false; - - // 第二段序号是倒数第4段 - const std::string& seq_str = parts[parts.size() - 4]; - // 允许前导 0:把字符串转 int 后比较 - for (char c : seq_str) if (!std::isdigit(static_cast(c))) return false; - int seq_val = 0; - try { - seq_val = std::stoi(seq_str); - } catch (...) { - return false; - } - if (seq_val != logical_device_seq) return false; - - // 其余与上面相同 - const std::string& date_str = parts[parts.size() - 3]; - const std::string& time_str = parts[parts.size() - 2]; - std::string ms_str = parts.back(); - size_t dot = ms_str.find('.'); - if (dot != std::string::npos) { - ms_str.erase(dot); - } - if (date_str.size() != 8 || time_str.size() != 6) return false; - for (char c : date_str) if (!std::isdigit(static_cast(c))) return false; - for (char c : time_str) if (!std::isdigit(static_cast(c))) return false; - for (char c : ms_str) if (!std::isdigit(static_cast(c))) return false; - - int year = std::stoi(date_str.substr(0, 4)); - int month = std::stoi(date_str.substr(4, 2)); - int day = std::stoi(date_str.substr(6, 2)); - int hour = std::stoi(time_str.substr(0, 2)); - int min = std::stoi(time_str.substr(2, 2)); - int sec = std::stoi(time_str.substr(4, 2)); - // int msec = std::stoi(ms_str); - - std::tm tm{}; tm.tm_isdst = -1; - tm.tm_year = year - 1900; - tm.tm_mon = month - 1; - tm.tm_mday = day; - tm.tm_hour = hour; - tm.tm_min = min; - tm.tm_sec = sec; - - time_t t = timegm(&tm); - if (t < 0) return false; - - out_epoch = static_cast(t); // 秒级 - return true; + return s; } +static std::string make_day_from_datetime(const std::string& t) +{ + // yyyy-MM-dd HH:mm:ss -> yyyyMMdd + if (t.size() < 10) return ""; + std::string day = t.substr(0, 10); + day.erase(std::remove(day.begin(), day.end(), '-'), day.end()); + return day; +} + +static std::string resolve_recall_dir(const std::string& raw_dir, + const RecallFile& rf, + const ledger_monitor& lm) +{ + std::string dir = raw_dir; + + std::string day = make_day_from_datetime(rf.StartTime); + + std::string seq = "1"; + try { + seq = std::to_string(std::stoi(lm.logical_device_seq)); + } catch (...) { + seq = "1"; + } + + // DESC 建议优先用你台账里的监测点描述/名称 + // 如果 lm.monitor_name 是“监测点1”,广东目录就是 /historyFile/监测点1 + std::string desc = lm.monitor_name; + if (desc.empty()) { + desc = std::string("Line") + seq; + } + + dir = replace_all_copy(dir, "%DAY%", day); + dir = replace_all_copy(dir, "%SEQ%", seq); + dir = replace_all_copy(dir, "%DESC%", desc); + + return dir; +} +/////////////////////////////目录解析 +// ====== 从文件名中提取“第二段下划线分隔字段”并转换为 epoch 秒 ====== +static bool extract_pqdif_range_from_filename(const std::string& fname, + long long& fs, + long long& fe) +{ + fs = -1; + fe = -1; + + auto to_epoch = [](int y, int mon, int d, int h, int m, int s)->long long { + struct tm t {}; + t.tm_year = y - 1900; + t.tm_mon = mon - 1; + t.tm_mday = d; + t.tm_hour = h; + t.tm_min = m; + t.tm_sec = s; + t.tm_isdst = -1; + return static_cast(mktime(&t)); + }; + + // 广东/上海:20230915000000_20230915010000.pqd 开始时间_结束时间 + { + std::regex re(R"((\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})_(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\.pqd$)", + std::regex::icase); + std::smatch m; + if (std::regex_search(fname, m, re)) { + fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]), + std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6])); + fe = to_epoch(std::stoi(m[7]), std::stoi(m[8]), std::stoi(m[9]), + std::stoi(m[10]), std::stoi(m[11]), std::stoi(m[12])); + return fs >= 0 && fe >= fs; + } + } + + // 云南:20230915T010000.pqd,文件时间按 1 小时窗口估算 时间点 + { + std::regex re(R"((\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})\.pqd$)", + std::regex::icase); + std::smatch m; + if (std::regex_search(fname, m, re)) { + fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]), + std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6])); + fe = fs + 3600; + return fs >= 0; + } + } + + // 新疆:B8B7FEE6D4792D30-20230915-003000-p.pqd,按 1 小时窗口估算 设备ID-时间点-p.pqd + { + std::regex re(R"([^-]+-(\d{4})(\d{2})(\d{2})-(\d{2})(\d{2})(\d{2})-p\.pqd$)", + std::regex::icase); + std::smatch m; + if (std::regex_search(fname, m, re)) { + fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]), + std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6])); + fe = fs + 3600; + return fs >= 0; + } + } + + // 默认:ied_ld_20230915_0030_60.pqd,最后的 60 是间隔分钟 设备名_逻辑设备_开始日期_开始时间_间隔分钟 + { + std::regex re(R"(.*_(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})_(\d+)\.pqd$)", + std::regex::icase); + std::smatch m; + if (std::regex_search(fname, m, re)) { + fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]), + std::stoi(m[4]), std::stoi(m[5]), 0); + + int intv_min = std::stoi(m[6]); + fe = fs + intv_min * 60; + return fs >= 0 && intv_min > 0; + } + } + + return false; +} /////////////////////////////////////////////////////////////////////////////////////////////////////////////// // 从文件名中解析出 "监测点号_YYYYMMDD_HHMMSS_mmm";成功返回 true static bool make_target_key_from_filename(const std::string& fname, std::string& out_key) { @@ -4744,7 +4872,6 @@ static void dircache_clear_device(const std::string& dev_id) } //////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件逻辑 - // ====== ★修改:check_recall_stat —— 加入“两步法”状态机 ====== void check_recall_file() { @@ -4783,6 +4910,12 @@ void check_recall_file() { }; // ★修改结束 + auto RecallTypeName = [](const RecallFile& rf)->std::string { + if (rf.is_steady_file()) return "稳态文件"; + if (rf.is_voltage_file()) return "波形文件"; + return "文件"; + }; + std::vector pending_uploads; // ★修改:锁外执行上传与清理 @@ -4809,7 +4942,7 @@ void check_recall_file() { // ★优先级结束 for (auto& dev : terminal_devlist) { - // 仅处理“正在补招/空闲”终端,与你原逻辑一致 + // 仅处理“正在补招/空闲”终端 if (dev.busytype != static_cast(DeviceState::READING_STATSFILE) && dev.busytype != static_cast(DeviceState::IDLE)) { continue; @@ -4834,7 +4967,6 @@ void check_recall_file() { << " size=" << lm.recall_list_static.size() << " first{guid=" << front.recall_guid << ", status=" << front.recall_status - << ", direct=" << (front.direct_mode ? 1 : 0) << ", phase=" << static_cast(front.phase) << ", cur_dir=" << front.cur_dir << ", cur_file=" << front.downloading_file @@ -4868,7 +5000,6 @@ void check_recall_file() { << " size=" << lm.recall_list_static.size() << " first{guid=" << nf.recall_guid << ", status=" << nf.recall_status - << ", direct=" << (nf.direct_mode ? 1 : 0) << ", phase=" << static_cast(nf.phase) << ", cur_dir=" << nf.cur_dir << ", cur_file=" << nf.downloading_file @@ -4900,13 +5031,13 @@ void check_recall_file() { //20251218添加记录 std::string msg_fail; - if (front.direct_mode) { + if (front.is_voltage_file()) { msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件失败,目标时标:" + + " 补招" + RecallTypeName(front) + "失败,目标时标:" + front.target_filetimes; } else { msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件失败,时间范围:" + + " 补招" + RecallTypeName(front) + "失败,时间范围:" + front.StartTime + " ~ " + front.EndTime; } append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); @@ -4924,7 +5055,6 @@ void check_recall_file() { << " size=" << lm.recall_list_static.size() << " first{guid=" << nf.recall_guid << ", status=" << nf.recall_status - << ", direct=" << (nf.direct_mode ? 1 : 0) << ", phase=" << static_cast(nf.phase) << ", cur_dir=" << nf.cur_dir << ", cur_file=" << nf.downloading_file @@ -5034,8 +5164,12 @@ void check_recall_file() { front.file_success.clear(); // 立即发起第一个目录请求 - if (front.cur_dir_index < static_cast(front.dir_candidates.size())) { - front.cur_dir = front.dir_candidates[front.cur_dir_index];//从第一个目录开始 + if (front.cur_dir_index < static_cast(front.active_dirs().size())) { + front.cur_dir = resolve_recall_dir( + front.active_dirs()[front.cur_dir_index], + front, + lm + );//从第一个目录开始 // ★新增:先查设备级目录缓存 std::vector cached; @@ -5061,16 +5195,16 @@ void check_recall_file() { front.recall_status = static_cast(RecallStatus::FAILED); std::string msg_fail; - if (front.direct_mode) { - msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件未找到,目标时标:" - + front.target_filetimes; - } else { - msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件未找到,时间范围:" - + front.StartTime + " ~ " + front.EndTime; - } - append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); + if (front.is_voltage_file()) { + msg_fail = std::string("监测点:") + lm.monitor_name + + " 补招" + RecallTypeName(front) + "未找到,目标时标:" + + front.target_filetimes; + } else { + msg_fail = std::string("监测点:") + lm.monitor_name + + " 补招" + RecallTypeName(front) + "未找到,时间范围:" + + front.StartTime + " ~ " + front.EndTime; + } + append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); std::cout << "[check_recall_stat] no dir candidates, FAIL dev=" << dev.terminal_id << " monitor=" << lm.monitor_id << std::endl; @@ -5096,8 +5230,12 @@ void check_recall_file() { front.required_files.clear(); front.file_success.clear(); - if (front.cur_dir_index < static_cast(front.dir_candidates.size())) { - front.cur_dir = front.dir_candidates[front.cur_dir_index]; + if (front.cur_dir_index < static_cast(front.active_dirs().size())) { + front.cur_dir = resolve_recall_dir( + front.active_dirs()[front.cur_dir_index], + front, + lm + ); // ★新增:先查缓存 std::vector cached; @@ -5121,14 +5259,14 @@ void check_recall_file() { front.recall_status = static_cast(RecallStatus::FAILED); std::string msg_fail; - if (front.direct_mode) { + if (front.is_voltage_file()) { msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件未找到,目标时标:" - + front.target_filetimes; + + " 补招" + RecallTypeName(front) + "未找到,目标时标:" + + front.target_filetimes;; } else { msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件未找到,时间范围:" - + front.StartTime + " ~ " + front.EndTime; + + " 补招" + RecallTypeName(front) + "未找到,时间范围:" + + front.StartTime + " ~ " + front.EndTime; } append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); @@ -5140,89 +5278,104 @@ void check_recall_file() { // OK:根据起止时间筛选文件 { - /*//这部分用于稳态数据文件 - long long beg = parse_time_to_epoch(front.StartTime); - long long end = parse_time_to_epoch(front.EndTime); + long long beg = -1; + long long end = -1; - //错误判断:如果是直下文件的方式,会给默认的正确的时间范围 - if (beg < 0 || end < 0 || beg > end) { - front.recall_status = static_cast(RecallStatus::FAILED); - std::cout << "[check_recall_stat] time parse ERR, FAIL dev=" << dev.terminal_id - << " monitor=" << lm.monitor_id - << " start=" << front.StartTime - << " end=" << front.EndTime << std::endl; - break;//跳出循环,一个装置一次只能处理一个测点的一个补招记录;如果失败,下个循环会弹出 - }*/ + if (front.is_steady_file()) { + beg = parse_time_to_epoch(front.StartTime); + end = parse_time_to_epoch(front.EndTime); + + if (beg < 0 || end < 0 || beg > end) { + front.recall_status = static_cast(RecallStatus::FAILED); + std::cout << "[check_recall_file] steady time parse ERR, FAIL dev=" + << dev.terminal_id + << " monitor=" << lm.monitor_id + << " start=" << front.StartTime + << " end=" << front.EndTime << std::endl; - //装置消息返回后通知成功的处理: - auto it = front.dir_files.find(front.cur_dir);//在map中查找当前目录名对应的目录下的文件名列表 + front.phase = RecallPhase::IDLE; + front.downloading_file.clear(); + front.download_result = ActionResult::PENDING; + + break; + } + } + + auto it = front.dir_files.find(front.cur_dir); if (it != front.dir_files.end()) { - if (front.direct_mode) { - // 目标时间戳(不含毫秒、形如 yyyyMMdd_HHmmss) + if (front.is_voltage_file()) { //暂态波形下载 const std::string& want_ts = front.target_filetimes; - + for (const auto& ent : it->second) { - // 打印目录下的所有条目 - std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id + std::cout << "[check_recall_file] voltage dir file dev=" << dev.terminal_id << " monitor=" << lm.monitor_id << " dir=" << front.cur_dir << " file=" << ent.name << " flag=" << ent.flag << std::endl; - if (ent.flag == 0) continue; // 只要文件,跳过目录 + if (ent.flag == 0) continue; size_t n = ::strnlen(ent.name, sizeof(ent.name)); std::string fname(ent.name, n); - // 解析出 key = 监测点号_YYYYMMDD_HHMMSS(注意:确保你的 make_target_key_from_filename - // 已经改成“去掉毫秒”的版本;若还带毫秒,请先调整该函数) std::string key; if (!make_target_key_from_filename(fname, key)) { - std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id - << " monitor=" << lm.monitor_id - << " dir=" << front.cur_dir - << " file=" << fname - << " key=" << key - << " ERR: invalid filename format, skip" - << std::endl; continue; } - // key 形如 MON_YYYYMMDD_HHMMSS,目标是只按时间戳匹配: - if (has_suffix(key, want_ts)) { - //打印放入的文件名 - std::cout << "[check_recall_file] dir file dev=" << dev.terminal_id - << " monitor=" << lm.monitor_id - << " dir=" << front.cur_dir - << " file=" << fname - << " key=" << key - << " MATCH, add to download queue" - << std::endl; - + if (has_suffix(key, want_ts)) { front.download_queue.push_back(front.cur_dir + "/" + fname); } } - } else { //稳态文件 - // ☆原有:按时间窗筛选 - long long beg = parse_time_to_epoch(front.StartTime); - long long end = parse_time_to_epoch(front.EndTime); - + } + else if (front.is_steady_file()) { //稳态文件下载 for (const auto& ent : it->second) { - if (ent.flag != 1) continue; // 只要文件 - // 文件名 + if (ent.flag != 1) continue; + size_t n = ::strnlen(ent.name, sizeof(ent.name)); std::string fname(ent.name, n); + long long fs = -1; long long fe = -1; - int seq = 0; - try { seq = std::stoi(lm.logical_device_seq); } catch (...) { seq = 0; } - if (!extract_epoch_from_filename(fname, fe, seq)) continue; - if (fe >= beg && fe <= end) { - front.download_queue.push_back(front.cur_dir + "/" + fname); + + if (!extract_pqdif_range_from_filename(fname, fs, fe)) { + std::cout << "[check_recall_file] steady skip invalid pqdif filename dev=" + << dev.terminal_id + << " monitor=" << lm.monitor_id + << " dir=" << front.cur_dir + << " file=" << fname << std::endl; + continue; + } + + // 文件时间段和补招时间段有交集就下载 + bool hit = false; + + for (const auto& r : front.recall_ranges) {//每个文件的时间段和补招的多个时间段比对,只要有一个交集就算命中 + long long rb = r.first; + long long re = r.second; + + if (fs <= re && fe >= rb) { + hit = true; + break; + } + } + + if (hit) { + std::string remote_file = front.cur_dir + "/" + fname; + + if (front.required_files.insert(remote_file).second) { + front.download_queue.push_back(remote_file); + } } } } + else { + front.recall_status = static_cast(RecallStatus::FAILED); + append_recall_record_line(dev.guid, lm.monitor_id, + std::string("监测点:") + lm.monitor_name + " 文件补招类型为空,执行失败"); + break; + } } } @@ -5230,8 +5383,12 @@ void check_recall_file() { // 当前目录无匹配文件 -> 试下一个目录 front.cur_dir_index++; front.list_result = ActionResult::PENDING; - if (front.cur_dir_index < static_cast(front.dir_candidates.size())) { - front.cur_dir = front.dir_candidates[front.cur_dir_index]; + if (front.cur_dir_index < static_cast(front.active_dirs().size())) { + front.cur_dir = resolve_recall_dir( + front.active_dirs()[front.cur_dir_index], + front, + lm + ); // ★新增:先查缓存 std::vector cached; @@ -5255,13 +5412,13 @@ void check_recall_file() { front.recall_status = static_cast(RecallStatus::FAILED); std::string msg_fail; - if (front.direct_mode) { + if (front.is_voltage_file()) { msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件未找到,目标时标:" + + " 补招" + RecallTypeName(front) + "未找到,目标时标:" + front.target_filetimes; } else { msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件未找到,时间范围:" + + " 补招" + RecallTypeName(front) + "未找到,时间范围:" + front.StartTime + " ~ " + front.EndTime; } append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); @@ -5275,31 +5432,26 @@ void check_recall_file() { } else { // 进入下载阶段 - + // required_files 记录远端文件全集,用于和 file_success 对比 front.required_files.clear(); - //for (const auto& p : front.download_queue) front.required_files.insert(p); - //转成本地保存路径 download// - front.required_files.clear(); - { - const std::string base_dir = std::string("download/") + sanitize(dev.addr_str); - for (const auto& p : front.download_queue) { - // p 形如 "/" —— 提取纯文件名 - std::string fname = sanitize(extract_filename1(p)); - if (!fname.empty()) { - front.required_files.insert(base_dir + "/" + fname); - } + for (const auto& p : front.download_queue) { + if (!p.empty()) { + front.required_files.insert(p); // 保留远端路径 } } - + front.file_success.clear(); - + front.phase = RecallPhase::DOWNLOADING; front.download_result = ActionResult::PENDING; front.downloading_file.clear(); + std::cout << "[check_recall_stat] enter DOWNLOADING dev=" << dev.terminal_id << " monitor=" << lm.monitor_id - << " count=" << front.download_queue.size() << std::endl; + << " count=" << front.download_queue.size() + << " required_remote_count=" << front.required_files.size() + << std::endl; } } @@ -5320,6 +5472,23 @@ void check_recall_file() { front.recall_status = static_cast(RecallStatus::DONE);//两个文件都下好了标记为成功 + //稳态文件就不会走下面的逻辑 + if (front.is_steady_file()) { + std::string msg_ok = std::string("监测点:") + lm.monitor_name + + " 补招" + RecallTypeName(front) + "完成,文件数:" + + std::to_string(front.file_success.size()); + + append_recall_record_line(dev.guid, lm.monitor_id, msg_ok); + + std::cout << "[check_recall_file] STEADY DONE dev=" << dev.terminal_id + << " monitor=" << lm.monitor_id + << " ok=" << front.file_success.size() + << " total=" << front.required_files.size() + << std::endl; + + break; + } + //更新事件 // ★修改开始:替换“assign_qvvr_file_list + update_qvvr_file_download(有锁)” // 组装完整路径列表 @@ -5419,13 +5588,13 @@ void check_recall_file() { //20251218添加记录 std::string msg_fail; - if (front.direct_mode) { + if (front.is_voltage_file()) { msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件下载失败,目标时标:" + + " 补招" + RecallTypeName(front) + "下载失败,目标时标:" + front.target_filetimes; } else { msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件下载失败,时间范围:" + + " 补招" + RecallTypeName(front) + "下载失败,时间范围:" + front.StartTime + " ~ " + front.EndTime; } append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); @@ -5456,10 +5625,60 @@ void check_recall_file() { } if (front.download_result == ActionResult::OK) { + if (front.is_steady_file()) { + const std::string base_dir = std::string("download/") + sanitize(dev.addr_str); + + std::string seq = "1"; + try { + int nseq = std::stoi(lm.logical_device_seq); + if (nseq > 0) { + seq = std::to_string(nseq); + } else { + std::cout << "[check_recall_file][WARN] invalid logical_device_seq, use default M1_" + << " dev=" << dev.terminal_id + << " monitor=" << lm.monitor_id + << " logical_device_seq=" << lm.logical_device_seq + << std::endl; + DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast(LogCode::LOG_CODE_RECALL), "无法获取监测点序号,使用默认值 M1_"); + } + } catch (...) { + std::cout << "[check_recall_file][WARN] parse logical_device_seq failed, use default M1_" + << " dev=" << dev.terminal_id + << " monitor=" << lm.monitor_id + << " logical_device_seq=" << lm.logical_device_seq + << std::endl; + DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast(LogCode::LOG_CODE_RECALL), "无法获取监测点序号,使用默认值 M1_"); + } + + std::string fname = sanitize(extract_filename1(front.downloading_file)); + if (!fname.empty()) { + std::string old_local = base_dir + "/" + fname; + std::string new_local = base_dir + "/M" + seq + "_" + fname; + + if (old_local != new_local) { + if (std::rename(old_local.c_str(), new_local.c_str()) != 0) { //修改下载好的文件名为 M1_开头的格式,方便后续处理;如果失败不影响结果,只是文件名不规范 + std::cout << "[check_recall_file][WARN] rename steady local file failed" + << " old=" << old_local + << " new=" << new_local + << " errno=" << errno + << std::endl; + DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast(LogCode::LOG_CODE_RECALL), "重命名本地文件失败"); + } else { + std::cout << "[check_recall_file] rename steady local file ok" + << " old=" << old_local + << " new=" << new_local + << std::endl; + DIY_DEBUGLOG_CODE(lm.monitor_id, 2, static_cast(LogCode::LOG_CODE_RECALL), "重命名本地文件成功"); + } + } + } + } + front.file_success.insert(front.downloading_file); std::string msg_ok = std::string("监测点:") + lm.monitor_name - + " 补招波形文件:" + front.downloading_file + + " 补招" + RecallTypeName(front) + ":" + + front.downloading_file + " 执行完成"; append_recall_record_line(dev.guid, lm.monitor_id, msg_ok); @@ -5477,7 +5696,8 @@ void check_recall_file() { } else { std::string msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件:" + front.downloading_file + + " 补招" + RecallTypeName(front) + ":" + + front.downloading_file + " 执行失败"; append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); @@ -5509,6 +5729,7 @@ void check_recall_file() { RecallFile& front = lm.recall_list_static.front(); //取测点第一条记录 if (front.recall_status == static_cast(RecallStatus::NOT_STARTED)) { //补招未开始 + // 标记为 RUNNING,并设置终端忙状态 front.recall_status = static_cast(RecallStatus::RUNNING); //该补招记录刷新为补招中 dev.isbusy = 1; //装置由idle标记为忙 @@ -5519,9 +5740,13 @@ void check_recall_file() { // 初始化状态机并发出第一个目录请求 front.reset_runtime(true);//保留直下文件信息 front.phase = RecallPhase::LISTING; //正在请求并等待“目录文件名列表” - if (!front.dir_candidates.empty()) {//目录列表非空 + if (!front.active_dirs().empty()) {//目录列表非空 front.cur_dir_index = 0; //正在尝试的目录下标 - front.cur_dir = front.dir_candidates[0]; //第一个目录 + front.cur_dir = resolve_recall_dir( + front.active_dirs()[0], + front, + lm + ); //第一个目录 front.list_result = ActionResult::PENDING; //目录状态:等待返回 // ★新增:先查缓存 @@ -5550,18 +5775,18 @@ void check_recall_file() { front.recall_status = static_cast(RecallStatus::FAILED); //目录列表空,失败 std::string msg_fail; - if (front.direct_mode) { + if (front.is_voltage_file()) { msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件未找到,目标时标:" + + " 补招" + RecallTypeName(front) + "未找到,目标时标:" + front.target_filetimes; } else { msg_fail = std::string("监测点:") + lm.monitor_name - + " 补招波形文件未找到,时间范围:" + + " 补招" + RecallTypeName(front) + "未找到,时间范围:" + front.StartTime + " ~ " + front.EndTime; } append_recall_record_line(dev.guid, lm.monitor_id, msg_fail); - std::cout << "[check_recall_stat] empty dir_candidates, FAIL dev=" << dev.terminal_id + std::cout << "[check_recall_stat] empty active_dirs, FAIL dev=" << dev.terminal_id << " monitor=" << lm.monitor_id << std::endl; } @@ -5700,10 +5925,10 @@ bool enqueue_direct_download(const std::string& dev_id, rf.recall_status = static_cast(RecallStatus::NOT_STARTED); rf.StartTime = "1970-01-01 00:00:00"; // 仅占位,直下文件不会用到时间窗 rf.EndTime = "1970-01-01 00:00:01"; - //rf.dir_candidates = dir_candidates; // 要检索的目录列表和默认的一致 - rf.direct_mode = true; // ★关键:直下文件 rf.target_filetimes=filetime; // ▲单个文件时间入“列表” + rf.file_type = RecallFileType::VOLTAGE_FILE; // 直下波形文件 + lm_it->recall_list_static.push_back(std::move(rf)); // 若设备空闲,可直接置忙(可选,视你的流程而定) diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 5414b68..492797d 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -68,6 +68,12 @@ enum class ActionResult { }; // ====== ★修改:扩展 RecallFile,支持“多目录 + 文件筛选 + 串行下载”的状态机 ====== +enum class RecallFileType { + NONE = 0, + STEADY_FILE, // 稳态文件 + VOLTAGE_FILE // 暂态直下文件 +}; + class RecallFile { public: @@ -75,18 +81,31 @@ public: int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败 std::string StartTime; // 数据补招起始时间(yyyy-MM-dd HH:mm:ss) std::string EndTime; // 数据补招结束时间(yyyy-MM-dd HH:mm:ss) + + // ===== 业务类型 ===== + // STEADY:稳态文件补招 + // VOLTAGE:暂态事件补招 std::string STEADY; // 补招历史统计数据标识 0-不补招;1-补招 std::string VOLTAGE; // 补招暂态事件标识 0-不补招;1-补招 + // ===== 文件下载类型 ===== + RecallFileType file_type = RecallFileType::NONE; + //暂态文件用 - bool direct_mode = false; // 直下文件开关:true 表示不按时间窗,仅按目标文件名 - std::string target_filetimes; // 直下文件匹配时间点(yyyyMMdd_HHmmss),仅 direct_mode=true 时有效 + std::string target_filetimes; // 直下文件匹配时间点(yyyyMMdd_HHmmss) // ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填 std::map> dir_files; - // ★新增:候选目录(可扩展) - std::vector dir_candidates{ + std::vector steady_dir_candidates{ + "/pqdif", // 默认版本 / 新疆 + "/pqdif/%DAY%", // 上海:日期子目录 + "/pqdif/%DESC%/%DAY%", // 上海:pqdif_dir_cfg 或描述目录 + 日期 + "/pqdif/Line%SEQ%", // 云南 + "/historyFile/%DESC%" // 广东 + }; + + std::vector voltage_dir_candidates{ "/cf/COMTRADE", "/bd0/COMTRADE", "/sd0/COMTRADE", @@ -102,6 +121,9 @@ public: ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果 ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果 + // 稳态文件用:一个 guid 下的多个补招时间段 + std::vector> recall_ranges; + // ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径) std::list download_queue; //一个时间可能对应多个文件 std::string downloading_file; // 当前正在下载的文件(完整路径) @@ -109,8 +131,22 @@ public: std::unordered_set required_files; // 本次应当下载成功的文件全集 std::unordered_set file_success; // 已下载成功的文件集合 + // 是否稳态文件任务 + bool is_steady_file() const { + return file_type == RecallFileType::STEADY_FILE; + } + + // 是否暂态直下文件任务 + bool is_voltage_file() const { + return file_type == RecallFileType::VOLTAGE_FILE; + } + + const std::vector& active_dirs() const { + return is_steady_file() ? steady_dir_candidates : voltage_dir_candidates; + } + // ★新增:一个便捷复位 - void reset_runtime(bool keep_direct = false) + void reset_runtime(bool keep_target_filetimes = false) { phase = RecallPhase::IDLE; cur_dir_index = 0; @@ -124,9 +160,10 @@ public: required_files.clear(); file_success.clear(); + // 注意:file_type 不属于运行态,不能清除,因为它决定了本次补招的业务类型(稳态/暂态),而这个业务类型在整个补招过程中是固定的,不应当被运行态重置影响 + // ★新增:按需保留直下文件开关和目标名 - if (!keep_direct) { - direct_mode = false; + if (!keep_target_filetimes) { target_filetimes.clear(); // ▲列表清空 } } diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index 4976961..316370c 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -98,6 +98,8 @@ extern int TEST_PORT; //测试端口号 extern std::string FRONT_INST; +extern bool PQD_FLAG; + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数 template @@ -130,6 +132,11 @@ bool parse_param(int argc, char* argv[]) { try { g_front_seg_index = std::stoi(val.substr(0, pos)); g_front_seg_num = std::stoi(val.substr(pos + 1)); + + if (g_front_seg_index == 0) { + PQD_FLAG = true; + } + } catch (...) { std::cerr << "Invalid -s format." << std::endl; } @@ -144,6 +151,11 @@ bool parse_param(int argc, char* argv[]) { try { g_front_seg_index = std::stoi(val.substr(0, pos)); g_front_seg_num = std::stoi(val.substr(pos + 1)); + + if (g_front_seg_index == 0) { + PQD_FLAG = true; + } + } catch (...) { std::cerr << "Invalid -s format." << std::endl; } @@ -223,13 +235,13 @@ std::string get_parent_directory() { //解析模板文件 //Set_xml_nodeinfo(); - StartFrontThread(); //开启主线程 - - StartMQConsumerThread(); //开启消费者线程 - StartMQProducerThread(); //开启生产者线程 - StartTimerThread(); //开启定时线程 + if(!PQD_FLAG){ + StartFrontThread(); //开启主线程 + StartMQConsumerThread(); //开启消费者线程 + StartTimerThread(); //开启定时线程 + } //启动worker 根据启动标志启动 if(G_TEST_FLAG){ diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index d87a492..0ebed52 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -758,12 +758,20 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { << ", VOLTAGE=" << rf.VOLTAGE << "\n"; - // ★新增:直下模式与目标时间列表 - os << "\r\x1B[K |-- direct_mode=" << (rf.direct_mode ? "true" : "false") - << ", target_filetimes(" << rf.target_filetimes << ")\n"; - { - os << "\r\x1B[K |.. " << rf.target_filetimes << "\n"; + // ★新增:文件补招类型与目标信息 + os << "\r\x1B[K |-- file_type=" + << (rf.is_steady_file() ? "STEADY_FILE" : + rf.is_voltage_file() ? "VOLTAGE_FILE" : "NONE"); + + if (rf.is_voltage_file()) { + os << ", target_filetimes=" << rf.target_filetimes; } + else if (rf.is_steady_file()) { + os << ", time_range=" << rf.StartTime + << " ~ " << rf.EndTime; + } + + os << "\n"; // ★新增:状态机运行态 os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase) @@ -773,15 +781,15 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { << ", download_result=" << resultStr(rf.download_result) << "\n"; // ★新增:候选目录 - os << "\r\x1B[K |-- dir_candidates(" << rf.dir_candidates.size() << ")\n"; + os << "\r\x1B[K |-- active_dirs(" << rf.active_dirs().size() << ")\n"; { size_t c = 0; - for (const auto& d : rf.dir_candidates) { + for (const auto& d : rf.active_dirs()) { if (c++ >= MAX_ITEMS) break; os << "\r\x1B[K |-- " << d << "\n"; } - if (rf.dir_candidates.size() > MAX_ITEMS) { - os << "\r\x1B[K |.. (+" << (rf.dir_candidates.size() - MAX_ITEMS) << " more)\n"; + if (rf.active_dirs().size() > MAX_ITEMS) { + os << "\r\x1B[K |.. (+" << (rf.active_dirs().size() - MAX_ITEMS) << " more)\n"; } } diff --git a/LFtid1056/pqdif_thread_processor.cpp b/LFtid1056/pqdif_thread_processor.cpp index 254bc1b..3f21528 100644 --- a/LFtid1056/pqdif_thread_processor.cpp +++ b/LFtid1056/pqdif_thread_processor.cpp @@ -28,6 +28,17 @@ #include "pqdif/include/pqdif_lg.h" #include "pqdif_semantic_ids.h" +#include "cloudfront/code/log4.h" //lnk20260526 + +extern void enqueue_stat_pq(const std::string& max_base64Str, + const std::string& min_base64Str, + const std::string& avg_base64Str, + const std::string& cp95_base64Str, + time_t data_time, + const std::string& mac, + short cid); +extern std::string extract_filename1(const std::string& path); + namespace fs = std::experimental::filesystem; namespace { @@ -7877,6 +7888,96 @@ void ClearReadyPqdifStatBase64Queue() g_pqdif_stat_base64_ready_queue.clear(); } +static bool GetBase64ByKind(const PqdifStatBase64TimePointPacket& tp, //从序列中获取指定 kind 的 Base64 内容 + StatValueKind kind, + std::string& out) +{ + for (const auto& r : tp.records) { + if (r.value_kind == kind) { + out = r.base64_payload; + return !out.empty(); + } + } + return false; +} + +static bool extract_monitor_seq_from_local_pqdif_path(const std::string& path, + short& point_name) +{ + point_name = 0; + + std::cout << "[extract_monitor_seq] begin path=" + << path << std::endl; + + // 取纯文件名,例如: + // download/192.168.1.10/M1_xxx.pqd + // -> M1_xxx.pqd + std::string fname = extract_filename1(path); + + std::cout << "[extract_monitor_seq] filename=" + << fname << std::endl; + + if (fname.size() < 3) { + std::cout << "[extract_monitor_seq] filename too short" + << std::endl; + return false; + } + + if (fname[0] != 'M' && fname[0] != 'm') { + std::cout << "[extract_monitor_seq] filename not start with M/m" + << std::endl; + return false; + } + + size_t pos = fname.find('_'); + + std::cout << "[extract_monitor_seq] underscore pos=" + << pos << std::endl; + + if (pos == std::string::npos || pos <= 1) { + std::cout << "[extract_monitor_seq] invalid underscore position" + << std::endl; + return false; + } + + // M1_xxx -> 1 + std::string seq_str = fname.substr(1, pos - 1); + + std::cout << "[extract_monitor_seq] seq_str=" + << seq_str << std::endl; + + for (char c : seq_str) { + if (!std::isdigit(static_cast(c))) { + std::cout << "[extract_monitor_seq] non-digit char=" + << c << std::endl; + return false; + } + } + + try { + point_name = static_cast(std::stoi(seq_str)); + + std::cout << "[extract_monitor_seq] success point_name=" + << point_name << std::endl; + + return point_name > 0; + } + catch (const std::exception& e) { + std::cout << "[extract_monitor_seq] exception=" + << e.what() << std::endl; + + point_name = 0; + return false; + } + catch (...) { + std::cout << "[extract_monitor_seq] unknown exception" + << std::endl; + + point_name = 0; + return false; + } +} + void RunPqdifScanLoop() { std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir @@ -7910,6 +8011,54 @@ void RunPqdifScanLoop() if (PopReadyPqdifStatBase64FileBatch(batch)) { // batch 就是一个 PQDIF 文件完整的 Base64 组装结果 // 在此处处理上送逻辑 + const std::string& mac = batch.mac; + + short point_name = 0; + + if (!extract_monitor_seq_from_local_pqdif_path(batch.pqdif_file_path, point_name)) { + std::cout << "[PQDIF_UPLOAD] failed to extract monitor seq from file=" + << batch.pqdif_file_path << std::endl; + continue; + } + + for (const auto& tp : batch.time_points) { + std::string max_base64; + std::string min_base64; + std::string avg_base64; + std::string p95_base64; + + bool has_max = GetBase64ByKind(tp, StatValueKind::Max, max_base64); + bool has_min = GetBase64ByKind(tp, StatValueKind::Min, min_base64); + bool has_avg = GetBase64ByKind(tp, StatValueKind::Avg, avg_base64); + bool has_p95 = GetBase64ByKind(tp, StatValueKind::P95, p95_base64); + + if (!has_max || !has_min || !has_avg || !has_p95) { + std::cout << "[PQDIF_UPLOAD] skip incomplete time point, file=" + << batch.pqdif_file_path + << " time=" << tp.timestamp_text + << " has_max=" << has_max + << " has_min=" << has_min + << " has_avg=" << has_avg + << " has_p95=" << has_p95 + << std::endl; + continue; + } + + enqueue_stat_pq(max_base64, + min_base64, + avg_base64, + p95_base64, + tp.timestamp, + mac, + point_name); + + std::cout << "[PQDIF_UPLOAD] enqueue_stat_pq ok, file=" + << batch.pqdif_file_path + << " time=" << tp.timestamp_text + << " mac=" << mac + << " point=" << point_name + << std::endl; + } } } catch (const std::exception& ex)