diff --git a/LFtid1056.rar b/LFtid1056.rar index 3fe13a6..86ce786 100644 Binary files a/LFtid1056.rar and b/LFtid1056.rar differ diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index ea090f3..79c07dd 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -4711,10 +4711,14 @@ static std::string resolve_recall_dir(const std::string& raw_dir, // ====== 从文件名中提取“第二段下划线分隔字段”并转换为 epoch 秒 ====== static bool extract_pqdif_range_from_filename(const std::string& fname, long long& fs, - long long& fe) + long long& fe, + int& file_seq, + std::string& file_monitor_id) { fs = -1; fe = -1; + file_seq = -1; + file_monitor_id.clear(); auto to_epoch = [](int y, int mon, int d, int h, int m, int s)->long long { struct tm t {}; @@ -4755,30 +4759,46 @@ static bool extract_pqdif_range_from_filename(const std::string& fname, } } - // 新疆:B8B7FEE6D4792D30-20230915-003000-p.pqd,按 1 小时窗口估算 设备ID-时间点-p.pqd + // 新疆:B8B7FEE6D4792D30-20230915-003000-p.pqd { - std::regex re(R"([^-]+-(\d{4})(\d{2})(\d{2})-(\d{2})(\d{2})(\d{2})-p\.pqd$)", - std::regex::icase); + std::regex re( + R"(([^-]+)-(\d{4})(\d{2})(\d{2})-(\d{2})(\d{2})(\d{2})-p\.pqd$)", + std::regex::icase); + std::smatch m; if (std::regex_search(fname, m, re)) { - fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]), - std::stoi(m[4]), std::stoi(m[5]), std::stoi(m[6])); + file_monitor_id = m[1]; // B8B7FEE6D4792D30 + + fs = to_epoch(std::stoi(m[2]), std::stoi(m[3]), std::stoi(m[4]), + std::stoi(m[5]), std::stoi(m[6]), std::stoi(m[7])); fe = fs + 3600; return fs >= 0; } } - // 默认:ied_ld_20230915_0030_60.pqd,最后的 60 是间隔分钟 设备名_逻辑设备_开始日期_开始时间_间隔分钟 + // 默认:ied_ld_20230915_0030_60.pqd + // 例如:PQMonitor_PQM1_20260601_0000_24.pqd + // 第二段 PQM1 最后的数字作为 seq { - std::regex re(R"(.*_(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})_(\d+)\.pqd$)", + std::regex re(R"([^_]+_([^_]+)_(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})_(\d+)\.pqd$)", std::regex::icase); std::smatch m; if (std::regex_search(fname, m, re)) { - fs = to_epoch(std::stoi(m[1]), std::stoi(m[2]), std::stoi(m[3]), - std::stoi(m[4]), std::stoi(m[5]), 0); - int intv_min = std::stoi(m[6]); + std::string ld = m[1]; // 例如 PQM1、PQM2 + + std::regex seq_re(R"((\d+)$)"); + std::smatch seq_m; + if (std::regex_search(ld, seq_m, seq_re)) { + file_seq = std::stoi(seq_m[1]); + } + + fs = to_epoch(std::stoi(m[2]), std::stoi(m[3]), std::stoi(m[4]), + std::stoi(m[5]), std::stoi(m[6]), 0); + + int intv_min = std::stoi(m[7]); fe = fs + intv_min * 60; + return fs >= 0 && intv_min > 0; } } @@ -4871,6 +4891,34 @@ static void dircache_clear_device(const std::string& dev_id) g_device_dir_cache.erase(dev_id); } +//复制文件 +bool move_file_crossfs(const std::string& src, const std::string& dst) +{ + if (std::rename(src.c_str(), dst.c_str()) == 0) { + return true; + } + + if (errno != EXDEV) { + return false; + } + + std::ifstream in(src.c_str(), std::ios::binary); + std::ofstream out(dst.c_str(), std::ios::binary | std::ios::trunc); + if (!in || !out) { + return false; + } + + out << in.rdbuf(); + out.close(); + in.close(); + + if (std::remove(src.c_str()) != 0) { + return false; + } + + return true; +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件逻辑 // ====== ★修改:check_recall_stat —— 加入“两步法”状态机 ====== void check_recall_file() { @@ -5338,8 +5386,10 @@ void check_recall_file() { long long fs = -1; long long fe = -1; - - if (!extract_pqdif_range_from_filename(fname, fs, fe)) { + int file_seq = -1; + std::string file_monitor_id; + + if (!extract_pqdif_range_from_filename(fname, fs, fe, file_seq, file_monitor_id)) { std::cout << "[check_recall_file] steady skip invalid pqdif filename dev=" << dev.terminal_id << " monitor=" << lm.monitor_id @@ -5347,6 +5397,43 @@ void check_recall_file() { << " file=" << fname << std::endl; continue; } + + int cur_seq = -1; + try { + cur_seq = std::stoi(lm.logical_device_seq); + } catch (...) { + cur_seq = -1; + } + + // 新疆:用文件名里的 monitor_id 找 seq + if (!file_monitor_id.empty()) { + if (file_monitor_id != lm.monitor_id) {//跳过id不匹配的新疆文件 + std::cout << "[check_recall_file] steady skip other xj monitor file dev=" + << dev.terminal_id + << " cur_monitor=" << lm.monitor_id + << " file_monitor_id=" << file_monitor_id + << " file=" << fname + << std::endl; + continue; + } + + // 到这里说明新疆文件的测点ID就是当前测点 + file_seq = cur_seq; + } + // 默认:文件名能解析出 PQM 序号 + else if (file_seq > 0) { + if (cur_seq > 0 && file_seq != cur_seq) {//跳过不同测点的文件 + std::cout << "[check_recall_file] steady skip other seq file dev=" + << dev.terminal_id + << " cur_monitor=" << lm.monitor_id + << " cur_seq=" << cur_seq + << " file_seq=" << file_seq + << " file=" << fname + << std::endl; + continue; + } + } + // 广东/上海/云南:file_monitor_id 为空,file_seq 也为 -1,不过滤 // 文件时间段和补招时间段有交集就下载 bool hit = false; @@ -5653,16 +5740,20 @@ void check_recall_file() { std::string fname = sanitize(extract_filename1(front.downloading_file)); if (!fname.empty()) { std::string old_local = base_dir + "/" + fname; - std::string new_local = base_dir + "/M" + seq + "_" + fname; + std::string new_fname = "M" + seq + "_" + fname; + std::string new_local = base_dir + "/" + new_fname; + // 先把下载目录下文件名改成 M1_xxx.pqd if (old_local != new_local) { - if (std::rename(old_local.c_str(), new_local.c_str()) != 0) { //修改下载好的文件名为 M1_开头的格式,方便后续处理;如果失败不影响结果,只是文件名不规范 + if (std::rename(old_local.c_str(), new_local.c_str()) != 0) { std::cout << "[check_recall_file][WARN] rename steady local file failed" << " old=" << old_local << " new=" << new_local << " errno=" << errno << std::endl; DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast(LogCode::LOG_CODE_RECALL), "重命名本地文件失败"); + new_local = old_local; // 重命名失败,后续仍尝试用原文件 + new_fname = fname; } else { std::cout << "[check_recall_file] rename steady local file ok" << " old=" << old_local @@ -5671,6 +5762,56 @@ void check_recall_file() { DIY_DEBUGLOG_CODE(lm.monitor_id, 2, static_cast(LogCode::LOG_CODE_RECALL), "重命名本地文件成功"); } } + + // 只处理 .pqd 文件 + if (new_fname.size() >= 4 && + new_fname.substr(new_fname.size() - 4) == ".pqd") { + + std::string pqdif_dir = std::string("download_pqdif/") + sanitize(dev.addr_str); + + // 如果你已有递归建目录函数,用你的函数即可 + create_directory_recursive(pqdif_dir.c_str()); + + std::string tmp_fname = new_fname.substr(0, new_fname.size() - 4) + ".tmp"; + + std::string local_tmp = base_dir + "/" + tmp_fname; + std::string dst_tmp = pqdif_dir + "/" + tmp_fname; + std::string dst_pqd = pqdif_dir + "/" + new_fname; + + // 1. .pqd -> .tmp + if (std::rename(new_local.c_str(), local_tmp.c_str()) != 0) { + std::cout << "[check_recall_file][WARN] rename .pqd to .tmp failed" + << " old=" << new_local + << " new=" << local_tmp + << " errno=" << errno + << std::endl; + DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast(LogCode::LOG_CODE_RECALL), ".pqd改.tmp失败"); + } + // 2. 移动 .tmp 到 download_pqdif + else if (!move_file_crossfs(local_tmp, dst_tmp)) { + std::cout << "[check_recall_file][WARN] move tmp file failed" + << " old=" << local_tmp + << " new=" << dst_tmp + << " errno=" << errno + << std::endl; + DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast(LogCode::LOG_CODE_RECALL), "移动tmp文件失败"); + } + // 3. download_pqdif 下 .tmp -> .pqd + else if (std::rename(dst_tmp.c_str(), dst_pqd.c_str()) != 0) {//原子操作,不会影响解析线程扫描到不完整文件 + std::cout << "[check_recall_file][WARN] rename .tmp to .pqd failed" + << " old=" << dst_tmp + << " new=" << dst_pqd + << " errno=" << errno + << std::endl; + DIY_ERRORLOG_CODE(lm.monitor_id, 2, static_cast(LogCode::LOG_CODE_RECALL), ".tmp改.pqd失败"); + } else { + std::cout << "[check_recall_file] move pqd file to pqdif dir ok" + << " src=" << new_local + << " dst=" << dst_pqd + << std::endl; + DIY_DEBUGLOG_CODE(lm.monitor_id, 2, static_cast(LogCode::LOG_CODE_RECALL), "pqd文件移动到解析目录成功"); + } + } } } diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 492797d..8570eb2 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -98,11 +98,15 @@ public: std::map> dir_files; std::vector steady_dir_candidates{ - "/pqdif", // 默认版本 / 新疆 - "/pqdif/%DAY%", // 上海:日期子目录 - "/pqdif/%DESC%/%DAY%", // 上海:pqdif_dir_cfg 或描述目录 + 日期 - "/pqdif/Line%SEQ%", // 云南 - "/historyFile/%DESC%" // 广东 + "/cf/pqdif", //580绝对真实路径 + "/bd0/pqdif", //chemengyu提供包含bd0 + "/bd0/pqdif/%DESC%/%DAY%", + "/bd0/pqdif/Line%SEQ%", + "/bd0/historyFile/%DESC%", + "/pqdif", // 默认版本 / 新疆 可能取到其他测点文件 + "/pqdif/%DESC%/%DAY%", // 上海:pqdif_dir_cfg 或描述目录 + 日期 不会取到其他测点文件 + "/pqdif/Line%SEQ%", // 云南 不会取到其他测点文件 + "/historyFile/%DESC%" // 广东 不会取到其他测点文件 }; std::vector voltage_dir_candidates{ diff --git a/LFtid1056/pqdif_thread_processor.cpp b/LFtid1056/pqdif_thread_processor.cpp index 528b47d..aa75c64 100644 --- a/LFtid1056/pqdif_thread_processor.cpp +++ b/LFtid1056/pqdif_thread_processor.cpp @@ -53,7 +53,7 @@ namespace { // 而是按通道逐步聚合成时间桶并直接组装 Base64,避免单文件中间对象占用过大内存。 constexpr size_t kPqdifLargeFileStreamingPointThreshold = 800000; - const char* kPqdRootDir = "download"; + const char* kPqdRootDir = "download_pqdif"; const char* kDoneRootDir = "download_done"; const char* kFailRootDir = "download_fail";