#include "pqdif_thread_processor.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // PQDIF 解析库 #include "pqdif/PQDIF.h" #include "pqdif/include/pqdif_ph.h" #include "pqdif/include/pqdif_id.h" namespace fs = std::experimental::filesystem; namespace { // ============================ // 可配置参数 // ============================ constexpr int kScanIntervalSec = 60; // 扫描目录间隔 constexpr int kBackupLimit = 4800; // 成功/失败目录最大保留文件数 constexpr size_t kParsedCacheLimit = 128; // 内存缓存最大数量 const char* kPqdRootDir = "download"; // 输入目录:download//*.pqd const char* kDoneRootDir = "download_done"; const char* kFailRootDir = "download_fail"; // ============================ // 内存缓存 // ============================ std::deque g_parsed_cache; std::mutex g_parsed_cache_mutex; // ============================ // 基础工具函数 // ============================ // ============================ // 调试打印辅助函数 // 作用:把暂存到内存里的 PQDIF 内容打印出来 // ============================ // GUID 转字符串,便于查看标签值 std::string guid_to_string(const GUID& g) { char buf[64] = { 0 }; std::snprintf(buf, sizeof(buf), "%08x-%04x-%04x-%02x%02x-%02x%02x%02x%02x%02x%02x", static_cast(g.Data1), static_cast(g.Data2), static_cast(g.Data3), static_cast(g.Data4[0]), static_cast(g.Data4[1]), static_cast(g.Data4[2]), static_cast(g.Data4[3]), static_cast(g.Data4[4]), static_cast(g.Data4[5]), static_cast(g.Data4[6]), static_cast(g.Data4[7])); return std::string(buf); } // 打印数值数组前几个样本,避免日志过长 void print_value_preview(const std::vector& values, const char* title, size_t preview_count = 5) { if (values.empty()) return; std::cout << " " << title << " count=" << values.size() << " preview=["; const size_t n = std::min(values.size(), preview_count); for (size_t i = 0; i < n; ++i) { if (i > 0) std::cout << ", "; std::cout << values[i]; } if (values.size() > preview_count) std::cout << ", ..."; std::cout << "]" << std::endl; } // 打印时间数组前几个样本 void print_time_preview(const std::vector& values, const char* title, size_t preview_count = 5) { if (values.empty()) return; std::cout << " " << title << " count=" << values.size() << " preview=["; const size_t n = std::min(values.size(), preview_count); for (size_t i = 0; i < n; ++i) { if (i > 0) std::cout << ", "; std::cout << static_cast(values[i]); } if (values.size() > preview_count) std::cout << ", ..."; std::cout << "]" << std::endl; } // 打印某一类 series 的标签 void print_series_meta(const RawSeriesTagMeta& meta, const char* name) { std::cout << " [" << name << "]" << " units_id=" << meta.quantity_units_id << " characteristic_id=" << guid_to_string(meta.quantity_characteristic_id) << " value_type_id=" << guid_to_string(meta.value_type_id) << " base_type=" << meta.series_base_type << " scale=" << meta.series_scale << " offset=" << meta.series_offset << std::endl; } // 打印单个通道的内容 void dump_channel_detail(const std::string& key, const RawChannelSeries& ch) { std::cout << " [CHANNEL]" << " key=" << key << " raw_name=" << ch.channel_tag.raw_channel_name << " phase_id=" << ch.channel_tag.phase_id << " quantity_type_id=" << guid_to_string(ch.channel_tag.quantity_type_id) << " quantity_measured_id=" << ch.channel_tag.quantity_measured_id << " freq=" << ch.channel_tag.channel_frequency << " group_id=" << ch.channel_tag.group_id << std::endl; if (!ch.times.empty()) { print_series_meta(ch.time_meta, "TIME"); print_time_preview(ch.times, "TIME"); } if (!ch.max_values.empty()) { print_series_meta(ch.max_meta, "MAX"); print_value_preview(ch.max_values, "MAX"); } if (!ch.min_values.empty()) { print_series_meta(ch.min_meta, "MIN"); print_value_preview(ch.min_values, "MIN"); } if (!ch.avg_values.empty()) { print_series_meta(ch.avg_meta, "AVG"); print_value_preview(ch.avg_values, "AVG"); } if (!ch.cp95_values.empty()) { print_series_meta(ch.cp95_meta, "CP95"); print_value_preview(ch.cp95_values, "CP95"); } if (!ch.val_values.empty()) { print_series_meta(ch.val_meta, "VAL"); print_value_preview(ch.val_values, "VAL"); } } // 打印整个文件的暂存内容摘要 void dump_parsed_map_summary(const std::string& file_path, const RawChannelMap& raw_map) { std::cout << "========== PQDIF PARSED SUMMARY ==========" << std::endl; std::cout << "file=" << file_path << ", channel_count=" << raw_map.size() << std::endl; for (const auto& kv : raw_map) { dump_channel_detail(kv.first, kv.second); } std::cout << "==========================================" << std::endl; } bool dump_file_basic_info(const std::string& file_path, std::string& err) { std::error_code ec; fs::path p(file_path); if (!fs::exists(p, ec)) { err = "file not exists"; return false; } if (!fs::is_regular_file(p, ec)) { err = "not a regular file"; return false; } auto file_size = fs::file_size(p, ec); if (ec) { err = "cannot get file size"; return false; } std::ifstream ifs(file_path, std::ios::binary); if (!ifs.is_open()) { err = "ifstream open failed"; return false; } std::cout << "[PQDIF] file basic info: path=" << file_path << ", size=" << file_size << std::endl; // 打印前 32 字节十六进制,便于判断是否像标准文件头 unsigned char buf[32] = { 0 }; ifs.read(reinterpret_cast(buf), sizeof(buf)); std::streamsize n = ifs.gcount(); std::ostringstream oss; oss << "[PQDIF] first " << n << " bytes: "; for (std::streamsize i = 0; i < n; ++i) { oss << std::hex << std::setw(2) << std::setfill('0') << static_cast(buf[i]) << " "; } std::cout << oss.str() << std::endl; return true; } std::string to_upper_copy(std::string s) { std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return static_cast(std::toupper(c)); }); return s; } std::string trim_copy(const std::string& s) { size_t beg = 0; while (beg < s.size() && std::isspace(static_cast(s[beg]))) ++beg; size_t end = s.size(); while (end > beg && std::isspace(static_cast(s[end - 1]))) --end; return s.substr(beg, end - beg); } // 规范化通道名 // 注意:当前阶段只用于 map key,不作为识别依据 std::string normalize_key(const std::string& src) { std::string out; out.reserve(src.size()); for (char c : src) { unsigned char uc = static_cast(c); if (std::isalnum(uc) || c == '[' || c == ']') out.push_back(static_cast(std::toupper(uc))); } return out; } bool is_pqdif_file(const fs::path& path) { if (!fs::is_regular_file(path)) return false; const std::string ext = to_upper_copy(path.extension().string()); return ext == ".PQD" || ext == ".PQDIF"; } bool ensure_dir(const fs::path& dir) { std::error_code ec; if (fs::exists(dir, ec)) return true; return fs::create_directories(dir, ec) || fs::exists(dir, ec); } bool move_file_with_fallback(const fs::path& src, const fs::path& dst) { std::error_code ec; ensure_dir(dst.parent_path()); fs::rename(src, dst, ec); if (!ec) return true; ec.clear(); fs::copy_file(src, dst, fs::copy_options::overwrite_existing, ec); if (ec) return false; ec.clear(); fs::remove(src, ec); return !ec; } void cleanup_backup_dir(const fs::path& dir, int limit) { if (limit < 0) return; std::error_code ec; if (!fs::exists(dir, ec) || !fs::is_directory(dir, ec)) return; std::vector files; for (fs::directory_iterator it(dir, ec), end; !ec && it != end; it.increment(ec)) { if (!ec && is_pqdif_file(it->path())) files.push_back(it->path()); } if (files.size() <= static_cast(limit)) return; std::sort(files.begin(), files.end(), [](const fs::path& a, const fs::path& b) { std::error_code ea, eb; return fs::last_write_time(a, ea) < fs::last_write_time(b, eb); }); const size_t remove_count = files.size() - static_cast(limit); for (size_t i = 0; i < remove_count; ++i) { std::error_code remove_ec; fs::remove(files[i], remove_ec); } } // 生成稳定 key:只用于存储 // 后续真正指标识别不要依赖这个 std::string build_channel_key(const std::string& channel_name, double channel_freq, int group_id) { std::ostringstream oss; oss << trim_copy(channel_name); if (channel_freq > 0.0) { const int harmonic_no = static_cast(std::llround(channel_freq / 50.0)); if (harmonic_no > 0) oss << "[" << harmonic_no << "]"; } else if (group_id > 0) { oss << "[" << group_id << "]"; } return normalize_key(oss.str()); } // ============================ // 缓存入队 // ============================ bool push_parsed_result_to_cache(ParsedPqdifFile&& parsed) { std::lock_guard guard(g_parsed_cache_mutex); if (g_parsed_cache.size() >= kParsedCacheLimit) { std::cout << "[PQDIF] cache full, drop oldest: " << g_parsed_cache.front().source_file << std::endl; g_parsed_cache.pop_front(); } g_parsed_cache.emplace_back(std::move(parsed)); return true; } // ============================ // PQDIF 原始解析 // 当前阶段:只保存标签 + 原始值 // ============================ bool parse_pqdif_file_raw(const std::string& file_path, RawChannelMap& out_map, std::string& err) { CPQDIF file_convert; file_convert.put_FlatFileName(file_path); std::string basic_err; if (!dump_file_basic_info(file_path, basic_err)) { err = "precheck failed: " + basic_err; return false; } if (!file_convert.Read()) { err = "CPQDIF::Read() failed"; return false; } const int record_count = static_cast(file_convert.RecordGetCount()); for (int i_record = 0; i_record < record_count; ++i_record) { GUID record_guid{}; std::string record_name; if (!file_convert.RecordGetInfo(i_record, &record_guid, record_name)) continue; // 当前阶段只处理 Observation Record if (!PQDIF_IsEqualGUID(record_guid, tagRecObservation)) continue; long observation_handle = 0; if (!file_convert.RecordRequestObservation(i_record, &observation_handle)) continue; DATE time_start = 0; std::string observation_name; long channel_count = 0; if (!file_convert.ObservationGetInfo(observation_handle, time_start, observation_name, channel_count)) { file_convert.RecordReleaseObservation(observation_handle); continue; } // 当前线程只处理统计型 Observation,不处理录波型 Observation long trigger_method_id = 0; DATE trigger_time = 0; file_convert.ObservationGetTriggerInfo(observation_handle, &trigger_method_id, &trigger_time); if (trigger_method_id == ID_TRIGGER_METH_CHANNEL || trigger_method_id == -1) { file_convert.RecordReleaseObservation(observation_handle); continue; } time_t observation_start_ts = 0; file_convert.GetTime(time_start, &observation_start_ts); // 遍历通道 for (int i_channel = 0; i_channel < channel_count; ++i_channel) { PqdifChannelInfoEx ch_info; if (!file_convert.ObservationGetChannelInfoEx(observation_handle, i_channel, &ch_info)) continue; if (ch_info.name.empty()) continue; double channel_freq = 0.0; int group_id = 0; file_convert.ObservationGetChannelFreq(observation_handle, i_channel, &channel_freq); file_convert.ObservationGetChannelGroupID(observation_handle, i_channel, &group_id); const std::string key = build_channel_key(ch_info.name, channel_freq, group_id); RawChannelSeries& raw_series = out_map[key]; // 保存通道级标签 raw_series.channel_tag.raw_channel_name = ch_info.name; raw_series.channel_tag.normalized_channel_name = key; raw_series.channel_tag.phase_id = ch_info.phaseId; raw_series.channel_tag.quantity_type_id = ch_info.quantityTypeId; raw_series.channel_tag.quantity_measured_id = ch_info.quantityMeasuredId; raw_series.channel_tag.channel_frequency = channel_freq; raw_series.channel_tag.group_id = group_id; // 遍历该通道下的所有 series for (int i_series = 0; i_series < ch_info.countSeries; ++i_series) { PqdifSeriesInfoEx sr_info; if (!file_convert.ObservationGetSeriesInfoEx(observation_handle, i_channel, i_series, &sr_info)) continue; double* values = nullptr; long value_count = 0; if (!file_convert.ObservationGetSeriesData(observation_handle, i_channel, i_series, &values, &value_count) || values == nullptr || value_count <= 0) { delete[] values; continue; } RawSeriesTagMeta series_meta; series_meta.quantity_units_id = sr_info.quantityUnitsId; series_meta.quantity_characteristic_id = sr_info.quantityCharacteristicId; series_meta.value_type_id = sr_info.valueTypeId; series_meta.series_base_type = sr_info.seriesBaseType; series_meta.series_scale = sr_info.scale; series_meta.series_offset = sr_info.offset; // 按不同 valueType 保存到不同桶 if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_TIME)) { raw_series.time_meta = series_meta; for (long i = 0; i < value_count; ++i) { raw_series.times.push_back( observation_start_ts + static_cast(std::llround(values[i])) ); } } else if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_MAX)) { raw_series.max_meta = series_meta; raw_series.max_values.insert(raw_series.max_values.end(), values, values + value_count); } else if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_MIN)) { raw_series.min_meta = series_meta; raw_series.min_values.insert(raw_series.min_values.end(), values, values + value_count); } else if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_AVG)) { raw_series.avg_meta = series_meta; raw_series.avg_values.insert(raw_series.avg_values.end(), values, values + value_count); } else if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_P95)) { raw_series.cp95_meta = series_meta; raw_series.cp95_values.insert(raw_series.cp95_values.end(), values, values + value_count); } else if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_VAL)) { raw_series.val_meta = series_meta; raw_series.val_values.insert(raw_series.val_values.end(), values, values + value_count); } delete[] values; } } file_convert.RecordReleaseObservation(observation_handle); } file_convert.Close(); return true; } // ============================ // 单文件处理 // ============================ bool process_single_pqdif_file(const fs::path& file_path, const std::string& mac) { RawChannelMap raw_map; std::string err; if (!parse_pqdif_file_raw(file_path.string(), raw_map, err)) { std::cout << "[PQDIF] parse failed: " << file_path.string() << " reason=" << err << std::endl; return false; } // 调试:打印本次解析到的暂存内容 dump_parsed_map_summary(file_path.string(), raw_map); ParsedPqdifFile parsed_file; parsed_file.mac = mac; parsed_file.source_file = file_path.string(); parsed_file.parsed_at = std::time(nullptr); parsed_file.channels = std::move(raw_map); if (!push_parsed_result_to_cache(std::move(parsed_file))) { std::cout << "[PQDIF] push cache failed: " << file_path.string() << std::endl; return false; } std::cout << "[PQDIF] processed ok: " << file_path.string() << " channels=" << GetParsedPqdifCacheSize() << std::endl; //在此处处理文件数据 return true; } // ============================ // 扫描目录 // ============================ void scan_once() { ensure_dir(kPqdRootDir); ensure_dir(kDoneRootDir); ensure_dir(kFailRootDir); std::error_code ec; if (!fs::exists(kPqdRootDir, ec) || !fs::is_directory(kPqdRootDir, ec)) return; // 第一层目录:按 mac 划分 for (fs::directory_iterator mac_it(kPqdRootDir, ec), mac_end; !ec && mac_it != mac_end; mac_it.increment(ec)) { if (ec || !fs::is_directory(mac_it->path())) continue; const std::string mac = mac_it->path().filename().string(); std::vector pqdif_files; std::error_code file_ec; for (fs::directory_iterator file_it(mac_it->path(), file_ec), file_end; !file_ec && file_it != file_end; file_it.increment(file_ec)) { if (!file_ec && is_pqdif_file(file_it->path())) pqdif_files.push_back(file_it->path()); } if (pqdif_files.empty()) continue; // 优先处理最新文件 std::sort(pqdif_files.begin(), pqdif_files.end(), [](const fs::path& a, const fs::path& b) { std::error_code ea, eb; return fs::last_write_time(a, ea) > fs::last_write_time(b, eb); }); for (const auto& file_path : pqdif_files) { const bool ok = process_single_pqdif_file(file_path, mac); const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir); const fs::path dst = target_root / mac / file_path.filename(); //调试时暂时关闭文件转移 /*if (!move_file_with_fallback(file_path, dst)) { std::cout << "[PQDIF] move failed: " << file_path.string() << " -> " << dst.string() << std::endl; }*/ } cleanup_backup_dir(fs::path(kDoneRootDir) / mac, kBackupLimit); cleanup_backup_dir(fs::path(kFailRootDir) / mac, kBackupLimit); } } } // namespace // ============================ // 对外缓存接口 // ============================ bool PopOldestParsedPqdifFile(ParsedPqdifFile& out) { std::lock_guard guard(g_parsed_cache_mutex); if (g_parsed_cache.empty()) return false; out = std::move(g_parsed_cache.front()); g_parsed_cache.pop_front(); return true; } bool PeekOldestParsedPqdifFile(ParsedPqdifFile& out) { std::lock_guard guard(g_parsed_cache_mutex); if (g_parsed_cache.empty()) return false; out = g_parsed_cache.front(); return true; } size_t GetParsedPqdifCacheSize() { std::lock_guard guard(g_parsed_cache_mutex); return g_parsed_cache.size(); } void ClearParsedPqdifCache() { std::lock_guard guard(g_parsed_cache_mutex); g_parsed_cache.clear(); } // ============================ // 线程主循环 // ============================ void RunPqdifScanLoop() { std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir << ", interval=" << kScanIntervalSec << "s" << std::endl; while (true) { try { scan_once(); } catch (const std::exception& ex) { std::cout << "[PQDIF] scan exception: " << ex.what() << std::endl; } catch (...) { std::cout << "[PQDIF] scan exception: unknown" << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(kScanIntervalSec)); } }