Files
front_linux/LFtid1056/pqdif_thread_processor.cpp

726 lines
23 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "pqdif_thread_processor.h"
#include <algorithm>
#include <chrono>
#include <thread>
#include <cctype>
#include <cmath>
#include <ctime>
#include <deque>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <vector>
#include <experimental/filesystem>
#include <fstream>
#include <iomanip>
#include <cstdio>
// PQDIF 解析库
#include "pqdif/PQDIF.h"
#include "pqdif/include/pqdif_ph.h"
#include "pqdif/include/pqdif_id.h"
namespace fs = std::experimental::filesystem;
namespace {
// ============================
// Tunable parameters
// ============================
constexpr int kScanIntervalSec = 60;       // Seconds to sleep between two directory scans.
constexpr int kBackupLimit = 4800;         // Max files kept in each done/fail backup directory.
constexpr size_t kParsedCacheLimit = 128;  // Max number of parsed files held in the in-memory cache.
// Root directory names (relative paths). Declared constexpr so the pointers
// themselves are immutable and cannot be re-pointed at runtime.
constexpr const char* kPqdRootDir = "download";        // Input tree: download/<mac>/*.pqd
constexpr const char* kDoneRootDir = "download_done";  // Root for files that parsed successfully.
constexpr const char* kFailRootDir = "download_fail";  // Root for files that failed to parse.
// ============================
// In-memory cache
// ============================
// Queue of parsed files: new entries are appended at the back and the oldest
// entry is read or removed from the front.
std::deque<ParsedPqdifFile> g_parsed_cache;
// Locked around every access to g_parsed_cache in this file.
std::mutex g_parsed_cache_mutex;
// ============================
// Basic utility functions
// ============================
// ============================
// Debug print helpers
// Purpose: print the PQDIF content that has been staged in memory
// ============================
// Renders a GUID as lowercase hex text in the 8-4-4-4-12 layout so that tag
// values are easy to read in the log.
std::string guid_to_string(const GUID& g)
{
    // Widen the eight trailing bytes up front so the format call stays flat.
    unsigned int tail[8];
    for (int i = 0; i < 8; ++i)
        tail[i] = static_cast<unsigned int>(g.Data4[i]);

    const unsigned int head = static_cast<unsigned int>(g.Data1);
    const unsigned int mid_a = static_cast<unsigned int>(g.Data2);
    const unsigned int mid_b = static_cast<unsigned int>(g.Data3);

    // 36 characters plus the terminator are needed; 64 leaves headroom.
    char text[64] = { 0 };
    std::snprintf(text, sizeof(text),
                  "%08x-%04x-%04x-%02x%02x-%02x%02x%02x%02x%02x%02x",
                  head, mid_a, mid_b,
                  tail[0], tail[1],
                  tail[2], tail[3], tail[4], tail[5], tail[6], tail[7]);
    return std::string(text);
}
// Prints the first few samples of a numeric series so the log stays short.
// Nothing is written for an empty series. Otherwise one line is written:
// the title, the total count, up to preview_count leading values, and a
// trailing ", ..." when the series holds more than preview_count values.
void print_value_preview(const std::vector<double>& values, const char* title, size_t preview_count = 5)
{
    const size_t total = values.size();
    if (total == 0)
        return;

    std::cout << " " << title << " count=" << total << " preview=[";

    const size_t shown = (total < preview_count) ? total : preview_count;
    size_t idx = 0;
    while (idx < shown)
    {
        if (idx != 0)
            std::cout << ", ";
        std::cout << values[idx];
        ++idx;
    }

    const bool truncated = total > preview_count;
    if (truncated)
        std::cout << ", ...";
    std::cout << "]" << std::endl;
}
// Prints the first few samples of a timestamp series.
// Each time_t is written as a plain integer. Nothing is written for an empty
// series; a trailing ", ..." marks that more than preview_count values exist.
void print_time_preview(const std::vector<time_t>& values, const char* title, size_t preview_count = 5)
{
    if (values.empty())
        return;

    std::cout << " " << title << " count=" << values.size() << " preview=[";

    const size_t shown = std::min(preview_count, values.size());
    const std::vector<time_t>::const_iterator first = values.begin();
    const std::vector<time_t>::const_iterator last = first + shown;
    for (std::vector<time_t>::const_iterator it = first; it != last; ++it)
    {
        if (it != first)
            std::cout << ", ";
        std::cout << static_cast<long long>(*it);
    }

    if (preview_count < values.size())
        std::cout << ", ...";
    std::cout << "]" << std::endl;
}
// Prints the tag metadata of one series category on a single line.
// name is the label shown in brackets (for example "MAX" or "TIME").
void print_series_meta(const RawSeriesTagMeta& meta, const char* name)
{
    const std::string characteristic_text = guid_to_string(meta.quantity_characteristic_id);
    const std::string value_type_text = guid_to_string(meta.value_type_id);

    std::cout << " [" << name << "]";
    std::cout << " units_id=" << meta.quantity_units_id;
    std::cout << " characteristic_id=" << characteristic_text;
    std::cout << " value_type_id=" << value_type_text;
    std::cout << " base_type=" << meta.series_base_type;
    std::cout << " scale=" << meta.series_scale;
    std::cout << " offset=" << meta.series_offset;
    std::cout << std::endl;
}
// 打印单个通道的内容
void dump_channel_detail(const std::string& key, const RawChannelSeries& ch)
{
std::cout << " [CHANNEL]"
<< " key=" << key
<< " raw_name=" << ch.channel_tag.raw_channel_name
<< " phase_id=" << ch.channel_tag.phase_id
<< " quantity_type_id=" << guid_to_string(ch.channel_tag.quantity_type_id)
<< " quantity_measured_id=" << ch.channel_tag.quantity_measured_id
<< " freq=" << ch.channel_tag.channel_frequency
<< " group_id=" << ch.channel_tag.group_id
<< std::endl;
if (!ch.times.empty())
{
print_series_meta(ch.time_meta, "TIME");
print_time_preview(ch.times, "TIME");
}
if (!ch.max_values.empty())
{
print_series_meta(ch.max_meta, "MAX");
print_value_preview(ch.max_values, "MAX");
}
if (!ch.min_values.empty())
{
print_series_meta(ch.min_meta, "MIN");
print_value_preview(ch.min_values, "MIN");
}
if (!ch.avg_values.empty())
{
print_series_meta(ch.avg_meta, "AVG");
print_value_preview(ch.avg_values, "AVG");
}
if (!ch.cp95_values.empty())
{
print_series_meta(ch.cp95_meta, "CP95");
print_value_preview(ch.cp95_values, "CP95");
}
if (!ch.val_values.empty())
{
print_series_meta(ch.val_meta, "VAL");
print_value_preview(ch.val_values, "VAL");
}
}
// 打印整个文件的暂存内容摘要
void dump_parsed_map_summary(const std::string& file_path, const RawChannelMap& raw_map)
{
std::cout << "========== PQDIF PARSED SUMMARY ==========" << std::endl;
std::cout << "file=" << file_path
<< ", channel_count=" << raw_map.size()
<< std::endl;
for (const auto& kv : raw_map)
{
dump_channel_detail(kv.first, kv.second);
}
std::cout << "==========================================" << std::endl;
}
bool dump_file_basic_info(const std::string& file_path, std::string& err)
{
std::error_code ec;
fs::path p(file_path);
if (!fs::exists(p, ec))
{
err = "file not exists";
return false;
}
if (!fs::is_regular_file(p, ec))
{
err = "not a regular file";
return false;
}
auto file_size = fs::file_size(p, ec);
if (ec)
{
err = "cannot get file size";
return false;
}
std::ifstream ifs(file_path, std::ios::binary);
if (!ifs.is_open())
{
err = "ifstream open failed";
return false;
}
std::cout << "[PQDIF] file basic info: path=" << file_path
<< ", size=" << file_size << std::endl;
// 打印前 32 字节十六进制,便于判断是否像标准文件头
unsigned char buf[32] = { 0 };
ifs.read(reinterpret_cast<char*>(buf), sizeof(buf));
std::streamsize n = ifs.gcount();
std::ostringstream oss;
oss << "[PQDIF] first " << n << " bytes: ";
for (std::streamsize i = 0; i < n; ++i)
{
oss << std::hex << std::setw(2) << std::setfill('0')
<< static_cast<int>(buf[i]) << " ";
}
std::cout << oss.str() << std::endl;
return true;
}
// Returns a copy of s with every character passed through std::toupper.
std::string to_upper_copy(std::string s)
{
    for (std::string::size_type i = 0; i < s.size(); ++i)
    {
        // Go through unsigned char so bytes >= 0x80 are valid toupper input.
        const unsigned char raw = static_cast<unsigned char>(s[i]);
        s[i] = static_cast<char>(std::toupper(raw));
    }
    return s;
}
// Returns a copy of s without leading and trailing whitespace (as classified
// by std::isspace). Interior whitespace is kept.
std::string trim_copy(const std::string& s)
{
    const auto not_space = [](char ch) {
        return !std::isspace(static_cast<unsigned char>(ch));
    };

    // First non-space character from the left (end() when s is all whitespace).
    const std::string::const_iterator first = std::find_if(s.begin(), s.end(), not_space);

    // Last non-space character from the right, never scanning past `first`.
    const std::string::const_iterator last =
        std::find_if(s.rbegin(), std::string::const_reverse_iterator(first), not_space).base();

    return std::string(first, last);
}
// Normalizes a channel name: keeps only alphanumeric characters and the
// brackets '[' / ']', upper-casing what it keeps.
// Note: at this stage the result is used only as a map key, not as a basis for
// identifying the channel.
std::string normalize_key(const std::string& src)
{
    std::string key;
    key.reserve(src.size());

    for (std::string::const_iterator it = src.begin(); it != src.end(); ++it)
    {
        const unsigned char byte = static_cast<unsigned char>(*it);
        const bool is_bracket = (*it == '[') || (*it == ']');
        if (!is_bracket && !std::isalnum(byte))
            continue;  // drop separators, punctuation and anything non-alphanumeric
        key += static_cast<char>(std::toupper(byte));
    }
    return key;
}
// Returns true when path is a regular file whose extension is ".pqd" or
// ".pqdif" (case-insensitive).
// Uses the non-throwing filesystem overload: a file that vanishes or becomes
// unreadable between directory iteration and this check is reported as
// "not a PQDIF file" instead of throwing and aborting the caller's scan.
bool is_pqdif_file(const fs::path& path)
{
    std::error_code ec;
    if (!fs::is_regular_file(path, ec))
        return false;
    const std::string ext = to_upper_copy(path.extension().string());
    return ext == ".PQD" || ext == ".PQDIF";
}
// Makes sure dir exists, creating it and any missing parents when needed.
// Returns true when the path exists after the call.
bool ensure_dir(const fs::path& dir)
{
    std::error_code ec;
    bool present = fs::exists(dir, ec);
    if (!present)
    {
        present = fs::create_directories(dir, ec);
        // create_directories reports false when nothing was created, so look
        // once more before giving up.
        if (!present)
            present = fs::exists(dir, ec);
    }
    return present;
}
// Moves src to dst, creating dst's parent directory first.
// Tries a rename; if that fails, falls back to copying over dst and then
// deleting src. Returns true only when the file ends up at dst and src is gone
// (a successful copy followed by a failed delete returns false).
bool move_file_with_fallback(const fs::path& src, const fs::path& dst)
{
    ensure_dir(dst.parent_path());

    std::error_code rename_ec;
    fs::rename(src, dst, rename_ec);
    const bool renamed = !rename_ec;
    if (renamed)
        return true;

    // Rename failed: copy the content, overwriting any existing destination.
    std::error_code copy_ec;
    fs::copy_file(src, dst, fs::copy_options::overwrite_existing, copy_ec);
    if (copy_ec)
        return false;

    std::error_code remove_ec;
    fs::remove(src, remove_ec);
    return !remove_ec;
}
// Trims a backup directory so that it holds at most `limit` PQDIF files,
// deleting the ones with the oldest last-write time first.
// A negative limit disables the cleanup. A missing path or a path that is not
// a directory is ignored. Deletion errors are ignored (best effort).
void cleanup_backup_dir(const fs::path& dir, int limit)
{
    if (limit < 0)
        return;

    std::error_code ec;
    if (!fs::exists(dir, ec) || !fs::is_directory(dir, ec))
        return;

    std::vector<fs::path> files;
    for (fs::directory_iterator it(dir, ec), end; !ec && it != end; it.increment(ec))
    {
        if (is_pqdif_file(it->path()))
            files.push_back(it->path());
    }

    const size_t keep = static_cast<size_t>(limit);
    if (files.size() <= keep)
        return;

    // Read every timestamp exactly once before sorting. Querying the
    // filesystem inside the comparator costs a stat per comparison and can
    // hand std::sort an inconsistent ordering if a timestamp changes mid-sort.
    struct StampedFile
    {
        fs::path path;
        fs::file_time_type modified;
    };
    std::vector<StampedFile> stamped;
    stamped.reserve(files.size());
    for (const fs::path& file : files)
    {
        // On error last_write_time yields file_time_type::min(), so such a
        // file sorts as the oldest, as it did before.
        std::error_code time_ec;
        stamped.push_back(StampedFile{ file, fs::last_write_time(file, time_ec) });
    }

    std::sort(stamped.begin(), stamped.end(), [](const StampedFile& a, const StampedFile& b) {
        return a.modified < b.modified;
    });

    const size_t remove_count = stamped.size() - keep;
    for (size_t i = 0; i < remove_count; ++i)
    {
        std::error_code remove_ec;
        fs::remove(stamped[i].path, remove_ec);
    }
}
// Builds a stable key that is used only for storage.
// Later, real metric identification must not rely on this key.
// The key is the trimmed channel name plus an optional "[n]" suffix, passed
// through normalize_key. When channel_freq is positive, n is channel_freq / 50
// rounded to the nearest integer (omitted if that is not positive); otherwise
// n is group_id when group_id is positive.
std::string build_channel_key(const std::string& channel_name, double channel_freq, int group_id)
{
    // Pick the numeric suffix first; 0 means "no suffix".
    int suffix = 0;
    if (channel_freq > 0.0)
        suffix = static_cast<int>(std::llround(channel_freq / 50.0));
    else if (group_id > 0)
        suffix = group_id;

    std::ostringstream composed;
    composed << trim_copy(channel_name);
    if (suffix > 0)
        composed << "[" << suffix << "]";

    return normalize_key(composed.str());
}
// ============================
// Cache enqueue
// ============================
// Appends a parsed file to the in-memory cache under the cache mutex.
// When the cache already holds kParsedCacheLimit entries, the oldest entry is
// logged and dropped first. Always returns true.
bool push_parsed_result_to_cache(ParsedPqdifFile&& parsed)
{
    std::lock_guard<std::mutex> lock(g_parsed_cache_mutex);

    const bool at_capacity = !(g_parsed_cache.size() < kParsedCacheLimit);
    if (at_capacity)
    {
        const ParsedPqdifFile& evicted = g_parsed_cache.front();
        std::cout << "[PQDIF] cache full, drop oldest: "
                  << evicted.source_file << std::endl;
        g_parsed_cache.pop_front();
    }

    g_parsed_cache.push_back(std::move(parsed));
    return true;
}
// ============================
// Raw PQDIF parsing
// Current stage: only the tags and the raw values are stored
// ============================
// Reads the PQDIF file at file_path and stores, per channel key (see
// build_channel_key), the channel tags and the raw series values in out_map.
// Only Observation records are visited, and observations whose trigger method
// is ID_TRIGGER_METH_CHANNEL or -1 are skipped.
// out_map is not cleared; entries are created or extended in place.
// Returns false and sets err when the pre-check or CPQDIF::Read() fails.
// Returns true otherwise, even if no observation contributed any data.
bool parse_pqdif_file_raw(const std::string& file_path, RawChannelMap& out_map, std::string& err)
{
CPQDIF file_convert;
file_convert.put_FlatFileName(file_path);
// Pre-check the file on disk (exists, regular, openable) and log its size and leading bytes.
std::string basic_err;
if (!dump_file_basic_info(file_path, basic_err))
{
err = "precheck failed: " + basic_err;
return false;
}
if (!file_convert.Read())
{
// NOTE(review): this early return does not call Close(); confirm that the
// CPQDIF destructor releases whatever Read() may have acquired.
err = "CPQDIF::Read() failed";
return false;
}
const int record_count = static_cast<int>(file_convert.RecordGetCount());
for (int i_record = 0; i_record < record_count; ++i_record)
{
GUID record_guid{};
std::string record_name;
if (!file_convert.RecordGetInfo(i_record, &record_guid, record_name))
continue;
// At this stage only Observation records are processed
if (!PQDIF_IsEqualGUID(record_guid, tagRecObservation))
continue;
// From here on, every path that leaves this iteration must release the handle.
long observation_handle = 0;
if (!file_convert.RecordRequestObservation(i_record, &observation_handle))
continue;
DATE time_start = 0;
std::string observation_name;
long channel_count = 0;
if (!file_convert.ObservationGetInfo(observation_handle, time_start, observation_name, channel_count))
{
file_convert.RecordReleaseObservation(observation_handle);
continue;
}
// This thread only handles statistical observations, not waveform-recording observations
// NOTE(review): the return value of ObservationGetTriggerInfo is ignored, so on
// failure trigger_method_id keeps its initial 0 and the observation is processed.
long trigger_method_id = 0;
DATE trigger_time = 0;
file_convert.ObservationGetTriggerInfo(observation_handle, &trigger_method_id, &trigger_time);
if (trigger_method_id == ID_TRIGGER_METH_CHANNEL || trigger_method_id == -1)
{
file_convert.RecordReleaseObservation(observation_handle);
continue;
}
// Convert the observation start into a time_t; series time values are added to it below.
time_t observation_start_ts = 0;
file_convert.GetTime(time_start, &observation_start_ts);
// Iterate over the channels
for (int i_channel = 0; i_channel < channel_count; ++i_channel)
{
PqdifChannelInfoEx ch_info;
if (!file_convert.ObservationGetChannelInfoEx(observation_handle, i_channel, &ch_info))
continue;
// Channels without a name are skipped.
if (ch_info.name.empty())
continue;
double channel_freq = 0.0;
int group_id = 0;
file_convert.ObservationGetChannelFreq(observation_handle, i_channel, &channel_freq);
file_convert.ObservationGetChannelGroupID(observation_handle, i_channel, &group_id);
const std::string key = build_channel_key(ch_info.name, channel_freq, group_id);
// operator[] creates the entry on first use; channels that map to the same
// key (within or across observations) share one entry.
RawChannelSeries& raw_series = out_map[key];
// Store the channel-level tags
raw_series.channel_tag.raw_channel_name = ch_info.name;
raw_series.channel_tag.normalized_channel_name = key;
raw_series.channel_tag.phase_id = ch_info.phaseId;
raw_series.channel_tag.quantity_type_id = ch_info.quantityTypeId;
raw_series.channel_tag.quantity_measured_id = ch_info.quantityMeasuredId;
raw_series.channel_tag.channel_frequency = channel_freq;
raw_series.channel_tag.group_id = group_id;
// Iterate over every series of this channel
for (int i_series = 0; i_series < ch_info.countSeries; ++i_series)
{
PqdifSeriesInfoEx sr_info;
if (!file_convert.ObservationGetSeriesInfoEx(observation_handle, i_channel, i_series, &sr_info))
continue;
// NOTE(review): the buffer is released with delete[] below, which assumes
// ObservationGetSeriesData allocates it with new[] — confirm against the library.
double* values = nullptr;
long value_count = 0;
if (!file_convert.ObservationGetSeriesData(observation_handle, i_channel, i_series, &values, &value_count) ||
values == nullptr || value_count <= 0)
{
delete[] values;
continue;
}
RawSeriesTagMeta series_meta;
series_meta.quantity_units_id = sr_info.quantityUnitsId;
series_meta.quantity_characteristic_id = sr_info.quantityCharacteristicId;
series_meta.value_type_id = sr_info.valueTypeId;
series_meta.series_base_type = sr_info.seriesBaseType;
series_meta.series_scale = sr_info.scale;
series_meta.series_offset = sr_info.offset;
// Store the data in a different bucket depending on the series valueType.
// Values are appended, and the bucket's metadata is overwritten by the latest series.
// A series whose valueType matches none of the branches is dropped.
if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_TIME))
{
raw_series.time_meta = series_meta;
// Each value is rounded to an integer and added to the observation start.
// NOTE(review): presumably the values are second offsets — TODO confirm.
for (long i = 0; i < value_count; ++i)
{
raw_series.times.push_back(
observation_start_ts + static_cast<time_t>(std::llround(values[i]))
);
}
}
else if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_MAX))
{
raw_series.max_meta = series_meta;
raw_series.max_values.insert(raw_series.max_values.end(), values, values + value_count);
}
else if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_MIN))
{
raw_series.min_meta = series_meta;
raw_series.min_values.insert(raw_series.min_values.end(), values, values + value_count);
}
else if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_AVG))
{
raw_series.avg_meta = series_meta;
raw_series.avg_values.insert(raw_series.avg_values.end(), values, values + value_count);
}
else if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_P95))
{
raw_series.cp95_meta = series_meta;
raw_series.cp95_values.insert(raw_series.cp95_values.end(), values, values + value_count);
}
else if (PQDIF_IsEqualGUID(sr_info.valueTypeId, ID_SERIES_VALUE_TYPE_VAL))
{
raw_series.val_meta = series_meta;
raw_series.val_values.insert(raw_series.val_values.end(), values, values + value_count);
}
// The values were copied into the bucket above, so the buffer can be freed.
delete[] values;
}
}
file_convert.RecordReleaseObservation(observation_handle);
}
file_convert.Close();
return true;
}
// ============================
// Single-file processing
// ============================
// Parses one PQDIF file, prints a debug summary of the parsed content and
// appends the result to the in-memory cache.
//   file_path  path of the PQDIF file to parse
//   mac        device identifier stored with the parsed result
// Returns true when the file was parsed and cached; false when parsing or
// caching failed (the reason is logged).
bool process_single_pqdif_file(const fs::path& file_path, const std::string& mac)
{
    const std::string source = file_path.string();

    RawChannelMap raw_map;
    std::string err;
    if (!parse_pqdif_file_raw(source, raw_map, err))
    {
        std::cout << "[PQDIF] parse failed: " << source
                  << " reason=" << err << std::endl;
        return false;
    }

    // Debug: print the content staged by this parse.
    dump_parsed_map_summary(source, raw_map);

    // Capture the channel count now; raw_map is moved from below.
    const auto channel_count = raw_map.size();

    ParsedPqdifFile parsed_file;
    parsed_file.mac = mac;
    parsed_file.source_file = source;
    parsed_file.parsed_at = std::time(nullptr);
    parsed_file.channels = std::move(raw_map);

    if (!push_parsed_result_to_cache(std::move(parsed_file)))
    {
        std::cout << "[PQDIF] push cache failed: " << source << std::endl;
        return false;
    }

    // "channels" used to print the cache size; report both values under
    // their own labels.
    std::cout << "[PQDIF] processed ok: " << source
              << " channels=" << channel_count
              << " cache_size=" << GetParsedPqdifCacheSize()
              << std::endl;

    // TODO: process the file data here.
    return true;
}
// ============================
// 扫描目录
// ============================
void scan_once()
{
ensure_dir(kPqdRootDir);
ensure_dir(kDoneRootDir);
ensure_dir(kFailRootDir);
std::error_code ec;
if (!fs::exists(kPqdRootDir, ec) || !fs::is_directory(kPqdRootDir, ec))
return;
// 第一层目录:按 mac 划分
for (fs::directory_iterator mac_it(kPqdRootDir, ec), mac_end; !ec && mac_it != mac_end; mac_it.increment(ec))
{
if (ec || !fs::is_directory(mac_it->path()))
continue;
const std::string mac = mac_it->path().filename().string();
std::vector<fs::path> pqdif_files;
std::error_code file_ec;
for (fs::directory_iterator file_it(mac_it->path(), file_ec), file_end; !file_ec && file_it != file_end; file_it.increment(file_ec))
{
if (!file_ec && is_pqdif_file(file_it->path()))
pqdif_files.push_back(file_it->path());
}
if (pqdif_files.empty())
continue;
// 优先处理最新文件
std::sort(pqdif_files.begin(), pqdif_files.end(), [](const fs::path& a, const fs::path& b) {
std::error_code ea, eb;
return fs::last_write_time(a, ea) > fs::last_write_time(b, eb);
});
for (const auto& file_path : pqdif_files)
{
const bool ok = process_single_pqdif_file(file_path, mac);
const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir);
const fs::path dst = target_root / mac / file_path.filename();
//调试时暂时关闭文件转移
/*if (!move_file_with_fallback(file_path, dst))
{
std::cout << "[PQDIF] move failed: "
<< file_path.string() << " -> " << dst.string()
<< std::endl;
}*/
}
cleanup_backup_dir(fs::path(kDoneRootDir) / mac, kBackupLimit);
cleanup_backup_dir(fs::path(kFailRootDir) / mac, kBackupLimit);
}
}
} // namespace
// ============================
// Public cache interface
// ============================
// Removes the oldest cached file and moves it into out.
// Returns false, leaving out untouched, when the cache is empty.
bool PopOldestParsedPqdifFile(ParsedPqdifFile& out)
{
    std::lock_guard<std::mutex> lock(g_parsed_cache_mutex);

    const bool has_entry = !g_parsed_cache.empty();
    if (has_entry)
    {
        out = std::move(g_parsed_cache.front());
        g_parsed_cache.pop_front();
    }
    return has_entry;
}
// Copies the oldest cached file into out without removing it from the cache.
// Returns false, leaving out untouched, when the cache is empty.
bool PeekOldestParsedPqdifFile(ParsedPqdifFile& out)
{
    std::lock_guard<std::mutex> lock(g_parsed_cache_mutex);

    const bool has_entry = !g_parsed_cache.empty();
    if (has_entry)
        out = g_parsed_cache.front();
    return has_entry;
}
// Returns the number of parsed files currently held in the cache, read under
// the cache mutex.
size_t GetParsedPqdifCacheSize()
{
    std::lock_guard<std::mutex> lock(g_parsed_cache_mutex);
    const size_t entry_count = g_parsed_cache.size();
    return entry_count;
}
// Discards every parsed file held in the cache, under the cache mutex.
void ClearParsedPqdifCache()
{
    std::lock_guard<std::mutex> lock(g_parsed_cache_mutex);
    if (!g_parsed_cache.empty())
        g_parsed_cache.clear();
}
// ============================
// Thread main loop
// ============================
// Runs scan passes forever, sleeping kScanIntervalSec seconds after each one.
// Exceptions thrown by a pass are logged and swallowed so the loop keeps
// running. This function never returns.
void RunPqdifScanLoop()
{
    std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir
              << ", interval=" << kScanIntervalSec << "s"
              << std::endl;

    const std::chrono::seconds pause(kScanIntervalSec);
    for (;;)
    {
        try
        {
            scan_once();
        }
        catch (const std::exception& ex)
        {
            std::cout << "[PQDIF] scan exception: " << ex.what() << std::endl;
        }
        catch (...)
        {
            std::cout << "[PQDIF] scan exception: unknown" << std::endl;
        }
        std::this_thread::sleep_for(pause);
    }
}