From f925356d6329167a97c769b100e2c32ea409238f Mon Sep 17 00:00:00 2001 From: zhangwen <3466561528@qq.com> Date: Thu, 28 May 2026 14:38:03 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BA=86=E5=A4=A7=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E5=A4=84=E7=90=86=EF=BC=8C=E9=98=B2=E6=AD=A2=E5=A4=A7?= =?UTF-8?q?=E9=87=8Fpqdif=E6=96=87=E4=BB=B6=E5=AF=BC=E8=87=B4=E5=86=85?= =?UTF-8?q?=E5=AD=98=E5=BC=82=E5=B8=B8=E3=80=82=E5=8F=A6=E5=A4=96=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E4=BA=86=E8=A7=A3=E6=9E=90=E5=86=85=E5=AD=98=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E6=97=B6=EF=BC=8C=E5=B0=9D=E8=AF=95=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E5=8E=9F=E6=96=87=E4=BB=B6=EF=BC=8C=E9=98=B2=E6=AD=A2=E4=B8=80?= =?UTF-8?q?=E7=9B=B4=E6=BA=A2=E5=87=BA=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- LFtid1056/pqdif_thread_processor.cpp | 382 ++++++++++++++++++++++++++- 1 file changed, 377 insertions(+), 5 deletions(-) diff --git a/LFtid1056/pqdif_thread_processor.cpp b/LFtid1056/pqdif_thread_processor.cpp index 254bc1b..7e95e14 100644 --- a/LFtid1056/pqdif_thread_processor.cpp +++ b/LFtid1056/pqdif_thread_processor.cpp @@ -20,6 +20,7 @@ #include #include #include +#include // PQDIF 解析库 #include "pqdif/PQDIF.h" @@ -37,6 +38,10 @@ namespace { constexpr int kMaxPqdifFilesPerScan = 1; constexpr size_t kParsedCacheLimit = 128; + // 大文件流式阈值:估算展开点数超过该值时,不再构造 expanded_stat_points, + // 而是按通道逐步聚合成时间桶并直接组装 Base64,避免单文件中间对象占用过大内存。 + constexpr size_t kPqdifLargeFileStreamingPointThreshold = 800000; + const char* kPqdRootDir = "download"; const char* kDoneRootDir = "download_done"; const char* kFailRootDir = "download_fail"; @@ -5072,6 +5077,250 @@ namespace { return stat_select_best_metric_sources(out); } + + size_t stat_estimate_candidate_point_count_for_streaming(const PqdifLogicalFile& lf) + { + size_t total = 0; + for (const auto& obs : lf.observations) + { + for (const auto& ch : obs.channel_instances) + { + if (!pqdif_sem::IsQuantityTypeValueLog(ch.quantity_type_id.value)) + continue; + + for (const auto& si : ch.series_instances) + { + if (pqdif_sem::IsValueTypeTime(si.value_type_id.value)) + continue; + + const PqdifSeriesInstance* resolved = stat_resolve_shared_series(obs, si); + if (resolved == nullptr || resolved->values.count <= 0) + continue; + + total += static_cast(resolved->values.count); + if (total > kPqdifLargeFileStreamingPointThreshold) + return total; + } + } + } + return total; + } + + struct StatStreamingSourceBucket + { + StatMetricId metric_id = StatMetricId::Unknown; + StatMetricSourceKey key; + StatMetricSourceStats stats; + std::map values_by_time; + std::map time_text_by_time; + }; + + void stat_streaming_set_value(AggregatedStatValues& agg, const ExpandedStatPoint& p) + { + if (agg.source_observation_index < 0) + { + agg.source_observation_index = p.observation_index; + agg.source_channel_instance_index = p.channel_instance_index; + agg.source_series_instance_index = p.series_instance_index; + agg.source_channel_name = p.channel_name; + agg.quality = StatMetricQuality::Normal; + agg.quality_reason = "ok"; + } + + // 同一来源同一时间同一种统计值如果重复,保留首次写入,避免一次性覆盖造成结果不稳定。 + if (stat_has_value_kind(agg, p.stat_kind)) + return; + + agg.source_series_instance_index = p.series_instance_index; + switch (p.stat_kind) + { + case StatValueKind::Min: + agg.has_min = true; + agg.min_value = p.value; + break; + case StatValueKind::Max: + agg.has_max = true; + agg.max_value = p.value; + break; + case StatValueKind::Avg: + agg.has_avg = true; + agg.avg_value = p.value; + break; + case StatValueKind::P95: + agg.has_p95 = true; + agg.p95_value = p.value; + break; + default: + break; + } + } + + void stat_streaming_add_point( + std::map>& by_metric, + const ExpandedStatPoint& p) + { + if (p.metric_id == StatMetricId::Unknown || p.stat_kind == StatValueKind::Unknown) + return; + + const StatMetricSourceKey key = stat_make_source_key(p); + StatStreamingSourceBucket& src = by_metric[p.metric_id][key]; + if (src.metric_id == StatMetricId::Unknown) + { + src.metric_id = p.metric_id; + src.key = key; + } + + src.stats.add(p); + AggregatedStatValues& agg = src.values_by_time[p.timestamp]; + stat_streaming_set_value(agg, p); + if (src.time_text_by_time.find(p.timestamp) == src.time_text_by_time.end()) + src.time_text_by_time[p.timestamp] = p.timestamp_text; + } + + std::vector stat_build_aggregated_buckets_streaming( + const PqdifLogicalFile& lf, + ParsedConnectionKind connection_kind, + int& selected_observation_index, + std::string& selected_observation_name, + size_t& out_raw_point_count, + size_t& out_selected_metric_count, + size_t& out_selected_dynamic_metric_count) + { + selected_observation_index = -1; + selected_observation_name.clear(); + out_raw_point_count = 0; + out_selected_metric_count = 0; + out_selected_dynamic_metric_count = 0; + + const PqdifObservationRecord* primary = stat_select_primary_statistical_observation(lf); + if (primary != nullptr) + { + selected_observation_index = primary->observation_index; + selected_observation_name = primary->observation_name; + } + + std::map> by_metric; + + // 大文件模式:遍历全部 observations,但每次只临时展开一个通道,随后立刻压缩到 + // metric/source/timestamp/kind 聚合结构中,不保留全量 ExpandedStatPoint。 + for (const auto& obs : lf.observations) + { + for (const auto& ch : obs.channel_instances) + { + std::vector points = stat_expand_channel_points(lf, connection_kind, obs, ch); + out_raw_point_count += points.size(); + for (const auto& p : points) + stat_streaming_add_point(by_metric, p); + } + } + + std::map global_buckets; + size_t duplicate_metric_count = 0; + + for (auto& metric_pair : by_metric) + { + const StatMetricId metric_id = metric_pair.first; + auto& sources = metric_pair.second; + if (sources.empty()) + continue; + + bool has_selected = false; + StatMetricSourceKey best_key; + int best_score = std::numeric_limits::min(); + + for (auto& src_pair : sources) + { + int score = stat_metric_source_score(src_pair.second.stats); + // 大文件流式模式会遍历全部 observations。为尽量保持原逻辑,普通核心指标在分数接近时 + // 优先使用主统计 observation;谐波等只存在于其他 observation 时仍能正常选中。 + if (selected_observation_index >= 0 && src_pair.first.observation_index == selected_observation_index) + score += 25; + + if (!has_selected || score > best_score) + { + has_selected = true; + best_score = score; + best_key = src_pair.first; + } + } + + if (!has_selected) + continue; + + ++out_selected_metric_count; + if (stat_is_dynamic_metric(metric_id)) + ++out_selected_dynamic_metric_count; + if (sources.size() > 1) + ++duplicate_metric_count; + + StatStreamingSourceBucket& selected_src = sources[best_key]; + for (auto& time_pair : selected_src.values_by_time) + { + TimeAggregatedStatBucket& bucket = global_buckets[time_pair.first]; + if (bucket.timestamp == 0) + { + bucket.timestamp = time_pair.first; + const auto tit = selected_src.time_text_by_time.find(time_pair.first); + bucket.timestamp_text = (tit == selected_src.time_text_by_time.end()) ? format_time_text(time_pair.first) : tit->second; + } + bucket.metrics[metric_id] = time_pair.second; + } + } + + std::vector out; + out.reserve(global_buckets.size()); + for (auto& kv : global_buckets) + out.push_back(std::move(kv.second)); + + if (pqdif_log_enabled(PqdifLogLevel::Core)) + { + std::cout << "========== PQDIF LARGE FILE STREAMING SUMMARY ==========" << std::endl; + std::cout << "raw_points=" << out_raw_point_count + << ", candidate_metric_count=" << by_metric.size() + << ", selected_metric_count=" << out_selected_metric_count + << ", selected_dynamic_metrics=" << out_selected_dynamic_metric_count << "/" << stat_all_dynamic_metric_order().size() + << ", duplicate_metric_count=" << duplicate_metric_count + << ", buckets=" << out.size() + << ", selected_observation_index=" << selected_observation_index + << ", selected_observation_name=" << selected_observation_name + << std::endl; + std::cout << "mode=streaming_no_expanded_points; detail=per-channel expand then immediate aggregate" << std::endl; + std::cout << "========================================================" << std::endl; + } + + return out; + } + + void stat_print_aggregated_metric_line(StatMetricId metric_id, const AggregatedStatValues* agg); + void dump_grouped_bucket_streaming_preview(const ParsedPqdifFile& parsed_file) + { + std::cout << "========== GROUPED STAT STREAMING CORE SUMMARY ==========" << std::endl; + std::cout << "connection_kind=" << stat_connection_kind_name(parsed_file.connection_kind) + << ", selected_observation_index=" << parsed_file.selected_observation_index + << ", selected_observation_name=" << parsed_file.selected_observation_name + << ", expanded_points=SKIPPED_BY_STREAMING" + << ", buckets=" << parsed_file.aggregated_stat_buckets.size() + << ", core_metric_slots=" << stat_core_metric_print_order().size() + << ", dynamic_spectrum_slots=" << stat_all_dynamic_metric_order().size() + << std::endl; + + const size_t bucket_limit = std::min(parsed_file.aggregated_stat_buckets.size(), 3); + for (size_t i = 0; i < bucket_limit; ++i) + { + const auto& b = parsed_file.aggregated_stat_buckets[i]; + std::cout << " [BUCKET " << i << "] time=" << b.timestamp_text + << ", metric_count_present=" << b.metrics.size() + << std::endl; + for (const auto metric_id : stat_core_metric_print_order()) + { + const auto it = b.metrics.find(metric_id); + if (it != b.metrics.end()) + stat_print_aggregated_metric_line(metric_id, &it->second); + } + } + std::cout << "=========================================================" << std::endl; + } + bool pqdif_probe_text_looks_like_flicker(const std::string& text) { const std::string key = normalize_key(text); @@ -7496,9 +7745,41 @@ namespace { << ", total_base64_chars=" << file_batch.total_base64_chars << std::endl; - // 按你的要求,入队前完整打印保存对象内部结构:文件批次 -> 时间点 -> Max/Min/Avg/P95 子记录 -> Base64 内容。 - // 日志会很长;确认完成后可以临时注释掉这一行,或改成 if (pqdif_log_enabled(PqdifLogLevel::Info)) 包裹。 - //pqdif_dump_stat_base64_file_batch_full(file_batch); + // 完整 Base64 内容日志非常长,尤其大文件会明显放大内存和 I/O 压力。 + // 现在仅在 Debug/Trace 级别打印完整对象;Core/Info 只打印摘要和前几条子记录。 + if (pqdif_log_enabled(PqdifLogLevel::Debug)) + { + pqdif_dump_stat_base64_file_batch_full(file_batch); + } + else + { + std::cout << " [BASE64 FILE BATCH COMPACT] file=" << file_batch.pqdif_file_path + << ", time_points=" << file_batch.time_point_count + << ", records=" << file_batch.total_record_count + << ", total_float_count=" << file_batch.total_float_count + << ", total_placeholder_count=" << file_batch.total_placeholder_count + << ", total_base64_chars=" << file_batch.total_base64_chars + << std::endl; + + size_t shown = 0; + for (const auto& tp : file_batch.time_points) + { + for (const auto& rec : tp.records) + { + if (shown >= 4) + break; + std::cout << " [BASE64 SAVED SAMPLE] time=" << rec.timestamp_text + << ", kind=" << rec.value_kind_name + << ", floats=" << rec.float_count + << ", placeholders=" << rec.placeholder_count + << ", base64_len=" << rec.base64_payload.size() + << std::endl; + ++shown; + } + if (shown >= 4) + break; + } + } push_pqdif_stat_base64_file_batch(std::move(file_batch)); @@ -7548,6 +7829,54 @@ namespace { // 后续指标判断需要先区分星型 / 角型两套规则。 parsed_file.connection_kind = stat_classify_connection_kind(parsed_file.logical_file); + // 2) 根据估算展开点数决定是否启用“大文件流式模式”。 + // 普通文件仍走原 expanded_stat_points -> grouped buckets 流程; + // 大文件则直接按通道聚合为 buckets,不再保存全量 expanded_stat_points。 + const size_t estimated_candidate_points = + stat_estimate_candidate_point_count_for_streaming(parsed_file.logical_file); + const bool use_large_file_streaming = + estimated_candidate_points > kPqdifLargeFileStreamingPointThreshold; + + if (use_large_file_streaming) + { + std::cout << "[PQDIF][MEMORY][STREAMING] large file detected, use streaming bucket/base64 build. " + << "estimated_candidate_points=" << estimated_candidate_points + << ", threshold=" << kPqdifLargeFileStreamingPointThreshold + << std::endl; + + size_t raw_point_count = 0; + size_t selected_metric_count = 0; + size_t selected_dynamic_metric_count = 0; + parsed_file.aggregated_stat_buckets = stat_build_aggregated_buckets_streaming( + parsed_file.logical_file, + parsed_file.connection_kind, + parsed_file.selected_observation_index, + parsed_file.selected_observation_name, + raw_point_count, + selected_metric_count, + selected_dynamic_metric_count); + + if (pqdif_is_trace_log_enabled()) + dump_semantic_probe(parsed_file); + + dump_grouped_bucket_streaming_preview(parsed_file); + + // 3) 大文件仍完整生成 Base64 文件批次,只是跳过 expanded_stat_points 中间缓存。 + pqdif_build_and_queue_base64_records(parsed_file); + + // 大文件模式不再把完整 ParsedPqdifFile 放入解析缓存,避免 logical_file + buckets 在内存中长期驻留。 + // 后续业务请从 PqdifStatBase64FileBatch 队列读取最终 Base64 结果。 + std::cout << "[PQDIF][MEMORY][STREAMING] skip parsed_file cache for large file, " + << "base64 batch has been queued. file=" << file_path.string() + << std::endl; + + std::cout << "[PQDIF] processed ok(streaming): " << file_path.string() + << ", cache_size=" << GetParsedPqdifCacheSize() + << std::endl; + + return true; + } + // 2) 只筛选“统计类 observation”,并对其展开目标统计指标。 // 当前阶段不处理全部 observation,避免不同 observation 混入同一时间聚合结果。 parsed_file.expanded_stat_points = stat_expand_selected_statistical_observation( @@ -7655,14 +7984,50 @@ namespace { << ", max_per_scan=" << kMaxPqdifFilesPerScan << std::endl; - const bool ok = process_single_pqdif_file(item.path, item.mac); + bool ok = false; + bool caught_exception = false; + std::string exception_text; + + try + { + ok = process_single_pqdif_file(item.path, item.mac); + } + catch (const std::bad_alloc& ex) + { + ok = false; + caught_exception = true; + exception_text = ex.what(); + std::cout << "[PQDIF][OOM] std::bad_alloc while processing file=" + << item.path.string() + << ", what=" << exception_text + << std::endl; + } + catch (const std::exception& ex) + { + ok = false; + caught_exception = true; + exception_text = ex.what(); + std::cout << "[PQDIF][ERROR] exception while processing file=" + << item.path.string() + << ", what=" << exception_text + << std::endl; + } + catch (...) + { + ok = false; + caught_exception = true; + exception_text = "unknown"; + std::cout << "[PQDIF][ERROR] unknown exception while processing file=" + << item.path.string() + << std::endl; + } const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir); const fs::path dst = target_root / item.mac / item.path.filename(); // 处理完成后移动文件,避免下一轮 scan_once() 重复解析同一个 PQDIF。 // 解析成功:download//.pqd -> download_done//.pqd - // 解析失败:download//.pqd -> download_fail//.pqd + // 解析失败或解析过程中出现异常:download//.pqd -> download_fail//.pqd // // 调试时如果想让文件保留在 download 目录中、方便反复解析同一个文件, // 可以临时注释掉下面这个 if 块;调试结束后建议恢复,否则每一轮都会重复解析。 @@ -7672,6 +8037,13 @@ namespace { << item.path.string() << " -> " << dst.string() << std::endl; } + else if (caught_exception) + { + std::cout << "[PQDIF] exception file moved to fail dir: " + << dst.string() + << ", reason=" << exception_text + << std::endl; + } cleanup_backup_dir(fs::path(kDoneRootDir) / item.mac, kBackupLimit); cleanup_backup_dir(fs::path(kFailRootDir) / item.mac, kBackupLimit);