diff --git a/LFtid1056.rar b/LFtid1056.rar index f015249..3fe13a6 100644 Binary files a/LFtid1056.rar and b/LFtid1056.rar differ diff --git a/LFtid1056/build.sh b/LFtid1056/build.sh index 743a70a..0be6ee9 100644 --- a/LFtid1056/build.sh +++ b/LFtid1056/build.sh @@ -20,9 +20,10 @@ $SRC_DIR/tinyxml2.cpp \ ./dealMsg.cpp \ ./main_thread.cpp \ ./PQSMsg.cpp \ -./pqdif_thread_processor.cpp \ ./pqdif/PQDIF.cpp \ -./pqdif/include/cjson.c " +./pqdif_semantic_ids.cpp \ +./pqdif_thread_processor.cpp \ +./pqdif/include/cjson.c" INCLUDE_DIRS="-I$SRC_DIR \ -I$SRC_DIR/nlohmann \ @@ -33,20 +34,22 @@ INCLUDE_DIRS="-I$SRC_DIR \ -I./lib/libuv-v1.51.0/include \ -I./pqdif \ -I./pqdif/include \ --I. " +-I. " -LIB_DIRS="-L$LIB_DIR -L/usr/lib64 -L/usr/local/lib" +LIB_DIRS="-L$LIB_DIR -L./pqdif/lib -L/usr/lib64 -L/usr/local/lib" LIBS="./cloudfront/lib/libcurl.so \ ./cloudfront/lib/libssl.so \ ./cloudfront/lib/libcrypto.so \ ./cloudfront/lib/liblog4cplus.so \ -./pqdif/lib/libpqdiflib.a \ -./pqdif/lib/libz.a \ -lpthread -ldl -lrt \ -lstdc++fs \ -lz \ ./libuv.a \ +-Wl,--start-group \ +./pqdif/lib/libpqdiflib.a \ +./pqdif/lib/libz.a \ +-Wl,--end-group \ -pthread" # 如果有静态 rocketmq 库就加上 @@ -74,4 +77,4 @@ if [ $? -eq 0 ]; then ldd "$OUT_DIR/$TARGET" || echo "是静态编译程序 ✔" else echo "❌ 编译失败" -fi \ No newline at end of file +fi diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 25e959e..606058f 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -236,6 +236,14 @@ int TEST_PORT = 11000; //用于当前进程登录测试shell的端口 std::string G_TEST_LIST = ""; //测试用的发送实际数据的终端列表 std::vector TESTARRAY; //解析的列表数组 +// 日志限流配置 +int G_LOG_RATE_RESET_SEC = 3600; // 1小时重置 +int G_LOG_RATE_LIMIT_SEC = 60; // 进入限流后:60秒1条 +int G_LOG_RATE_KEEP_ALL_MS = 60000; // 间隔 >= 60000ms,全部保留 +int G_LOG_RATE_KEEP_BURST_MS = 1000; // 间隔 >= 1000ms,按二级策略 +int G_LOG_RATE_KEEP_BURST_COUNT = 60; // 二级保留前60条 +int G_LOG_RATE_KEEP_HIGHFREQ_COUNT = 10; // 高频保留前10条 + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////其他文件定义的函数引用声明 bool enqueue_direct_download(const std::string& dev_id, @@ -413,6 +421,37 @@ void loadConfig(const std::string& filename) { intMap["RocketMq.TestPort"] = &TEST_PORT; strMap["RocketMq.TestList"] = &G_TEST_LIST; + // ==================== 新增:日志限流配置 ==================== + intMap["LogRate.ResetSec"] = &G_LOG_RATE_RESET_SEC; + intMap["LogRate.LimitSec"] = &G_LOG_RATE_LIMIT_SEC; + intMap["LogRate.KeepAllMs"] = &G_LOG_RATE_KEEP_ALL_MS; + intMap["LogRate.KeepBurstMs"] = &G_LOG_RATE_KEEP_BURST_MS; + intMap["LogRate.KeepBurstCount"] = &G_LOG_RATE_KEEP_BURST_COUNT; + intMap["LogRate.KeepHighFreqCount"] = &G_LOG_RATE_KEEP_HIGHFREQ_COUNT; + // ========================================================== + + // ==================== 新增:日志限流默认值保护 ==================== + + if (G_LOG_RATE_RESET_SEC <= 0) + G_LOG_RATE_RESET_SEC = 3600; + + if (G_LOG_RATE_LIMIT_SEC <= 0) + G_LOG_RATE_LIMIT_SEC = 60; + + if (G_LOG_RATE_KEEP_ALL_MS <= 0) + G_LOG_RATE_KEEP_ALL_MS = 60000; + + if (G_LOG_RATE_KEEP_BURST_MS <= 0) + G_LOG_RATE_KEEP_BURST_MS = 1000; + + if (G_LOG_RATE_KEEP_BURST_COUNT < 0) + G_LOG_RATE_KEEP_BURST_COUNT = 60; + + if (G_LOG_RATE_KEEP_HIGHFREQ_COUNT < 0) + G_LOG_RATE_KEEP_HIGHFREQ_COUNT = 10; + + // ================================================================ + // 2. 打开并逐行解析 INI 文件 std::ifstream fin(filename); if (!fin.is_open()) { @@ -585,6 +624,15 @@ void printConfig() { printStr("WEB_FILEUPLOAD", WEB_FILEUPLOAD); printStr("WEB_FILEDOWNLOAD", WEB_FILEDOWNLOAD); + std::cout << "\n// 日志限流配置\n"; + + printInt("G_LOG_RATE_RESET_SEC", G_LOG_RATE_RESET_SEC); + printInt("G_LOG_RATE_LIMIT_SEC", G_LOG_RATE_LIMIT_SEC); + printInt("G_LOG_RATE_KEEP_ALL_MS", G_LOG_RATE_KEEP_ALL_MS); + printInt("G_LOG_RATE_KEEP_BURST_MS", G_LOG_RATE_KEEP_BURST_MS); + printInt("G_LOG_RATE_KEEP_BURST_COUNT", G_LOG_RATE_KEEP_BURST_COUNT); + printInt("G_LOG_RATE_KEEP_HIGHFREQ_COUNT",G_LOG_RATE_KEEP_HIGHFREQ_COUNT); + std::cout << "-------------------------------------\n"; } diff --git a/LFtid1056/cloudfront/code/log4.cpp b/LFtid1056/cloudfront/code/log4.cpp index e42ac67..5e80622 100644 --- a/LFtid1056/cloudfront/code/log4.cpp +++ b/LFtid1056/cloudfront/code/log4.cpp @@ -50,6 +50,14 @@ extern std::string subdir; //日志主题 extern std::string G_LOG_TOPIC; + +// 日志限流配置 +extern int G_LOG_RATE_RESET_SEC; +extern int G_LOG_RATE_LIMIT_SEC; +extern int G_LOG_RATE_KEEP_ALL_MS; +extern int G_LOG_RATE_KEEP_BURST_MS; +extern int G_LOG_RATE_KEEP_BURST_COUNT; +extern int G_LOG_RATE_KEEP_HIGHFREQ_COUNT; //////////////////////////////////////////////////////////////////////////////////////////////////// const int LOGTYPE_DEFAULT = LOG_CODE_OTHER; @@ -223,10 +231,26 @@ void refresh_log_level_cache_locked() class SendAppender : public Appender { private: - struct RateState { + /*struct RateState { uint64_t hit_count = 0; // 同一条日志累计命中次数 std::chrono::steady_clock::time_point last_emit = std::chrono::steady_clock::time_point::min(); + };*/ + struct RateState { + uint64_t pass_count; // 当前周期内已放行条数 + uint64_t suppressed_count; // 当前被抑制条数 + std::chrono::steady_clock::time_point last_emit; + std::chrono::steady_clock::time_point last_seen; + std::chrono::steady_clock::time_point last_reset; + bool has_emit; + + RateState() + : pass_count(0), + suppressed_count(0), + last_emit(), + last_seen(), + last_reset(), + has_emit(false) {} }; static std::unordered_map s_rate_map; //频率map @@ -240,7 +264,7 @@ private: } // 前 3 次:1 秒一次;第 3 次起:300 秒一次,一小时恢复 - static bool should_emit(const std::string& key) { + /*static bool should_emit(const std::string& key) { using namespace std::chrono; const auto now = steady_clock::now(); @@ -271,6 +295,96 @@ private: return true; } + return false; + }*/ + static bool should_emit(const std::string& key, uint64_t& suppressed_before_emit) { + using namespace std::chrono; + + const auto now = steady_clock::now(); + suppressed_before_emit = 0; + + std::lock_guard lk(s_rate_mutex); + RateState& st = s_rate_map[key]; + + const int RESET_SEC = G_LOG_RATE_RESET_SEC ; // 1小时重置 + const int LIMIT_SEC = G_LOG_RATE_LIMIT_SEC ; // 进入限流后:多久发1条 主要控制中频和高频那些,低频的都直接放行了 + + // 初始化 / 强制每小时重置 + if (st.last_reset.time_since_epoch().count() == 0) { + st.last_reset = now; + } else { + auto since_reset = duration_cast(now - st.last_reset).count(); + if (since_reset >= RESET_SEC) { //重置周期 + st = RateState(); + st.last_reset = now; + } + } + + // 计算当前频率档位(按“本次与上次看到该key的间隔”判断) + // >=60秒/条:全部保留 + // [1秒, 60秒):保留前60条,然后1分钟1条 + // <1秒:保留前10条,然后1分钟1条 + int allow_burst = 0; + + if (st.last_seen.time_since_epoch().count() == 0) { + // 第一次看到,先按“全部保留”处理 + allow_burst = -1; + } else { + auto gap_ms = duration_cast(now - st.last_seen).count(); + + if (gap_ms >= G_LOG_RATE_KEEP_ALL_MS) { //什么时候不需要限流 //低频 //如果这里设置的很低,就不会限流 + allow_burst = -1; // 全部保留 + } else if (gap_ms >= G_LOG_RATE_KEEP_BURST_MS) { + allow_burst = G_LOG_RATE_KEEP_BURST_COUNT; // 前60条 //中频 //如果这里设置的比低频低,也不会生效 + } else { + allow_burst = G_LOG_RATE_KEEP_HIGHFREQ_COUNT; // 前10条 //高频 + } + } + + st.last_seen = now; + + // 档位1:全部保留 + if (allow_burst == -1) { + suppressed_before_emit = st.suppressed_count; + st.suppressed_count = 0; + st.pass_count++; + st.last_emit = now; + st.has_emit = true; + return true; + } + + // 档位2/3:先放前N条 + if (st.pass_count < (uint64_t)allow_burst) { + suppressed_before_emit = st.suppressed_count; + st.suppressed_count = 0; + st.pass_count++; + st.last_emit = now; + st.has_emit = true; + return true; + } + + // 超过前N条后:进入 1分钟1条 + if (!st.has_emit) { + suppressed_before_emit = st.suppressed_count; + st.suppressed_count = 0; + st.pass_count++; + st.last_emit = now; + st.has_emit = true; + return true; + } + + auto elapsed = duration_cast(now - st.last_emit).count(); + if (elapsed >= LIMIT_SEC) { + suppressed_before_emit = st.suppressed_count; + st.suppressed_count = 0; + st.pass_count++; + st.last_emit = now; + st.has_emit = true; + return true; + } + + // 本条被抑制 + st.suppressed_count++; return false; } @@ -396,8 +510,17 @@ protected: // ★新增:限频判断(同一条日志前 5 次 1 秒一次;之后 300 秒一次) const std::string key = make_key(logger_name, level, code, msg); - if (!should_emit(key)) { - return; + uint64_t suppressed_before_emit = 0; + if (!should_emit(key, suppressed_before_emit)) return; + + // 如果本次输出前压掉过日志,则在 log 文本后追加统计 + std::string final_msg = msg; + if (suppressed_before_emit > 0) { + std::ostringstream suppressed_oss; + suppressed_oss << msg << " 【已过滤重复同类日志 " + << suppressed_before_emit + << " 条】"; + final_msg = suppressed_oss.str(); } std::ostringstream oss; @@ -409,7 +532,7 @@ protected: << "\",\"grade\":\"" << get_level_str(level) // ★建议:code 用数字(不是字符串) << "\",\"code\":" << code - << ",\"log\":\"" << escape_json(msg) << "\"}"; + << ",\"log\":\"" << escape_json(final_msg) << "\"}"; queue_data_t connect_info; connect_info.strTopic = G_LOG_TOPIC; diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 109cc80..f72fcab 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -244,6 +244,8 @@ void process_received_message(string mac, string id,const char* data, size_t len //装置主动上送报文 暂态事件报文/暂态波形文件报文 if (udata[8] == static_cast(MsgResponseType::Response_Event)) { //处理主动上送的暂态事件报文 + std::cout << "GET: MsgResponseType::Response_Event"; + DIY_INFOLOG_CODE(id, 1, static_cast(LogCode::LOG_CODE_TRANSIENT), "收到装置主动上送的暂态事件信息报文"); NewTaglogbuffer event = NewTaglogbuffer::createFromData(parser.RecvData.data(), parser.RecvData.size()); //获取测点id @@ -365,6 +367,8 @@ void process_received_message(string mac, string id,const char* data, size_t len } else if (udata[8] == static_cast(MsgResponseType::Response_ActiveSOEInfo)) { //处理主动上送的波形文件信息报文 + std::cout << "GET: MsgResponseType::Response_ActiveSOEInfo"; + DIY_INFOLOG_CODE(id, 1, static_cast(LogCode::LOG_CODE_TRANSIENT), "收到装置主动上送的暂态波形文件信息报文"); unsigned char file_type = udata[12];//录波文件类型数 cfg dat hdr 1-3 unsigned char line_id = udata[13];//录波测点 1-6 const uint8_t* data_ptr = parser.RecvData.data() + 2;//数据体去除前两位