Compare commits
11 Commits
ebefd6b2ae
...
测试2
| Author | SHA1 | Date | |
|---|---|---|---|
| 3191422869 | |||
| a91672a994 | |||
| e64d2e2318 | |||
| 671fc6702e | |||
| 626aac1fce | |||
| f925356d63 | |||
| 2161629fe0 | |||
| f879978e62 | |||
| 10a24450c7 | |||
| 845d2c551b | |||
| 54b0d68c24 |
BIN
LFtid1056.rar
Normal file
BIN
LFtid1056.rar
Normal file
Binary file not shown.
@@ -20,9 +20,10 @@ $SRC_DIR/tinyxml2.cpp \
|
|||||||
./dealMsg.cpp \
|
./dealMsg.cpp \
|
||||||
./main_thread.cpp \
|
./main_thread.cpp \
|
||||||
./PQSMsg.cpp \
|
./PQSMsg.cpp \
|
||||||
./pqdif_thread_processor.cpp \
|
|
||||||
./pqdif/PQDIF.cpp \
|
./pqdif/PQDIF.cpp \
|
||||||
./pqdif/include/cjson.c "
|
./pqdif_semantic_ids.cpp \
|
||||||
|
./pqdif_thread_processor.cpp \
|
||||||
|
./pqdif/include/cjson.c"
|
||||||
|
|
||||||
INCLUDE_DIRS="-I$SRC_DIR \
|
INCLUDE_DIRS="-I$SRC_DIR \
|
||||||
-I$SRC_DIR/nlohmann \
|
-I$SRC_DIR/nlohmann \
|
||||||
@@ -33,20 +34,22 @@ INCLUDE_DIRS="-I$SRC_DIR \
|
|||||||
-I./lib/libuv-v1.51.0/include \
|
-I./lib/libuv-v1.51.0/include \
|
||||||
-I./pqdif \
|
-I./pqdif \
|
||||||
-I./pqdif/include \
|
-I./pqdif/include \
|
||||||
-I. "
|
-I. "
|
||||||
|
|
||||||
LIB_DIRS="-L$LIB_DIR -L/usr/lib64 -L/usr/local/lib"
|
LIB_DIRS="-L$LIB_DIR -L./pqdif/lib -L/usr/lib64 -L/usr/local/lib"
|
||||||
|
|
||||||
LIBS="./cloudfront/lib/libcurl.so \
|
LIBS="./cloudfront/lib/libcurl.so \
|
||||||
./cloudfront/lib/libssl.so \
|
./cloudfront/lib/libssl.so \
|
||||||
./cloudfront/lib/libcrypto.so \
|
./cloudfront/lib/libcrypto.so \
|
||||||
./cloudfront/lib/liblog4cplus.so \
|
./cloudfront/lib/liblog4cplus.so \
|
||||||
./pqdif/lib/libpqdiflib.a \
|
|
||||||
./pqdif/lib/libz.a \
|
|
||||||
-lpthread -ldl -lrt \
|
-lpthread -ldl -lrt \
|
||||||
-lstdc++fs \
|
-lstdc++fs \
|
||||||
-lz \
|
-lz \
|
||||||
./libuv.a \
|
./libuv.a \
|
||||||
|
-Wl,--start-group \
|
||||||
|
./pqdif/lib/libpqdiflib.a \
|
||||||
|
./pqdif/lib/libz.a \
|
||||||
|
-Wl,--end-group \
|
||||||
-pthread"
|
-pthread"
|
||||||
|
|
||||||
# 如果有静态 rocketmq 库就加上
|
# 如果有静态 rocketmq 库就加上
|
||||||
@@ -74,4 +77,4 @@ if [ $? -eq 0 ]; then
|
|||||||
ldd "$OUT_DIR/$TARGET" || echo "是静态编译程序 ✔"
|
ldd "$OUT_DIR/$TARGET" || echo "是静态编译程序 ✔"
|
||||||
else
|
else
|
||||||
echo "❌ 编译失败"
|
echo "❌ 编译失败"
|
||||||
fi
|
fi
|
||||||
|
|||||||
@@ -1310,12 +1310,14 @@ bool ClientManager::set_cloud_status(const std::string& identifier, int status)
|
|||||||
connect_status_update(identifier, status);
|
connect_status_update(identifier, status);
|
||||||
std::cout << "[Device " << identifier
|
std::cout << "[Device " << identifier
|
||||||
<< "] ****Cloud status: " << ctx->cloudstatus << " updated to: " << status << std::endl;
|
<< "] ****Cloud status: " << ctx->cloudstatus << " updated to: " << status << std::endl;
|
||||||
|
DIY_INFOLOG_CODE(identifier,1,LOG_CODE_COMM,"设备登录状态更新为在线");
|
||||||
}
|
}
|
||||||
else if (ctx->cloudstatus == 1 && status == 0) {
|
else if (ctx->cloudstatus == 1 && status == 0) {
|
||||||
//设备从在线转换至离线,通知前台状态发生翻转
|
//设备从在线转换至离线,通知前台状态发生翻转
|
||||||
connect_status_update(identifier, status);
|
connect_status_update(identifier, status);
|
||||||
std::cout << "[Device " << identifier
|
std::cout << "[Device " << identifier
|
||||||
<< "] ****Cloud status: " << ctx->cloudstatus << " updated to: " << status << std::endl;
|
<< "] ****Cloud status: " << ctx->cloudstatus << " updated to: " << status << std::endl;
|
||||||
|
DIY_INFOLOG_CODE(identifier,1,LOG_CODE_COMM,"设备登录状态更新为离线");
|
||||||
}
|
}
|
||||||
|
|
||||||
// 修改云前置登录状态
|
// 修改云前置登录状态
|
||||||
|
|||||||
@@ -10,6 +10,7 @@
|
|||||||
#include <queue>
|
#include <queue>
|
||||||
#include "dealMsg.h"
|
#include "dealMsg.h"
|
||||||
#include "PQSMsg.h"
|
#include "PQSMsg.h"
|
||||||
|
|
||||||
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ<EFBFBD>ṹ
|
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ<EFBFBD>ṹ
|
||||||
struct PointInfo {
|
struct PointInfo {
|
||||||
std::string point_id; // <20><><EFBFBD><EFBFBD>ID
|
std::string point_id; // <20><><EFBFBD><EFBFBD>ID
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -390,7 +390,7 @@ void Fileupload_test()
|
|||||||
// 下载文件:从远端路径下载到本地,并返回本地文件路径
|
// 下载文件:从远端路径下载到本地,并返回本地文件路径
|
||||||
// 入参:dev(设备)、remote_path(远端完整路径)
|
// 入参:dev(设备)、remote_path(远端完整路径)
|
||||||
// 返回:本地保存路径(失败返回空字符串)
|
// 返回:本地保存路径(失败返回空字符串)
|
||||||
std::string getfilefromweb(const std::string& devid, const std::string& remote_path)
|
std::string getfilefromweb(const std::string& devid, const std::string& remote_path,int type)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
terminal_dev* dev = nullptr;
|
terminal_dev* dev = nullptr;
|
||||||
@@ -428,7 +428,17 @@ std::string getfilefromweb(const std::string& devid, const std::string& remote_p
|
|||||||
|
|
||||||
//【3】构造本地保存路径
|
//【3】构造本地保存路径
|
||||||
std::string mac = sanitize(normalize_mac(dev->addr_str));
|
std::string mac = sanitize(normalize_mac(dev->addr_str));
|
||||||
std::string save_dir = std::string(FRONT_PATH) + "/bin/upload/" + mac + "/";
|
|
||||||
|
std::string save_dir;
|
||||||
|
|
||||||
|
if(type == 1) {
|
||||||
|
// 升级文件放在专门的upgrade目录下
|
||||||
|
save_dir = std::string(FRONT_PATH) + "/bin/upgrade/" + mac + "/";
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// 普通文件放在upload目录下
|
||||||
|
save_dir = std::string(FRONT_PATH) + "/bin/upload/" + mac + "/";
|
||||||
|
}
|
||||||
|
|
||||||
if (!create_directory_recursive(save_dir)) {
|
if (!create_directory_recursive(save_dir)) {
|
||||||
std::cerr << "[getfile][ERROR] create dir failed: " << save_dir << std::endl;
|
std::cerr << "[getfile][ERROR] create dir failed: " << save_dir << std::endl;
|
||||||
@@ -824,7 +834,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
|
|||||||
dev.processNo = safe_str(item, "node");
|
dev.processNo = safe_str(item, "node");
|
||||||
dev.maxProcessNum = safe_str(item, "maxProcessNum");
|
dev.maxProcessNum = safe_str(item, "maxProcessNum");
|
||||||
|
|
||||||
//dev.mac = safe_str(item, "mac");//添加mac
|
dev.mac = safe_str(item, "ip");//添加mac
|
||||||
|
|
||||||
if (item.contains("monitorData") && item["monitorData"].is_array()) {
|
if (item.contains("monitorData") && item["monitorData"].is_array()) {
|
||||||
for (auto& mon : item["monitorData"]) {
|
for (auto& mon : item["monitorData"]) {
|
||||||
|
|||||||
@@ -68,6 +68,12 @@ enum class ActionResult {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// ====== ★修改:扩展 RecallFile,支持“多目录 + 文件筛选 + 串行下载”的状态机 ======
|
// ====== ★修改:扩展 RecallFile,支持“多目录 + 文件筛选 + 串行下载”的状态机 ======
|
||||||
|
enum class RecallFileType {
|
||||||
|
NONE = 0,
|
||||||
|
STEADY_FILE, // 稳态文件
|
||||||
|
VOLTAGE_FILE // 暂态直下文件
|
||||||
|
};
|
||||||
|
|
||||||
class RecallFile
|
class RecallFile
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@@ -75,18 +81,35 @@ public:
|
|||||||
int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败
|
int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败
|
||||||
std::string StartTime; // 数据补招起始时间(yyyy-MM-dd HH:mm:ss)
|
std::string StartTime; // 数据补招起始时间(yyyy-MM-dd HH:mm:ss)
|
||||||
std::string EndTime; // 数据补招结束时间(yyyy-MM-dd HH:mm:ss)
|
std::string EndTime; // 数据补招结束时间(yyyy-MM-dd HH:mm:ss)
|
||||||
|
|
||||||
|
// ===== 业务类型 =====
|
||||||
|
// STEADY:稳态文件补招
|
||||||
|
// VOLTAGE:暂态事件补招
|
||||||
std::string STEADY; // 补招历史统计数据标识 0-不补招;1-补招
|
std::string STEADY; // 补招历史统计数据标识 0-不补招;1-补招
|
||||||
std::string VOLTAGE; // 补招暂态事件标识 0-不补招;1-补招
|
std::string VOLTAGE; // 补招暂态事件标识 0-不补招;1-补招
|
||||||
|
|
||||||
|
// ===== 文件下载类型 =====
|
||||||
|
RecallFileType file_type = RecallFileType::NONE;
|
||||||
|
|
||||||
//暂态文件用
|
//暂态文件用
|
||||||
bool direct_mode = false; // 直下文件开关:true 表示不按时间窗,仅按目标文件名
|
std::string target_filetimes; // 直下文件匹配时间点(yyyyMMdd_HHmmss)
|
||||||
std::string target_filetimes; // 直下文件匹配时间点(yyyyMMdd_HHmmss),仅 direct_mode=true 时有效
|
|
||||||
|
|
||||||
// ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填
|
// ★新增:按“目录名 -> 文件名列表”的映射;由“其他线程”在目录请求成功后回填
|
||||||
std::map<std::string, std::vector<tag_dir_info>> dir_files;
|
std::map<std::string, std::vector<tag_dir_info>> dir_files;
|
||||||
|
|
||||||
// ★新增:候选目录(可扩展)
|
std::vector<std::string> steady_dir_candidates{
|
||||||
std::vector<std::string> dir_candidates{
|
"/cf/pqdif", //580绝对真实路径
|
||||||
|
"/bd0/pqdif", //chemengyu提供包含bd0
|
||||||
|
"/bd0/pqdif/%DESC%/%DAY%",
|
||||||
|
"/bd0/pqdif/Line%SEQ%",
|
||||||
|
"/bd0/historyFile/%DESC%",
|
||||||
|
"/pqdif", // 默认版本 / 新疆 可能取到其他测点文件
|
||||||
|
"/pqdif/%DESC%/%DAY%", // 上海:pqdif_dir_cfg 或描述目录 + 日期 不会取到其他测点文件
|
||||||
|
"/pqdif/Line%SEQ%", // 云南 不会取到其他测点文件
|
||||||
|
"/historyFile/%DESC%" // 广东 不会取到其他测点文件
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<std::string> voltage_dir_candidates{
|
||||||
"/cf/COMTRADE",
|
"/cf/COMTRADE",
|
||||||
"/bd0/COMTRADE",
|
"/bd0/COMTRADE",
|
||||||
"/sd0/COMTRADE",
|
"/sd0/COMTRADE",
|
||||||
@@ -102,6 +125,9 @@ public:
|
|||||||
ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果
|
ActionResult list_result = ActionResult::PENDING; // 当前目录的列举结果
|
||||||
ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果
|
ActionResult download_result = ActionResult::PENDING; // 当前文件的下载结果
|
||||||
|
|
||||||
|
// 稳态文件用:一个 guid 下的多个补招时间段
|
||||||
|
std::vector<std::pair<long long, long long>> recall_ranges;
|
||||||
|
|
||||||
// ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径)
|
// ★新增:下载队列(已筛选出在时间窗内的文件,含完整路径)
|
||||||
std::list<std::string> download_queue; //一个时间可能对应多个文件
|
std::list<std::string> download_queue; //一个时间可能对应多个文件
|
||||||
std::string downloading_file; // 当前正在下载的文件(完整路径)
|
std::string downloading_file; // 当前正在下载的文件(完整路径)
|
||||||
@@ -109,8 +135,22 @@ public:
|
|||||||
std::unordered_set<std::string> required_files; // 本次应当下载成功的文件全集
|
std::unordered_set<std::string> required_files; // 本次应当下载成功的文件全集
|
||||||
std::unordered_set<std::string> file_success; // 已下载成功的文件集合
|
std::unordered_set<std::string> file_success; // 已下载成功的文件集合
|
||||||
|
|
||||||
|
// 是否稳态文件任务
|
||||||
|
bool is_steady_file() const {
|
||||||
|
return file_type == RecallFileType::STEADY_FILE;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 是否暂态直下文件任务
|
||||||
|
bool is_voltage_file() const {
|
||||||
|
return file_type == RecallFileType::VOLTAGE_FILE;
|
||||||
|
}
|
||||||
|
|
||||||
|
const std::vector<std::string>& active_dirs() const {
|
||||||
|
return is_steady_file() ? steady_dir_candidates : voltage_dir_candidates;
|
||||||
|
}
|
||||||
|
|
||||||
// ★新增:一个便捷复位
|
// ★新增:一个便捷复位
|
||||||
void reset_runtime(bool keep_direct = false)
|
void reset_runtime(bool keep_target_filetimes = false)
|
||||||
{
|
{
|
||||||
phase = RecallPhase::IDLE;
|
phase = RecallPhase::IDLE;
|
||||||
cur_dir_index = 0;
|
cur_dir_index = 0;
|
||||||
@@ -124,9 +164,10 @@ public:
|
|||||||
required_files.clear();
|
required_files.clear();
|
||||||
file_success.clear();
|
file_success.clear();
|
||||||
|
|
||||||
|
// 注意:file_type 不属于运行态,不能清除,因为它决定了本次补招的业务类型(稳态/暂态),而这个业务类型在整个补招过程中是固定的,不应当被运行态重置影响
|
||||||
|
|
||||||
// ★新增:按需保留直下文件开关和目标名
|
// ★新增:按需保留直下文件开关和目标名
|
||||||
if (!keep_direct) {
|
if (!keep_target_filetimes) {
|
||||||
direct_mode = false;
|
|
||||||
target_filetimes.clear(); // ▲列表清空
|
target_filetimes.clear(); // ▲列表清空
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -687,7 +728,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std::
|
|||||||
|
|
||||||
//上送文件列表接口
|
//上送文件列表接口
|
||||||
bool send_file_list(terminal_dev* dev, const std::vector<tag_dir_info> &FileList);
|
bool send_file_list(terminal_dev* dev, const std::vector<tag_dir_info> &FileList);
|
||||||
std::string getfilefromweb(const std::string& devid, const std::string& remote_path);
|
std::string getfilefromweb(const std::string& devid, const std::string& remote_path,int type);
|
||||||
//提取mac
|
//提取mac
|
||||||
std::string normalize_mac(const std::string& mac);
|
std::string normalize_mac(const std::string& mac);
|
||||||
|
|
||||||
|
|||||||
@@ -50,6 +50,14 @@ extern std::string subdir;
|
|||||||
|
|
||||||
//日志主题
|
//日志主题
|
||||||
extern std::string G_LOG_TOPIC;
|
extern std::string G_LOG_TOPIC;
|
||||||
|
|
||||||
|
// 日志限流配置
|
||||||
|
extern int G_LOG_RATE_RESET_SEC;
|
||||||
|
extern int G_LOG_RATE_LIMIT_SEC;
|
||||||
|
extern int G_LOG_RATE_KEEP_ALL_MS;
|
||||||
|
extern int G_LOG_RATE_KEEP_BURST_MS;
|
||||||
|
extern int G_LOG_RATE_KEEP_BURST_COUNT;
|
||||||
|
extern int G_LOG_RATE_KEEP_HIGHFREQ_COUNT;
|
||||||
////////////////////////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
const int LOGTYPE_DEFAULT = LOG_CODE_OTHER;
|
const int LOGTYPE_DEFAULT = LOG_CODE_OTHER;
|
||||||
|
|
||||||
@@ -223,10 +231,26 @@ void refresh_log_level_cache_locked()
|
|||||||
|
|
||||||
class SendAppender : public Appender {
|
class SendAppender : public Appender {
|
||||||
private:
|
private:
|
||||||
struct RateState {
|
/*struct RateState {
|
||||||
uint64_t hit_count = 0; // 同一条日志累计命中次数
|
uint64_t hit_count = 0; // 同一条日志累计命中次数
|
||||||
std::chrono::steady_clock::time_point last_emit =
|
std::chrono::steady_clock::time_point last_emit =
|
||||||
std::chrono::steady_clock::time_point::min();
|
std::chrono::steady_clock::time_point::min();
|
||||||
|
};*/
|
||||||
|
struct RateState {
|
||||||
|
uint64_t pass_count; // 当前周期内已放行条数
|
||||||
|
uint64_t suppressed_count; // 当前被抑制条数
|
||||||
|
std::chrono::steady_clock::time_point last_emit;
|
||||||
|
std::chrono::steady_clock::time_point last_seen;
|
||||||
|
std::chrono::steady_clock::time_point last_reset;
|
||||||
|
bool has_emit;
|
||||||
|
|
||||||
|
RateState()
|
||||||
|
: pass_count(0),
|
||||||
|
suppressed_count(0),
|
||||||
|
last_emit(),
|
||||||
|
last_seen(),
|
||||||
|
last_reset(),
|
||||||
|
has_emit(false) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
static std::unordered_map<std::string, RateState> s_rate_map; //频率map
|
static std::unordered_map<std::string, RateState> s_rate_map; //频率map
|
||||||
@@ -240,7 +264,7 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 前 3 次:1 秒一次;第 3 次起:300 秒一次,一小时恢复
|
// 前 3 次:1 秒一次;第 3 次起:300 秒一次,一小时恢复
|
||||||
static bool should_emit(const std::string& key) {
|
/*static bool should_emit(const std::string& key) {
|
||||||
using namespace std::chrono;
|
using namespace std::chrono;
|
||||||
const auto now = steady_clock::now();
|
const auto now = steady_clock::now();
|
||||||
|
|
||||||
@@ -271,6 +295,96 @@ private:
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}*/
|
||||||
|
static bool should_emit(const std::string& key, uint64_t& suppressed_before_emit) {
|
||||||
|
using namespace std::chrono;
|
||||||
|
|
||||||
|
const auto now = steady_clock::now();
|
||||||
|
suppressed_before_emit = 0;
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lk(s_rate_mutex);
|
||||||
|
RateState& st = s_rate_map[key];
|
||||||
|
|
||||||
|
const int RESET_SEC = G_LOG_RATE_RESET_SEC ; // 1小时重置
|
||||||
|
const int LIMIT_SEC = G_LOG_RATE_LIMIT_SEC ; // 进入限流后:多久发1条 主要控制中频和高频那些,低频的都直接放行了
|
||||||
|
|
||||||
|
// 初始化 / 强制每小时重置
|
||||||
|
if (st.last_reset.time_since_epoch().count() == 0) {
|
||||||
|
st.last_reset = now;
|
||||||
|
} else {
|
||||||
|
auto since_reset = duration_cast<seconds>(now - st.last_reset).count();
|
||||||
|
if (since_reset >= RESET_SEC) { //重置周期
|
||||||
|
st = RateState();
|
||||||
|
st.last_reset = now;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 计算当前频率档位(按“本次与上次看到该key的间隔”判断)
|
||||||
|
// >=60秒/条:全部保留
|
||||||
|
// [1秒, 60秒):保留前60条,然后1分钟1条
|
||||||
|
// <1秒:保留前10条,然后1分钟1条
|
||||||
|
int allow_burst = 0;
|
||||||
|
|
||||||
|
if (st.last_seen.time_since_epoch().count() == 0) {
|
||||||
|
// 第一次看到,先按“全部保留”处理
|
||||||
|
allow_burst = -1;
|
||||||
|
} else {
|
||||||
|
auto gap_ms = duration_cast<milliseconds>(now - st.last_seen).count();
|
||||||
|
|
||||||
|
if (gap_ms >= G_LOG_RATE_KEEP_ALL_MS) { //什么时候不需要限流 //低频 //如果这里设置的很低,就不会限流
|
||||||
|
allow_burst = -1; // 全部保留
|
||||||
|
} else if (gap_ms >= G_LOG_RATE_KEEP_BURST_MS) {
|
||||||
|
allow_burst = G_LOG_RATE_KEEP_BURST_COUNT; // 前60条 //中频 //如果这里设置的比低频低,也不会生效
|
||||||
|
} else {
|
||||||
|
allow_burst = G_LOG_RATE_KEEP_HIGHFREQ_COUNT; // 前10条 //高频
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
st.last_seen = now;
|
||||||
|
|
||||||
|
// 档位1:全部保留
|
||||||
|
if (allow_burst == -1) {
|
||||||
|
suppressed_before_emit = st.suppressed_count;
|
||||||
|
st.suppressed_count = 0;
|
||||||
|
st.pass_count++;
|
||||||
|
st.last_emit = now;
|
||||||
|
st.has_emit = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 档位2/3:先放前N条
|
||||||
|
if (st.pass_count < (uint64_t)allow_burst) {
|
||||||
|
suppressed_before_emit = st.suppressed_count;
|
||||||
|
st.suppressed_count = 0;
|
||||||
|
st.pass_count++;
|
||||||
|
st.last_emit = now;
|
||||||
|
st.has_emit = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 超过前N条后:进入 1分钟1条
|
||||||
|
if (!st.has_emit) {
|
||||||
|
suppressed_before_emit = st.suppressed_count;
|
||||||
|
st.suppressed_count = 0;
|
||||||
|
st.pass_count++;
|
||||||
|
st.last_emit = now;
|
||||||
|
st.has_emit = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto elapsed = duration_cast<seconds>(now - st.last_emit).count();
|
||||||
|
if (elapsed >= LIMIT_SEC) {
|
||||||
|
suppressed_before_emit = st.suppressed_count;
|
||||||
|
st.suppressed_count = 0;
|
||||||
|
st.pass_count++;
|
||||||
|
st.last_emit = now;
|
||||||
|
st.has_emit = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 本条被抑制
|
||||||
|
st.suppressed_count++;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -396,8 +510,17 @@ protected:
|
|||||||
|
|
||||||
// ★新增:限频判断(同一条日志前 5 次 1 秒一次;之后 300 秒一次)
|
// ★新增:限频判断(同一条日志前 5 次 1 秒一次;之后 300 秒一次)
|
||||||
const std::string key = make_key(logger_name, level, code, msg);
|
const std::string key = make_key(logger_name, level, code, msg);
|
||||||
if (!should_emit(key)) {
|
uint64_t suppressed_before_emit = 0;
|
||||||
return;
|
if (!should_emit(key, suppressed_before_emit)) return;
|
||||||
|
|
||||||
|
// 如果本次输出前压掉过日志,则在 log 文本后追加统计
|
||||||
|
std::string final_msg = msg;
|
||||||
|
if (suppressed_before_emit > 0) {
|
||||||
|
std::ostringstream suppressed_oss;
|
||||||
|
suppressed_oss << msg << " 【已过滤重复同类日志 "
|
||||||
|
<< suppressed_before_emit
|
||||||
|
<< " 条】";
|
||||||
|
final_msg = suppressed_oss.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::ostringstream oss;
|
std::ostringstream oss;
|
||||||
@@ -409,7 +532,7 @@ protected:
|
|||||||
<< "\",\"grade\":\"" << get_level_str(level)
|
<< "\",\"grade\":\"" << get_level_str(level)
|
||||||
// ★建议:code 用数字(不是字符串)
|
// ★建议:code 用数字(不是字符串)
|
||||||
<< "\",\"code\":" << code
|
<< "\",\"code\":" << code
|
||||||
<< ",\"log\":\"" << escape_json(msg) << "\"}";
|
<< ",\"log\":\"" << escape_json(final_msg) << "\"}";
|
||||||
|
|
||||||
queue_data_t connect_info;
|
queue_data_t connect_info;
|
||||||
connect_info.strTopic = G_LOG_TOPIC;
|
connect_info.strTopic = G_LOG_TOPIC;
|
||||||
|
|||||||
@@ -98,6 +98,8 @@ extern int TEST_PORT; //测试端口号
|
|||||||
|
|
||||||
extern std::string FRONT_INST;
|
extern std::string FRONT_INST;
|
||||||
|
|
||||||
|
extern bool PQD_FLAG;
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数
|
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数
|
||||||
|
|
||||||
template<typename T, typename... Args>
|
template<typename T, typename... Args>
|
||||||
@@ -130,6 +132,11 @@ bool parse_param(int argc, char* argv[]) {
|
|||||||
try {
|
try {
|
||||||
g_front_seg_index = std::stoi(val.substr(0, pos));
|
g_front_seg_index = std::stoi(val.substr(0, pos));
|
||||||
g_front_seg_num = std::stoi(val.substr(pos + 1));
|
g_front_seg_num = std::stoi(val.substr(pos + 1));
|
||||||
|
|
||||||
|
if (g_front_seg_index == 0) {
|
||||||
|
PQD_FLAG = true;
|
||||||
|
}
|
||||||
|
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
std::cerr << "Invalid -s format." << std::endl;
|
std::cerr << "Invalid -s format." << std::endl;
|
||||||
}
|
}
|
||||||
@@ -144,6 +151,11 @@ bool parse_param(int argc, char* argv[]) {
|
|||||||
try {
|
try {
|
||||||
g_front_seg_index = std::stoi(val.substr(0, pos));
|
g_front_seg_index = std::stoi(val.substr(0, pos));
|
||||||
g_front_seg_num = std::stoi(val.substr(pos + 1));
|
g_front_seg_num = std::stoi(val.substr(pos + 1));
|
||||||
|
|
||||||
|
if (g_front_seg_index == 0) {
|
||||||
|
PQD_FLAG = true;
|
||||||
|
}
|
||||||
|
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
std::cerr << "Invalid -s format." << std::endl;
|
std::cerr << "Invalid -s format." << std::endl;
|
||||||
}
|
}
|
||||||
@@ -223,13 +235,13 @@ std::string get_parent_directory() {
|
|||||||
//解析模板文件
|
//解析模板文件
|
||||||
//Set_xml_nodeinfo();
|
//Set_xml_nodeinfo();
|
||||||
|
|
||||||
StartFrontThread(); //开启主线程
|
|
||||||
|
|
||||||
StartMQConsumerThread(); //开启消费者线程
|
|
||||||
|
|
||||||
StartMQProducerThread(); //开启生产者线程
|
StartMQProducerThread(); //开启生产者线程
|
||||||
|
|
||||||
StartTimerThread(); //开启定时线程
|
if(!PQD_FLAG){
|
||||||
|
StartFrontThread(); //开启主线程
|
||||||
|
StartMQConsumerThread(); //开启消费者线程
|
||||||
|
StartTimerThread(); //开启定时线程
|
||||||
|
}
|
||||||
|
|
||||||
//启动worker 根据启动标志启动
|
//启动worker 根据启动标志启动
|
||||||
if(G_TEST_FLAG){
|
if(G_TEST_FLAG){
|
||||||
|
|||||||
@@ -77,6 +77,9 @@ extern std::atomic<int> INITFLAG;
|
|||||||
//测试用的终端数组
|
//测试用的终端数组
|
||||||
extern std::vector<std::string> TESTARRAY;
|
extern std::vector<std::string> TESTARRAY;
|
||||||
|
|
||||||
|
extern std::map<std::string, std::string> g_upgrade_file_map;
|
||||||
|
extern std::mutex g_upgrade_file_mutex;
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////////////////////////////外部文件函数声明
|
////////////////////////////////////////////////////////////////////////////////////////////////////////外部文件函数声明
|
||||||
|
|
||||||
extern void execute_bash(std::string fun,int process_num,std::string type);
|
extern void execute_bash(std::string fun,int process_num,std::string type);
|
||||||
@@ -767,9 +770,10 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
|||||||
}
|
}
|
||||||
json_data.processNo = std::to_string(procNo);
|
json_data.processNo = std::to_string(procNo);
|
||||||
|
|
||||||
//int procNum = item.value("maxProcessNum", -1);
|
int procNum = item.value("maxProcessNum", -1);
|
||||||
//json_data.maxProcessNum = std::to_string(procNum);
|
json_data.maxProcessNum = std::to_string(procNum);
|
||||||
|
|
||||||
|
json_data.addr_str = item.value("ip", "");
|
||||||
json_data.mac = item.value("ip", "");
|
json_data.mac = item.value("ip", "");
|
||||||
//json_data.port = item.value("port", "");
|
//json_data.port = item.value("port", "");
|
||||||
//json_data.timestamp = item.value("updateTime", "");
|
//json_data.timestamp = item.value("updateTime", "");
|
||||||
@@ -2595,7 +2599,7 @@ bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann:
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
case 1115: { // 升级
|
case 1120: { // 辅助升级
|
||||||
parsed.ok = true;
|
parsed.ok = true;
|
||||||
|
|
||||||
std::cout << "[parsemsg] upgrade device, devid=" << devid
|
std::cout << "[parsemsg] upgrade device, devid=" << devid
|
||||||
@@ -2605,14 +2609,14 @@ bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann:
|
|||||||
int ret = recordguid(devid, guid, static_cast<int>(DeviceState::SET_PREUPGRADE), 2);
|
int ret = recordguid(devid, guid, static_cast<int>(DeviceState::SET_PREUPGRADE), 2);
|
||||||
if (-1 == ret) {
|
if (-1 == ret) {
|
||||||
send_reply_to_queue(guid, static_cast<int>(ResponseCode::NOT_FOUND),
|
send_reply_to_queue(guid, static_cast<int>(ResponseCode::NOT_FOUND),
|
||||||
"未找到该装置,升级指令执行失败");
|
"未找到该装置,辅助升级指令执行失败");
|
||||||
DIY_ERRORLOG_CODE(devid, 1, LOG_CODE_CLOUD, "未找到该装置,升级指令执行失败");
|
DIY_ERRORLOG_CODE(devid, 1, LOG_CODE_CLOUD, "未找到该装置,辅助升级指令执行失败");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else if (ret > 0) {
|
else if (ret > 0) {
|
||||||
send_reply_to_queue(guid, static_cast<int>(ResponseCode::BUSY),
|
send_reply_to_queue(guid, static_cast<int>(ResponseCode::BUSY),
|
||||||
"该装置正忙,升级指令执行失败");
|
"该装置正忙,辅助升级指令执行失败");
|
||||||
DIY_WARNLOG_CODE(devid, 1, LOG_CODE_CLOUD, "该装置正忙,升级指令执行失败");
|
DIY_WARNLOG_CODE(devid, 1, LOG_CODE_CLOUD, "该装置正忙,辅助升级指令执行失败");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@@ -2621,6 +2625,63 @@ bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//辅助升级就是文件替换
|
||||||
|
//ClientManager::instance().set_preupgrade_action_to_device(devid, "");//尝试装置升级指令!第一步校验
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 1115: {
|
||||||
|
if (!msgObj.contains("Name") || !msgObj["Name"].is_string()) return false;
|
||||||
|
|
||||||
|
parsed.ok = true;
|
||||||
|
|
||||||
|
std::cout << "[parsemsg] update, devid=" << devid
|
||||||
|
<< ", guid=" << guid << std::endl;
|
||||||
|
|
||||||
|
//【1】recordguid
|
||||||
|
{
|
||||||
|
int ret = recordguid(devid, guid, static_cast<int>(DeviceState::SET_PREUPGRADE), 2);
|
||||||
|
if (-1 == ret) {
|
||||||
|
send_reply_to_queue(guid, static_cast<int>(ResponseCode::NOT_FOUND),
|
||||||
|
"未找到该装置,装置升级指令执行失败");
|
||||||
|
DIY_ERRORLOG_CODE(devid, 1, LOG_CODE_CLOUD, "未找到该装置,装置升级指令执行失败");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else if (ret > 0) {
|
||||||
|
send_reply_to_queue(guid, static_cast<int>(ResponseCode::BUSY),
|
||||||
|
"该装置正忙,装置升级指令执行失败");
|
||||||
|
DIY_WARNLOG_CODE(devid, 1, LOG_CODE_CLOUD, "该装置正忙,装置升级指令执行失败");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
std::cout << "记录装置状态成功,准备执行装置升级" << std::endl;
|
||||||
|
DIY_INFOLOG_CODE(devid, 1, LOG_CODE_CLOUD, "记录装置状态成功,准备执行装置升级");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//【2】取参数(加 sanitize)
|
||||||
|
std::string remote_path = sanitize(msgObj["Name"].get<std::string>()); // 云端路径
|
||||||
|
|
||||||
|
std::cout << "[parsemsg][1115] remote=" << remote_path
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
//【3】先下载到本地
|
||||||
|
std::string local_path = getfilefromweb(devid, remote_path,1);//升级文件下载到本地的路径和普通文件不一样,放在专门的upgrade目录下
|
||||||
|
|
||||||
|
if (local_path.empty()) {
|
||||||
|
send_reply_to_queue(guid, static_cast<int>(ResponseCode::BAD_REQUEST),
|
||||||
|
"文件上送失败,下载源文件失败: " + remote_path);
|
||||||
|
DIY_ERRORLOG_CODE(devid, 1, LOG_CODE_CLOUD, "装置升级失败,下载源文件失败");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(g_upgrade_file_mutex);
|
||||||
|
g_upgrade_file_map[devid] = local_path;
|
||||||
|
}
|
||||||
|
|
||||||
|
DIY_INFOLOG_CODE(devid, 1, LOG_CODE_CLOUD, "升级文件下载成功,准备执行装置升级");
|
||||||
|
|
||||||
ClientManager::instance().set_preupgrade_action_to_device(devid, "");//尝试装置升级指令!第一步校验
|
ClientManager::instance().set_preupgrade_action_to_device(devid, "");//尝试装置升级指令!第一步校验
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -2665,7 +2726,7 @@ bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann:
|
|||||||
<< ", dest=" << dest_file_path << std::endl;
|
<< ", dest=" << dest_file_path << std::endl;
|
||||||
|
|
||||||
//【3】先下载到本地
|
//【3】先下载到本地
|
||||||
std::string local_path = getfilefromweb(devid, remote_path);
|
std::string local_path = getfilefromweb(devid, remote_path,0);//普通文件下载到本地的路径
|
||||||
|
|
||||||
if (local_path.empty()) {
|
if (local_path.empty()) {
|
||||||
send_reply_to_queue(guid, static_cast<int>(ResponseCode::BAD_REQUEST),
|
send_reply_to_queue(guid, static_cast<int>(ResponseCode::BAD_REQUEST),
|
||||||
@@ -2849,6 +2910,10 @@ static int get_cloud_type_by_state(int type)
|
|||||||
case DeviceState::SET_RIGHTTIME_2: return 1113; // 主动对时
|
case DeviceState::SET_RIGHTTIME_2: return 1113; // 主动对时
|
||||||
case DeviceState::SET_CTRL: return 1114; // 控制/重启
|
case DeviceState::SET_CTRL: return 1114; // 控制/重启
|
||||||
case DeviceState::SET_PREUPGRADE: return 1115; // 升级预校验
|
case DeviceState::SET_PREUPGRADE: return 1115; // 升级预校验
|
||||||
|
case DeviceState::SEND_FILE: return 1116; // 文件上送
|
||||||
|
case DeviceState::DEL_FILE: return 1117; // 文件删除
|
||||||
|
case DeviceState::SEND_MENU: return 1118; // 目录创建
|
||||||
|
case DeviceState::DEL_MENU: return 1119; // 目录删除
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return type; // 其他未映射的,保持原值,避免影响现有逻辑
|
return type; // 其他未映射的,保持原值,避免影响现有逻辑
|
||||||
|
|||||||
@@ -758,12 +758,20 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
|
|||||||
<< ", VOLTAGE=" << rf.VOLTAGE
|
<< ", VOLTAGE=" << rf.VOLTAGE
|
||||||
<< "\n";
|
<< "\n";
|
||||||
|
|
||||||
// ★新增:直下模式与目标时间列表
|
// ★新增:文件补招类型与目标信息
|
||||||
os << "\r\x1B[K |-- direct_mode=" << (rf.direct_mode ? "true" : "false")
|
os << "\r\x1B[K |-- file_type="
|
||||||
<< ", target_filetimes(" << rf.target_filetimes << ")\n";
|
<< (rf.is_steady_file() ? "STEADY_FILE" :
|
||||||
{
|
rf.is_voltage_file() ? "VOLTAGE_FILE" : "NONE");
|
||||||
os << "\r\x1B[K |.. " << rf.target_filetimes << "\n";
|
|
||||||
|
if (rf.is_voltage_file()) {
|
||||||
|
os << ", target_filetimes=" << rf.target_filetimes;
|
||||||
}
|
}
|
||||||
|
else if (rf.is_steady_file()) {
|
||||||
|
os << ", time_range=" << rf.StartTime
|
||||||
|
<< " ~ " << rf.EndTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
os << "\n";
|
||||||
|
|
||||||
// ★新增:状态机运行态
|
// ★新增:状态机运行态
|
||||||
os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase)
|
os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase)
|
||||||
@@ -773,15 +781,15 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
|
|||||||
<< ", download_result=" << resultStr(rf.download_result) << "\n";
|
<< ", download_result=" << resultStr(rf.download_result) << "\n";
|
||||||
|
|
||||||
// ★新增:候选目录
|
// ★新增:候选目录
|
||||||
os << "\r\x1B[K |-- dir_candidates(" << rf.dir_candidates.size() << ")\n";
|
os << "\r\x1B[K |-- active_dirs(" << rf.active_dirs().size() << ")\n";
|
||||||
{
|
{
|
||||||
size_t c = 0;
|
size_t c = 0;
|
||||||
for (const auto& d : rf.dir_candidates) {
|
for (const auto& d : rf.active_dirs()) {
|
||||||
if (c++ >= MAX_ITEMS) break;
|
if (c++ >= MAX_ITEMS) break;
|
||||||
os << "\r\x1B[K |-- " << d << "\n";
|
os << "\r\x1B[K |-- " << d << "\n";
|
||||||
}
|
}
|
||||||
if (rf.dir_candidates.size() > MAX_ITEMS) {
|
if (rf.active_dirs().size() > MAX_ITEMS) {
|
||||||
os << "\r\x1B[K |.. (+" << (rf.dir_candidates.size() - MAX_ITEMS) << " more)\n";
|
os << "\r\x1B[K |.. (+" << (rf.active_dirs().size() - MAX_ITEMS) << " more)\n";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,6 @@
|
|||||||
#include "cloudfront/code/rocketmq.h" //lnk20250708
|
#include "cloudfront/code/rocketmq.h" //lnk20250708
|
||||||
#include "cloudfront/code/log4.h" //lnk20250924
|
#include "cloudfront/code/log4.h" //lnk20250924
|
||||||
#include "client2.h"
|
#include "client2.h"
|
||||||
#include "cloudfront/code/log4.h"
|
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
@@ -245,6 +244,8 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
//装置主动上送报文 暂态事件报文/暂态波形文件报文
|
//装置主动上送报文 暂态事件报文/暂态波形文件报文
|
||||||
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_Event)) {
|
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_Event)) {
|
||||||
//处理主动上送的暂态事件报文
|
//处理主动上送的暂态事件报文
|
||||||
|
std::cout << "GET: MsgResponseType::Response_Event";
|
||||||
|
DIY_INFOLOG_CODE(id, 1, static_cast<int>(LogCode::LOG_CODE_TRANSIENT), "收到装置主动上送的暂态事件信息报文");
|
||||||
NewTaglogbuffer event = NewTaglogbuffer::createFromData(parser.RecvData.data(), parser.RecvData.size());
|
NewTaglogbuffer event = NewTaglogbuffer::createFromData(parser.RecvData.data(), parser.RecvData.size());
|
||||||
|
|
||||||
//获取测点id
|
//获取测点id
|
||||||
@@ -349,11 +350,13 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
|
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
|
||||||
|
|
||||||
//lnk20250805 事件上送先记录,录波文件上传结束后再更新文件
|
//lnk20250805 事件上送先记录,录波文件上传结束后再更新文件
|
||||||
append_qvvr_event(id,event.head.name,
|
if(record.nType != 0){
|
||||||
|
append_qvvr_event(id,event.head.name,
|
||||||
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
||||||
transfer_json_qvvr_data(id,event.head.name,
|
transfer_json_qvvr_data(id,event.head.name,
|
||||||
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,
|
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,
|
||||||
"");
|
"");
|
||||||
|
}
|
||||||
|
|
||||||
//事件主动上送处理完成,不需要通知状态机
|
//事件主动上送处理完成,不需要通知状态机
|
||||||
}
|
}
|
||||||
@@ -366,6 +369,8 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
}
|
}
|
||||||
else if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_ActiveSOEInfo)) {
|
else if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_ActiveSOEInfo)) {
|
||||||
//处理主动上送的波形文件信息报文
|
//处理主动上送的波形文件信息报文
|
||||||
|
std::cout << "GET: MsgResponseType::Response_ActiveSOEInfo";
|
||||||
|
DIY_INFOLOG_CODE(id, 1, static_cast<int>(LogCode::LOG_CODE_TRANSIENT), "收到装置主动上送的暂态波形文件信息报文");
|
||||||
unsigned char file_type = udata[12];//录波文件类型数 cfg dat hdr 1-3
|
unsigned char file_type = udata[12];//录波文件类型数 cfg dat hdr 1-3
|
||||||
unsigned char line_id = udata[13];//录波测点 1-6
|
unsigned char line_id = udata[13];//录波测点 1-6
|
||||||
const uint8_t* data_ptr = parser.RecvData.data() + 2;//数据体去除前两位
|
const uint8_t* data_ptr = parser.RecvData.data() + 2;//数据体去除前两位
|
||||||
@@ -847,8 +852,8 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
<< ", Error: " << strerror(errno) << std::endl;
|
<< ", Error: " << strerror(errno) << std::endl;
|
||||||
|
|
||||||
// 文件保存失败,通知云端
|
// 文件保存失败,通知云端
|
||||||
/*on_device_response_minimal(static_cast<int>(ResponseCode::BAD_REQUEST),
|
//on_device_response_minimal(static_cast<int>(ResponseCode::BAD_REQUEST),
|
||||||
id, 0, static_cast<int>(DeviceState::READING_EVENTFILE));*/
|
// id, 0, static_cast<int>(DeviceState::READING_EVENTFILE));
|
||||||
}
|
}
|
||||||
|
|
||||||
//当前文件下载完毕,调整为空闲处理下一项工作(如果这里后续有新文件等待下载,一般已经存入等待队列等候处理了,调成空闲状态后直接就会开始新文件的下载工作)
|
//当前文件下载完毕,调整为空闲处理下一项工作(如果这里后续有新文件等待下载,一般已经存入等待队列等候处理了,调成空闲状态后直接就会开始新文件的下载工作)
|
||||||
@@ -988,7 +993,7 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
// 当前帧被拒收,文件上送失败
|
// 当前帧被拒收,文件上送失败
|
||||||
std::cout << "*** send file 0x41 fail ***! " << mac << std::endl;
|
std::cout << "*** send file 0x41 fail ***! " << mac << std::endl;
|
||||||
|
|
||||||
on_device_response_minimal(static_cast<int>(ResponseCode::REJECTED_BUSY), id, 0, static_cast<int>(DeviceState::SEND_FILE));
|
on_device_response_minimal(static_cast<int>(ResponseCode::FORBIDDEN), id, 0, static_cast<int>(DeviceState::SEND_FILE));
|
||||||
|
|
||||||
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
|
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
|
||||||
}
|
}
|
||||||
@@ -1016,7 +1021,7 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
// 当前帧被拒收,文件删除失败
|
// 当前帧被拒收,文件删除失败
|
||||||
std::cout << "*** del file 0x41 fail ***! " << mac << std::endl;
|
std::cout << "*** del file 0x41 fail ***! " << mac << std::endl;
|
||||||
|
|
||||||
on_device_response_minimal(static_cast<int>(ResponseCode::REJECTED_BUSY), id, 0, static_cast<int>(DeviceState::DEL_FILE));
|
on_device_response_minimal(static_cast<int>(ResponseCode::FORBIDDEN), id, 0, static_cast<int>(DeviceState::DEL_FILE));
|
||||||
|
|
||||||
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
|
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
|
||||||
}
|
}
|
||||||
@@ -1044,7 +1049,7 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
// 当前帧被拒收,创建目录失败
|
// 当前帧被拒收,创建目录失败
|
||||||
std::cout << "*** send menu 0x41 fail ***! " << mac << std::endl;
|
std::cout << "*** send menu 0x41 fail ***! " << mac << std::endl;
|
||||||
|
|
||||||
on_device_response_minimal(static_cast<int>(ResponseCode::REJECTED_BUSY), id, 0, static_cast<int>(DeviceState::SEND_MENU));
|
on_device_response_minimal(static_cast<int>(ResponseCode::FORBIDDEN), id, 0, static_cast<int>(DeviceState::SEND_MENU));
|
||||||
|
|
||||||
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
|
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
|
||||||
}
|
}
|
||||||
@@ -1072,7 +1077,7 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
// 当前帧被拒收,删除目录失败
|
// 当前帧被拒收,删除目录失败
|
||||||
std::cout << "*** del menu 0x41 fail ***! " << mac << std::endl;
|
std::cout << "*** del menu 0x41 fail ***! " << mac << std::endl;
|
||||||
|
|
||||||
on_device_response_minimal(static_cast<int>(ResponseCode::REJECTED_BUSY), id, 0, static_cast<int>(DeviceState::DEL_MENU));
|
on_device_response_minimal(static_cast<int>(ResponseCode::FORBIDDEN), id, 0, static_cast<int>(DeviceState::DEL_MENU));
|
||||||
|
|
||||||
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
|
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
|
||||||
}
|
}
|
||||||
@@ -2446,12 +2451,14 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
|
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
|
||||||
|
|
||||||
//记录补招上来的暂态事件
|
//记录补招上来的暂态事件
|
||||||
append_qvvr_event(id,event.head.name,
|
if(record.nType != 0){
|
||||||
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
append_qvvr_event(id,event.head.name,
|
||||||
|
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
|
||||||
|
|
||||||
//直接发走暂态事件
|
//直接发走暂态事件
|
||||||
transfer_json_qvvr_data(id,event.head.name,
|
transfer_json_qvvr_data(id,event.head.name,
|
||||||
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,"");
|
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,"");
|
||||||
|
}
|
||||||
//通知状态机补招暂态事件成功
|
//通知状态机补招暂态事件成功
|
||||||
on_device_response_minimal(static_cast<int>(ResponseCode::OK), id, 0, static_cast<int>(DeviceState::READING_EVENTLOG));
|
on_device_response_minimal(static_cast<int>(ResponseCode::OK), id, 0, static_cast<int>(DeviceState::READING_EVENTLOG));
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,7 @@
|
|||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
#include <new>
|
||||||
|
|
||||||
// PQDIF 解析库
|
// PQDIF 解析库
|
||||||
#include "pqdif/PQDIF.h"
|
#include "pqdif/PQDIF.h"
|
||||||
@@ -28,6 +29,17 @@
|
|||||||
#include "pqdif/include/pqdif_lg.h"
|
#include "pqdif/include/pqdif_lg.h"
|
||||||
#include "pqdif_semantic_ids.h"
|
#include "pqdif_semantic_ids.h"
|
||||||
|
|
||||||
|
#include "cloudfront/code/log4.h" //lnk20260526
|
||||||
|
|
||||||
|
extern void enqueue_stat_pq(const std::string& max_base64Str,
|
||||||
|
const std::string& min_base64Str,
|
||||||
|
const std::string& avg_base64Str,
|
||||||
|
const std::string& cp95_base64Str,
|
||||||
|
time_t data_time,
|
||||||
|
const std::string& mac,
|
||||||
|
short cid);
|
||||||
|
extern std::string extract_filename1(const std::string& path);
|
||||||
|
|
||||||
namespace fs = std::experimental::filesystem;
|
namespace fs = std::experimental::filesystem;
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
@@ -37,7 +49,11 @@ namespace {
|
|||||||
constexpr int kMaxPqdifFilesPerScan = 1;
|
constexpr int kMaxPqdifFilesPerScan = 1;
|
||||||
constexpr size_t kParsedCacheLimit = 128;
|
constexpr size_t kParsedCacheLimit = 128;
|
||||||
|
|
||||||
const char* kPqdRootDir = "download";
|
// 大文件流式阈值:估算展开点数超过该值时,不再构造 expanded_stat_points,
|
||||||
|
// 而是按通道逐步聚合成时间桶并直接组装 Base64,避免单文件中间对象占用过大内存。
|
||||||
|
constexpr size_t kPqdifLargeFileStreamingPointThreshold = 800000;
|
||||||
|
|
||||||
|
const char* kPqdRootDir = "download_pqdif";
|
||||||
const char* kDoneRootDir = "download_done";
|
const char* kDoneRootDir = "download_done";
|
||||||
const char* kFailRootDir = "download_fail";
|
const char* kFailRootDir = "download_fail";
|
||||||
|
|
||||||
@@ -5072,6 +5088,250 @@ namespace {
|
|||||||
return stat_select_best_metric_sources(out);
|
return stat_select_best_metric_sources(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
size_t stat_estimate_candidate_point_count_for_streaming(const PqdifLogicalFile& lf)
|
||||||
|
{
|
||||||
|
size_t total = 0;
|
||||||
|
for (const auto& obs : lf.observations)
|
||||||
|
{
|
||||||
|
for (const auto& ch : obs.channel_instances)
|
||||||
|
{
|
||||||
|
if (!pqdif_sem::IsQuantityTypeValueLog(ch.quantity_type_id.value))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
for (const auto& si : ch.series_instances)
|
||||||
|
{
|
||||||
|
if (pqdif_sem::IsValueTypeTime(si.value_type_id.value))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
const PqdifSeriesInstance* resolved = stat_resolve_shared_series(obs, si);
|
||||||
|
if (resolved == nullptr || resolved->values.count <= 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
total += static_cast<size_t>(resolved->values.count);
|
||||||
|
if (total > kPqdifLargeFileStreamingPointThreshold)
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct StatStreamingSourceBucket
|
||||||
|
{
|
||||||
|
StatMetricId metric_id = StatMetricId::Unknown;
|
||||||
|
StatMetricSourceKey key;
|
||||||
|
StatMetricSourceStats stats;
|
||||||
|
std::map<time_t, AggregatedStatValues> values_by_time;
|
||||||
|
std::map<time_t, std::string> time_text_by_time;
|
||||||
|
};
|
||||||
|
|
||||||
|
void stat_streaming_set_value(AggregatedStatValues& agg, const ExpandedStatPoint& p)
|
||||||
|
{
|
||||||
|
if (agg.source_observation_index < 0)
|
||||||
|
{
|
||||||
|
agg.source_observation_index = p.observation_index;
|
||||||
|
agg.source_channel_instance_index = p.channel_instance_index;
|
||||||
|
agg.source_series_instance_index = p.series_instance_index;
|
||||||
|
agg.source_channel_name = p.channel_name;
|
||||||
|
agg.quality = StatMetricQuality::Normal;
|
||||||
|
agg.quality_reason = "ok";
|
||||||
|
}
|
||||||
|
|
||||||
|
// 同一来源同一时间同一种统计值如果重复,保留首次写入,避免一次性覆盖造成结果不稳定。
|
||||||
|
if (stat_has_value_kind(agg, p.stat_kind))
|
||||||
|
return;
|
||||||
|
|
||||||
|
agg.source_series_instance_index = p.series_instance_index;
|
||||||
|
switch (p.stat_kind)
|
||||||
|
{
|
||||||
|
case StatValueKind::Min:
|
||||||
|
agg.has_min = true;
|
||||||
|
agg.min_value = p.value;
|
||||||
|
break;
|
||||||
|
case StatValueKind::Max:
|
||||||
|
agg.has_max = true;
|
||||||
|
agg.max_value = p.value;
|
||||||
|
break;
|
||||||
|
case StatValueKind::Avg:
|
||||||
|
agg.has_avg = true;
|
||||||
|
agg.avg_value = p.value;
|
||||||
|
break;
|
||||||
|
case StatValueKind::P95:
|
||||||
|
agg.has_p95 = true;
|
||||||
|
agg.p95_value = p.value;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void stat_streaming_add_point(
|
||||||
|
std::map<StatMetricId, std::map<StatMetricSourceKey, StatStreamingSourceBucket>>& by_metric,
|
||||||
|
const ExpandedStatPoint& p)
|
||||||
|
{
|
||||||
|
if (p.metric_id == StatMetricId::Unknown || p.stat_kind == StatValueKind::Unknown)
|
||||||
|
return;
|
||||||
|
|
||||||
|
const StatMetricSourceKey key = stat_make_source_key(p);
|
||||||
|
StatStreamingSourceBucket& src = by_metric[p.metric_id][key];
|
||||||
|
if (src.metric_id == StatMetricId::Unknown)
|
||||||
|
{
|
||||||
|
src.metric_id = p.metric_id;
|
||||||
|
src.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
src.stats.add(p);
|
||||||
|
AggregatedStatValues& agg = src.values_by_time[p.timestamp];
|
||||||
|
stat_streaming_set_value(agg, p);
|
||||||
|
if (src.time_text_by_time.find(p.timestamp) == src.time_text_by_time.end())
|
||||||
|
src.time_text_by_time[p.timestamp] = p.timestamp_text;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<TimeAggregatedStatBucket> stat_build_aggregated_buckets_streaming(
|
||||||
|
const PqdifLogicalFile& lf,
|
||||||
|
ParsedConnectionKind connection_kind,
|
||||||
|
int& selected_observation_index,
|
||||||
|
std::string& selected_observation_name,
|
||||||
|
size_t& out_raw_point_count,
|
||||||
|
size_t& out_selected_metric_count,
|
||||||
|
size_t& out_selected_dynamic_metric_count)
|
||||||
|
{
|
||||||
|
selected_observation_index = -1;
|
||||||
|
selected_observation_name.clear();
|
||||||
|
out_raw_point_count = 0;
|
||||||
|
out_selected_metric_count = 0;
|
||||||
|
out_selected_dynamic_metric_count = 0;
|
||||||
|
|
||||||
|
const PqdifObservationRecord* primary = stat_select_primary_statistical_observation(lf);
|
||||||
|
if (primary != nullptr)
|
||||||
|
{
|
||||||
|
selected_observation_index = primary->observation_index;
|
||||||
|
selected_observation_name = primary->observation_name;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::map<StatMetricId, std::map<StatMetricSourceKey, StatStreamingSourceBucket>> by_metric;
|
||||||
|
|
||||||
|
// 大文件模式:遍历全部 observations,但每次只临时展开一个通道,随后立刻压缩到
|
||||||
|
// metric/source/timestamp/kind 聚合结构中,不保留全量 ExpandedStatPoint。
|
||||||
|
for (const auto& obs : lf.observations)
|
||||||
|
{
|
||||||
|
for (const auto& ch : obs.channel_instances)
|
||||||
|
{
|
||||||
|
std::vector<ExpandedStatPoint> points = stat_expand_channel_points(lf, connection_kind, obs, ch);
|
||||||
|
out_raw_point_count += points.size();
|
||||||
|
for (const auto& p : points)
|
||||||
|
stat_streaming_add_point(by_metric, p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::map<time_t, TimeAggregatedStatBucket> global_buckets;
|
||||||
|
size_t duplicate_metric_count = 0;
|
||||||
|
|
||||||
|
for (auto& metric_pair : by_metric)
|
||||||
|
{
|
||||||
|
const StatMetricId metric_id = metric_pair.first;
|
||||||
|
auto& sources = metric_pair.second;
|
||||||
|
if (sources.empty())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
bool has_selected = false;
|
||||||
|
StatMetricSourceKey best_key;
|
||||||
|
int best_score = std::numeric_limits<int>::min();
|
||||||
|
|
||||||
|
for (auto& src_pair : sources)
|
||||||
|
{
|
||||||
|
int score = stat_metric_source_score(src_pair.second.stats);
|
||||||
|
// 大文件流式模式会遍历全部 observations。为尽量保持原逻辑,普通核心指标在分数接近时
|
||||||
|
// 优先使用主统计 observation;谐波等只存在于其他 observation 时仍能正常选中。
|
||||||
|
if (selected_observation_index >= 0 && src_pair.first.observation_index == selected_observation_index)
|
||||||
|
score += 25;
|
||||||
|
|
||||||
|
if (!has_selected || score > best_score)
|
||||||
|
{
|
||||||
|
has_selected = true;
|
||||||
|
best_score = score;
|
||||||
|
best_key = src_pair.first;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!has_selected)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
++out_selected_metric_count;
|
||||||
|
if (stat_is_dynamic_metric(metric_id))
|
||||||
|
++out_selected_dynamic_metric_count;
|
||||||
|
if (sources.size() > 1)
|
||||||
|
++duplicate_metric_count;
|
||||||
|
|
||||||
|
StatStreamingSourceBucket& selected_src = sources[best_key];
|
||||||
|
for (auto& time_pair : selected_src.values_by_time)
|
||||||
|
{
|
||||||
|
TimeAggregatedStatBucket& bucket = global_buckets[time_pair.first];
|
||||||
|
if (bucket.timestamp == 0)
|
||||||
|
{
|
||||||
|
bucket.timestamp = time_pair.first;
|
||||||
|
const auto tit = selected_src.time_text_by_time.find(time_pair.first);
|
||||||
|
bucket.timestamp_text = (tit == selected_src.time_text_by_time.end()) ? format_time_text(time_pair.first) : tit->second;
|
||||||
|
}
|
||||||
|
bucket.metrics[metric_id] = time_pair.second;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<TimeAggregatedStatBucket> out;
|
||||||
|
out.reserve(global_buckets.size());
|
||||||
|
for (auto& kv : global_buckets)
|
||||||
|
out.push_back(std::move(kv.second));
|
||||||
|
|
||||||
|
if (pqdif_log_enabled(PqdifLogLevel::Core))
|
||||||
|
{
|
||||||
|
std::cout << "========== PQDIF LARGE FILE STREAMING SUMMARY ==========" << std::endl;
|
||||||
|
std::cout << "raw_points=" << out_raw_point_count
|
||||||
|
<< ", candidate_metric_count=" << by_metric.size()
|
||||||
|
<< ", selected_metric_count=" << out_selected_metric_count
|
||||||
|
<< ", selected_dynamic_metrics=" << out_selected_dynamic_metric_count << "/" << stat_all_dynamic_metric_order().size()
|
||||||
|
<< ", duplicate_metric_count=" << duplicate_metric_count
|
||||||
|
<< ", buckets=" << out.size()
|
||||||
|
<< ", selected_observation_index=" << selected_observation_index
|
||||||
|
<< ", selected_observation_name=" << selected_observation_name
|
||||||
|
<< std::endl;
|
||||||
|
std::cout << "mode=streaming_no_expanded_points; detail=per-channel expand then immediate aggregate" << std::endl;
|
||||||
|
std::cout << "========================================================" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
void stat_print_aggregated_metric_line(StatMetricId metric_id, const AggregatedStatValues* agg);
|
||||||
|
void dump_grouped_bucket_streaming_preview(const ParsedPqdifFile& parsed_file)
|
||||||
|
{
|
||||||
|
std::cout << "========== GROUPED STAT STREAMING CORE SUMMARY ==========" << std::endl;
|
||||||
|
std::cout << "connection_kind=" << stat_connection_kind_name(parsed_file.connection_kind)
|
||||||
|
<< ", selected_observation_index=" << parsed_file.selected_observation_index
|
||||||
|
<< ", selected_observation_name=" << parsed_file.selected_observation_name
|
||||||
|
<< ", expanded_points=SKIPPED_BY_STREAMING"
|
||||||
|
<< ", buckets=" << parsed_file.aggregated_stat_buckets.size()
|
||||||
|
<< ", core_metric_slots=" << stat_core_metric_print_order().size()
|
||||||
|
<< ", dynamic_spectrum_slots=" << stat_all_dynamic_metric_order().size()
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
const size_t bucket_limit = std::min<size_t>(parsed_file.aggregated_stat_buckets.size(), 3);
|
||||||
|
for (size_t i = 0; i < bucket_limit; ++i)
|
||||||
|
{
|
||||||
|
const auto& b = parsed_file.aggregated_stat_buckets[i];
|
||||||
|
std::cout << " [BUCKET " << i << "] time=" << b.timestamp_text
|
||||||
|
<< ", metric_count_present=" << b.metrics.size()
|
||||||
|
<< std::endl;
|
||||||
|
for (const auto metric_id : stat_core_metric_print_order())
|
||||||
|
{
|
||||||
|
const auto it = b.metrics.find(metric_id);
|
||||||
|
if (it != b.metrics.end())
|
||||||
|
stat_print_aggregated_metric_line(metric_id, &it->second);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
std::cout << "=========================================================" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
bool pqdif_probe_text_looks_like_flicker(const std::string& text)
|
bool pqdif_probe_text_looks_like_flicker(const std::string& text)
|
||||||
{
|
{
|
||||||
const std::string key = normalize_key(text);
|
const std::string key = normalize_key(text);
|
||||||
@@ -7496,9 +7756,41 @@ namespace {
|
|||||||
<< ", total_base64_chars=" << file_batch.total_base64_chars
|
<< ", total_base64_chars=" << file_batch.total_base64_chars
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
|
||||||
// 按你的要求,入队前完整打印保存对象内部结构:文件批次 -> 时间点 -> Max/Min/Avg/P95 子记录 -> Base64 内容。
|
// 完整 Base64 内容日志非常长,尤其大文件会明显放大内存和 I/O 压力。
|
||||||
// 日志会很长;确认完成后可以临时注释掉这一行,或改成 if (pqdif_log_enabled(PqdifLogLevel::Info)) 包裹。
|
// 现在仅在 Debug/Trace 级别打印完整对象;Core/Info 只打印摘要和前几条子记录。
|
||||||
//pqdif_dump_stat_base64_file_batch_full(file_batch);
|
if (pqdif_log_enabled(PqdifLogLevel::Debug))
|
||||||
|
{
|
||||||
|
pqdif_dump_stat_base64_file_batch_full(file_batch);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::cout << " [BASE64 FILE BATCH COMPACT] file=" << file_batch.pqdif_file_path
|
||||||
|
<< ", time_points=" << file_batch.time_point_count
|
||||||
|
<< ", records=" << file_batch.total_record_count
|
||||||
|
<< ", total_float_count=" << file_batch.total_float_count
|
||||||
|
<< ", total_placeholder_count=" << file_batch.total_placeholder_count
|
||||||
|
<< ", total_base64_chars=" << file_batch.total_base64_chars
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
size_t shown = 0;
|
||||||
|
for (const auto& tp : file_batch.time_points)
|
||||||
|
{
|
||||||
|
for (const auto& rec : tp.records)
|
||||||
|
{
|
||||||
|
if (shown >= 4)
|
||||||
|
break;
|
||||||
|
std::cout << " [BASE64 SAVED SAMPLE] time=" << rec.timestamp_text
|
||||||
|
<< ", kind=" << rec.value_kind_name
|
||||||
|
<< ", floats=" << rec.float_count
|
||||||
|
<< ", placeholders=" << rec.placeholder_count
|
||||||
|
<< ", base64_len=" << rec.base64_payload.size()
|
||||||
|
<< std::endl;
|
||||||
|
++shown;
|
||||||
|
}
|
||||||
|
if (shown >= 4)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
push_pqdif_stat_base64_file_batch(std::move(file_batch));
|
push_pqdif_stat_base64_file_batch(std::move(file_batch));
|
||||||
|
|
||||||
@@ -7548,6 +7840,54 @@ namespace {
|
|||||||
// 后续指标判断需要先区分星型 / 角型两套规则。
|
// 后续指标判断需要先区分星型 / 角型两套规则。
|
||||||
parsed_file.connection_kind = stat_classify_connection_kind(parsed_file.logical_file);
|
parsed_file.connection_kind = stat_classify_connection_kind(parsed_file.logical_file);
|
||||||
|
|
||||||
|
// 2) 根据估算展开点数决定是否启用“大文件流式模式”。
|
||||||
|
// 普通文件仍走原 expanded_stat_points -> grouped buckets 流程;
|
||||||
|
// 大文件则直接按通道聚合为 buckets,不再保存全量 expanded_stat_points。
|
||||||
|
const size_t estimated_candidate_points =
|
||||||
|
stat_estimate_candidate_point_count_for_streaming(parsed_file.logical_file);
|
||||||
|
const bool use_large_file_streaming =
|
||||||
|
estimated_candidate_points > kPqdifLargeFileStreamingPointThreshold;
|
||||||
|
|
||||||
|
if (use_large_file_streaming)
|
||||||
|
{
|
||||||
|
std::cout << "[PQDIF][MEMORY][STREAMING] large file detected, use streaming bucket/base64 build. "
|
||||||
|
<< "estimated_candidate_points=" << estimated_candidate_points
|
||||||
|
<< ", threshold=" << kPqdifLargeFileStreamingPointThreshold
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
size_t raw_point_count = 0;
|
||||||
|
size_t selected_metric_count = 0;
|
||||||
|
size_t selected_dynamic_metric_count = 0;
|
||||||
|
parsed_file.aggregated_stat_buckets = stat_build_aggregated_buckets_streaming(
|
||||||
|
parsed_file.logical_file,
|
||||||
|
parsed_file.connection_kind,
|
||||||
|
parsed_file.selected_observation_index,
|
||||||
|
parsed_file.selected_observation_name,
|
||||||
|
raw_point_count,
|
||||||
|
selected_metric_count,
|
||||||
|
selected_dynamic_metric_count);
|
||||||
|
|
||||||
|
if (pqdif_is_trace_log_enabled())
|
||||||
|
dump_semantic_probe(parsed_file);
|
||||||
|
|
||||||
|
dump_grouped_bucket_streaming_preview(parsed_file);
|
||||||
|
|
||||||
|
// 3) 大文件仍完整生成 Base64 文件批次,只是跳过 expanded_stat_points 中间缓存。
|
||||||
|
pqdif_build_and_queue_base64_records(parsed_file);
|
||||||
|
|
||||||
|
// 大文件模式不再把完整 ParsedPqdifFile 放入解析缓存,避免 logical_file + buckets 在内存中长期驻留。
|
||||||
|
// 后续业务请从 PqdifStatBase64FileBatch 队列读取最终 Base64 结果。
|
||||||
|
std::cout << "[PQDIF][MEMORY][STREAMING] skip parsed_file cache for large file, "
|
||||||
|
<< "base64 batch has been queued. file=" << file_path.string()
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
std::cout << "[PQDIF] processed ok(streaming): " << file_path.string()
|
||||||
|
<< ", cache_size=" << GetParsedPqdifCacheSize()
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// 2) 只筛选“统计类 observation”,并对其展开目标统计指标。
|
// 2) 只筛选“统计类 observation”,并对其展开目标统计指标。
|
||||||
// 当前阶段不处理全部 observation,避免不同 observation 混入同一时间聚合结果。
|
// 当前阶段不处理全部 observation,避免不同 observation 混入同一时间聚合结果。
|
||||||
parsed_file.expanded_stat_points = stat_expand_selected_statistical_observation(
|
parsed_file.expanded_stat_points = stat_expand_selected_statistical_observation(
|
||||||
@@ -7655,14 +7995,50 @@ namespace {
|
|||||||
<< ", max_per_scan=" << kMaxPqdifFilesPerScan
|
<< ", max_per_scan=" << kMaxPqdifFilesPerScan
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
|
||||||
const bool ok = process_single_pqdif_file(item.path, item.mac);
|
bool ok = false;
|
||||||
|
bool caught_exception = false;
|
||||||
|
std::string exception_text;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ok = process_single_pqdif_file(item.path, item.mac);
|
||||||
|
}
|
||||||
|
catch (const std::bad_alloc& ex)
|
||||||
|
{
|
||||||
|
ok = false;
|
||||||
|
caught_exception = true;
|
||||||
|
exception_text = ex.what();
|
||||||
|
std::cout << "[PQDIF][OOM] std::bad_alloc while processing file="
|
||||||
|
<< item.path.string()
|
||||||
|
<< ", what=" << exception_text
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
catch (const std::exception& ex)
|
||||||
|
{
|
||||||
|
ok = false;
|
||||||
|
caught_exception = true;
|
||||||
|
exception_text = ex.what();
|
||||||
|
std::cout << "[PQDIF][ERROR] exception while processing file="
|
||||||
|
<< item.path.string()
|
||||||
|
<< ", what=" << exception_text
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
ok = false;
|
||||||
|
caught_exception = true;
|
||||||
|
exception_text = "unknown";
|
||||||
|
std::cout << "[PQDIF][ERROR] unknown exception while processing file="
|
||||||
|
<< item.path.string()
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir);
|
const fs::path target_root = ok ? fs::path(kDoneRootDir) : fs::path(kFailRootDir);
|
||||||
const fs::path dst = target_root / item.mac / item.path.filename();
|
const fs::path dst = target_root / item.mac / item.path.filename();
|
||||||
|
|
||||||
// 处理完成后移动文件,避免下一轮 scan_once() 重复解析同一个 PQDIF。
|
// 处理完成后移动文件,避免下一轮 scan_once() 重复解析同一个 PQDIF。
|
||||||
// 解析成功:download/<mac>/<file>.pqd -> download_done/<mac>/<file>.pqd
|
// 解析成功:download/<mac>/<file>.pqd -> download_done/<mac>/<file>.pqd
|
||||||
// 解析失败:download/<mac>/<file>.pqd -> download_fail/<mac>/<file>.pqd
|
// 解析失败或解析过程中出现异常:download/<mac>/<file>.pqd -> download_fail/<mac>/<file>.pqd
|
||||||
//
|
//
|
||||||
// 调试时如果想让文件保留在 download 目录中、方便反复解析同一个文件,
|
// 调试时如果想让文件保留在 download 目录中、方便反复解析同一个文件,
|
||||||
// 可以临时注释掉下面这个 if 块;调试结束后建议恢复,否则每一轮都会重复解析。
|
// 可以临时注释掉下面这个 if 块;调试结束后建议恢复,否则每一轮都会重复解析。
|
||||||
@@ -7672,6 +8048,13 @@ namespace {
|
|||||||
<< item.path.string() << " -> " << dst.string()
|
<< item.path.string() << " -> " << dst.string()
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
}
|
}
|
||||||
|
else if (caught_exception)
|
||||||
|
{
|
||||||
|
std::cout << "[PQDIF] exception file moved to fail dir: "
|
||||||
|
<< dst.string()
|
||||||
|
<< ", reason=" << exception_text
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
cleanup_backup_dir(fs::path(kDoneRootDir) / item.mac, kBackupLimit);
|
cleanup_backup_dir(fs::path(kDoneRootDir) / item.mac, kBackupLimit);
|
||||||
cleanup_backup_dir(fs::path(kFailRootDir) / item.mac, kBackupLimit);
|
cleanup_backup_dir(fs::path(kFailRootDir) / item.mac, kBackupLimit);
|
||||||
@@ -7877,6 +8260,96 @@ void ClearReadyPqdifStatBase64Queue()
|
|||||||
g_pqdif_stat_base64_ready_queue.clear();
|
g_pqdif_stat_base64_ready_queue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool GetBase64ByKind(const PqdifStatBase64TimePointPacket& tp, //从序列中获取指定 kind 的 Base64 内容
|
||||||
|
StatValueKind kind,
|
||||||
|
std::string& out)
|
||||||
|
{
|
||||||
|
for (const auto& r : tp.records) {
|
||||||
|
if (r.value_kind == kind) {
|
||||||
|
out = r.base64_payload;
|
||||||
|
return !out.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool extract_monitor_seq_from_local_pqdif_path(const std::string& path,
|
||||||
|
short& point_name)
|
||||||
|
{
|
||||||
|
point_name = 0;
|
||||||
|
|
||||||
|
std::cout << "[extract_monitor_seq] begin path="
|
||||||
|
<< path << std::endl;
|
||||||
|
|
||||||
|
// 取纯文件名,例如:
|
||||||
|
// download/192.168.1.10/M1_xxx.pqd
|
||||||
|
// -> M1_xxx.pqd
|
||||||
|
std::string fname = extract_filename1(path);
|
||||||
|
|
||||||
|
std::cout << "[extract_monitor_seq] filename="
|
||||||
|
<< fname << std::endl;
|
||||||
|
|
||||||
|
if (fname.size() < 3) {
|
||||||
|
std::cout << "[extract_monitor_seq] filename too short"
|
||||||
|
<< std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fname[0] != 'M' && fname[0] != 'm') {
|
||||||
|
std::cout << "[extract_monitor_seq] filename not start with M/m"
|
||||||
|
<< std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t pos = fname.find('_');
|
||||||
|
|
||||||
|
std::cout << "[extract_monitor_seq] underscore pos="
|
||||||
|
<< pos << std::endl;
|
||||||
|
|
||||||
|
if (pos == std::string::npos || pos <= 1) {
|
||||||
|
std::cout << "[extract_monitor_seq] invalid underscore position"
|
||||||
|
<< std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// M1_xxx -> 1
|
||||||
|
std::string seq_str = fname.substr(1, pos - 1);
|
||||||
|
|
||||||
|
std::cout << "[extract_monitor_seq] seq_str="
|
||||||
|
<< seq_str << std::endl;
|
||||||
|
|
||||||
|
for (char c : seq_str) {
|
||||||
|
if (!std::isdigit(static_cast<unsigned char>(c))) {
|
||||||
|
std::cout << "[extract_monitor_seq] non-digit char="
|
||||||
|
<< c << std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
point_name = static_cast<short>(std::stoi(seq_str));
|
||||||
|
|
||||||
|
std::cout << "[extract_monitor_seq] success point_name="
|
||||||
|
<< point_name << std::endl;
|
||||||
|
|
||||||
|
return point_name > 0;
|
||||||
|
}
|
||||||
|
catch (const std::exception& e) {
|
||||||
|
std::cout << "[extract_monitor_seq] exception="
|
||||||
|
<< e.what() << std::endl;
|
||||||
|
|
||||||
|
point_name = 0;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
std::cout << "[extract_monitor_seq] unknown exception"
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
point_name = 0;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void RunPqdifScanLoop()
|
void RunPqdifScanLoop()
|
||||||
{
|
{
|
||||||
std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir
|
std::cout << "[PQDIF] scan loop started, root=" << kPqdRootDir
|
||||||
@@ -7910,6 +8383,54 @@ void RunPqdifScanLoop()
|
|||||||
if (PopReadyPqdifStatBase64FileBatch(batch)) {
|
if (PopReadyPqdifStatBase64FileBatch(batch)) {
|
||||||
// batch 就是一个 PQDIF 文件完整的 Base64 组装结果
|
// batch 就是一个 PQDIF 文件完整的 Base64 组装结果
|
||||||
// 在此处处理上送逻辑
|
// 在此处处理上送逻辑
|
||||||
|
const std::string& mac = batch.mac;
|
||||||
|
|
||||||
|
short point_name = 0;
|
||||||
|
|
||||||
|
if (!extract_monitor_seq_from_local_pqdif_path(batch.pqdif_file_path, point_name)) {
|
||||||
|
std::cout << "[PQDIF_UPLOAD] failed to extract monitor seq from file="
|
||||||
|
<< batch.pqdif_file_path << std::endl;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto& tp : batch.time_points) {
|
||||||
|
std::string max_base64;
|
||||||
|
std::string min_base64;
|
||||||
|
std::string avg_base64;
|
||||||
|
std::string p95_base64;
|
||||||
|
|
||||||
|
bool has_max = GetBase64ByKind(tp, StatValueKind::Max, max_base64);
|
||||||
|
bool has_min = GetBase64ByKind(tp, StatValueKind::Min, min_base64);
|
||||||
|
bool has_avg = GetBase64ByKind(tp, StatValueKind::Avg, avg_base64);
|
||||||
|
bool has_p95 = GetBase64ByKind(tp, StatValueKind::P95, p95_base64);
|
||||||
|
|
||||||
|
if (!has_max || !has_min || !has_avg || !has_p95) {
|
||||||
|
std::cout << "[PQDIF_UPLOAD] skip incomplete time point, file="
|
||||||
|
<< batch.pqdif_file_path
|
||||||
|
<< " time=" << tp.timestamp_text
|
||||||
|
<< " has_max=" << has_max
|
||||||
|
<< " has_min=" << has_min
|
||||||
|
<< " has_avg=" << has_avg
|
||||||
|
<< " has_p95=" << has_p95
|
||||||
|
<< std::endl;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
enqueue_stat_pq(max_base64,
|
||||||
|
min_base64,
|
||||||
|
avg_base64,
|
||||||
|
p95_base64,
|
||||||
|
tp.timestamp,
|
||||||
|
mac,
|
||||||
|
point_name);
|
||||||
|
|
||||||
|
std::cout << "[PQDIF_UPLOAD] enqueue_stat_pq ok, file="
|
||||||
|
<< batch.pqdif_file_path
|
||||||
|
<< " time=" << tp.timestamp_text
|
||||||
|
<< " mac=" << mac
|
||||||
|
<< " point=" << point_name
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (const std::exception& ex)
|
catch (const std::exception& ex)
|
||||||
|
|||||||
Reference in New Issue
Block a user