修改编译脚本适配pqdif,同步61850日志上送逻辑
This commit is contained in:
BIN
LFtid1056.rar
BIN
LFtid1056.rar
Binary file not shown.
@@ -20,9 +20,10 @@ $SRC_DIR/tinyxml2.cpp \
|
|||||||
./dealMsg.cpp \
|
./dealMsg.cpp \
|
||||||
./main_thread.cpp \
|
./main_thread.cpp \
|
||||||
./PQSMsg.cpp \
|
./PQSMsg.cpp \
|
||||||
./pqdif_thread_processor.cpp \
|
|
||||||
./pqdif/PQDIF.cpp \
|
./pqdif/PQDIF.cpp \
|
||||||
./pqdif/include/cjson.c "
|
./pqdif_semantic_ids.cpp \
|
||||||
|
./pqdif_thread_processor.cpp \
|
||||||
|
./pqdif/include/cjson.c"
|
||||||
|
|
||||||
INCLUDE_DIRS="-I$SRC_DIR \
|
INCLUDE_DIRS="-I$SRC_DIR \
|
||||||
-I$SRC_DIR/nlohmann \
|
-I$SRC_DIR/nlohmann \
|
||||||
@@ -33,20 +34,22 @@ INCLUDE_DIRS="-I$SRC_DIR \
|
|||||||
-I./lib/libuv-v1.51.0/include \
|
-I./lib/libuv-v1.51.0/include \
|
||||||
-I./pqdif \
|
-I./pqdif \
|
||||||
-I./pqdif/include \
|
-I./pqdif/include \
|
||||||
-I. "
|
-I. "
|
||||||
|
|
||||||
LIB_DIRS="-L$LIB_DIR -L/usr/lib64 -L/usr/local/lib"
|
LIB_DIRS="-L$LIB_DIR -L./pqdif/lib -L/usr/lib64 -L/usr/local/lib"
|
||||||
|
|
||||||
LIBS="./cloudfront/lib/libcurl.so \
|
LIBS="./cloudfront/lib/libcurl.so \
|
||||||
./cloudfront/lib/libssl.so \
|
./cloudfront/lib/libssl.so \
|
||||||
./cloudfront/lib/libcrypto.so \
|
./cloudfront/lib/libcrypto.so \
|
||||||
./cloudfront/lib/liblog4cplus.so \
|
./cloudfront/lib/liblog4cplus.so \
|
||||||
./pqdif/lib/libpqdiflib.a \
|
|
||||||
./pqdif/lib/libz.a \
|
|
||||||
-lpthread -ldl -lrt \
|
-lpthread -ldl -lrt \
|
||||||
-lstdc++fs \
|
-lstdc++fs \
|
||||||
-lz \
|
-lz \
|
||||||
./libuv.a \
|
./libuv.a \
|
||||||
|
-Wl,--start-group \
|
||||||
|
./pqdif/lib/libpqdiflib.a \
|
||||||
|
./pqdif/lib/libz.a \
|
||||||
|
-Wl,--end-group \
|
||||||
-pthread"
|
-pthread"
|
||||||
|
|
||||||
# 如果有静态 rocketmq 库就加上
|
# 如果有静态 rocketmq 库就加上
|
||||||
@@ -74,4 +77,4 @@ if [ $? -eq 0 ]; then
|
|||||||
ldd "$OUT_DIR/$TARGET" || echo "是静态编译程序 ✔"
|
ldd "$OUT_DIR/$TARGET" || echo "是静态编译程序 ✔"
|
||||||
else
|
else
|
||||||
echo "❌ 编译失败"
|
echo "❌ 编译失败"
|
||||||
fi
|
fi
|
||||||
|
|||||||
@@ -236,6 +236,14 @@ int TEST_PORT = 11000; //用于当前进程登录测试shell的端口
|
|||||||
std::string G_TEST_LIST = ""; //测试用的发送实际数据的终端列表
|
std::string G_TEST_LIST = ""; //测试用的发送实际数据的终端列表
|
||||||
std::vector<std::string> TESTARRAY; //解析的列表数组
|
std::vector<std::string> TESTARRAY; //解析的列表数组
|
||||||
|
|
||||||
|
// 日志限流配置
|
||||||
|
int G_LOG_RATE_RESET_SEC = 3600; // 1小时重置
|
||||||
|
int G_LOG_RATE_LIMIT_SEC = 60; // 进入限流后:60秒1条
|
||||||
|
int G_LOG_RATE_KEEP_ALL_MS = 60000; // 间隔 >= 60000ms,全部保留
|
||||||
|
int G_LOG_RATE_KEEP_BURST_MS = 1000; // 间隔 >= 1000ms,按二级策略
|
||||||
|
int G_LOG_RATE_KEEP_BURST_COUNT = 60; // 二级保留前60条
|
||||||
|
int G_LOG_RATE_KEEP_HIGHFREQ_COUNT = 10; // 高频保留前10条
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////其他文件定义的函数引用声明
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////其他文件定义的函数引用声明
|
||||||
|
|
||||||
bool enqueue_direct_download(const std::string& dev_id,
|
bool enqueue_direct_download(const std::string& dev_id,
|
||||||
@@ -413,6 +421,37 @@ void loadConfig(const std::string& filename) {
|
|||||||
intMap["RocketMq.TestPort"] = &TEST_PORT;
|
intMap["RocketMq.TestPort"] = &TEST_PORT;
|
||||||
strMap["RocketMq.TestList"] = &G_TEST_LIST;
|
strMap["RocketMq.TestList"] = &G_TEST_LIST;
|
||||||
|
|
||||||
|
// ==================== 新增:日志限流配置 ====================
|
||||||
|
intMap["LogRate.ResetSec"] = &G_LOG_RATE_RESET_SEC;
|
||||||
|
intMap["LogRate.LimitSec"] = &G_LOG_RATE_LIMIT_SEC;
|
||||||
|
intMap["LogRate.KeepAllMs"] = &G_LOG_RATE_KEEP_ALL_MS;
|
||||||
|
intMap["LogRate.KeepBurstMs"] = &G_LOG_RATE_KEEP_BURST_MS;
|
||||||
|
intMap["LogRate.KeepBurstCount"] = &G_LOG_RATE_KEEP_BURST_COUNT;
|
||||||
|
intMap["LogRate.KeepHighFreqCount"] = &G_LOG_RATE_KEEP_HIGHFREQ_COUNT;
|
||||||
|
// ==========================================================
|
||||||
|
|
||||||
|
// ==================== 新增:日志限流默认值保护 ====================
|
||||||
|
|
||||||
|
if (G_LOG_RATE_RESET_SEC <= 0)
|
||||||
|
G_LOG_RATE_RESET_SEC = 3600;
|
||||||
|
|
||||||
|
if (G_LOG_RATE_LIMIT_SEC <= 0)
|
||||||
|
G_LOG_RATE_LIMIT_SEC = 60;
|
||||||
|
|
||||||
|
if (G_LOG_RATE_KEEP_ALL_MS <= 0)
|
||||||
|
G_LOG_RATE_KEEP_ALL_MS = 60000;
|
||||||
|
|
||||||
|
if (G_LOG_RATE_KEEP_BURST_MS <= 0)
|
||||||
|
G_LOG_RATE_KEEP_BURST_MS = 1000;
|
||||||
|
|
||||||
|
if (G_LOG_RATE_KEEP_BURST_COUNT < 0)
|
||||||
|
G_LOG_RATE_KEEP_BURST_COUNT = 60;
|
||||||
|
|
||||||
|
if (G_LOG_RATE_KEEP_HIGHFREQ_COUNT < 0)
|
||||||
|
G_LOG_RATE_KEEP_HIGHFREQ_COUNT = 10;
|
||||||
|
|
||||||
|
// ================================================================
|
||||||
|
|
||||||
// 2. 打开并逐行解析 INI 文件
|
// 2. 打开并逐行解析 INI 文件
|
||||||
std::ifstream fin(filename);
|
std::ifstream fin(filename);
|
||||||
if (!fin.is_open()) {
|
if (!fin.is_open()) {
|
||||||
@@ -585,6 +624,15 @@ void printConfig() {
|
|||||||
printStr("WEB_FILEUPLOAD", WEB_FILEUPLOAD);
|
printStr("WEB_FILEUPLOAD", WEB_FILEUPLOAD);
|
||||||
printStr("WEB_FILEDOWNLOAD", WEB_FILEDOWNLOAD);
|
printStr("WEB_FILEDOWNLOAD", WEB_FILEDOWNLOAD);
|
||||||
|
|
||||||
|
std::cout << "\n// 日志限流配置\n";
|
||||||
|
|
||||||
|
printInt("G_LOG_RATE_RESET_SEC", G_LOG_RATE_RESET_SEC);
|
||||||
|
printInt("G_LOG_RATE_LIMIT_SEC", G_LOG_RATE_LIMIT_SEC);
|
||||||
|
printInt("G_LOG_RATE_KEEP_ALL_MS", G_LOG_RATE_KEEP_ALL_MS);
|
||||||
|
printInt("G_LOG_RATE_KEEP_BURST_MS", G_LOG_RATE_KEEP_BURST_MS);
|
||||||
|
printInt("G_LOG_RATE_KEEP_BURST_COUNT", G_LOG_RATE_KEEP_BURST_COUNT);
|
||||||
|
printInt("G_LOG_RATE_KEEP_HIGHFREQ_COUNT",G_LOG_RATE_KEEP_HIGHFREQ_COUNT);
|
||||||
|
|
||||||
std::cout << "-------------------------------------\n";
|
std::cout << "-------------------------------------\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -50,6 +50,14 @@ extern std::string subdir;
|
|||||||
|
|
||||||
//日志主题
|
//日志主题
|
||||||
extern std::string G_LOG_TOPIC;
|
extern std::string G_LOG_TOPIC;
|
||||||
|
|
||||||
|
// 日志限流配置
|
||||||
|
extern int G_LOG_RATE_RESET_SEC;
|
||||||
|
extern int G_LOG_RATE_LIMIT_SEC;
|
||||||
|
extern int G_LOG_RATE_KEEP_ALL_MS;
|
||||||
|
extern int G_LOG_RATE_KEEP_BURST_MS;
|
||||||
|
extern int G_LOG_RATE_KEEP_BURST_COUNT;
|
||||||
|
extern int G_LOG_RATE_KEEP_HIGHFREQ_COUNT;
|
||||||
////////////////////////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
const int LOGTYPE_DEFAULT = LOG_CODE_OTHER;
|
const int LOGTYPE_DEFAULT = LOG_CODE_OTHER;
|
||||||
|
|
||||||
@@ -223,10 +231,26 @@ void refresh_log_level_cache_locked()
|
|||||||
|
|
||||||
class SendAppender : public Appender {
|
class SendAppender : public Appender {
|
||||||
private:
|
private:
|
||||||
struct RateState {
|
/*struct RateState {
|
||||||
uint64_t hit_count = 0; // 同一条日志累计命中次数
|
uint64_t hit_count = 0; // 同一条日志累计命中次数
|
||||||
std::chrono::steady_clock::time_point last_emit =
|
std::chrono::steady_clock::time_point last_emit =
|
||||||
std::chrono::steady_clock::time_point::min();
|
std::chrono::steady_clock::time_point::min();
|
||||||
|
};*/
|
||||||
|
struct RateState {
|
||||||
|
uint64_t pass_count; // 当前周期内已放行条数
|
||||||
|
uint64_t suppressed_count; // 当前被抑制条数
|
||||||
|
std::chrono::steady_clock::time_point last_emit;
|
||||||
|
std::chrono::steady_clock::time_point last_seen;
|
||||||
|
std::chrono::steady_clock::time_point last_reset;
|
||||||
|
bool has_emit;
|
||||||
|
|
||||||
|
RateState()
|
||||||
|
: pass_count(0),
|
||||||
|
suppressed_count(0),
|
||||||
|
last_emit(),
|
||||||
|
last_seen(),
|
||||||
|
last_reset(),
|
||||||
|
has_emit(false) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
static std::unordered_map<std::string, RateState> s_rate_map; //频率map
|
static std::unordered_map<std::string, RateState> s_rate_map; //频率map
|
||||||
@@ -240,7 +264,7 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 前 3 次:1 秒一次;第 3 次起:300 秒一次,一小时恢复
|
// 前 3 次:1 秒一次;第 3 次起:300 秒一次,一小时恢复
|
||||||
static bool should_emit(const std::string& key) {
|
/*static bool should_emit(const std::string& key) {
|
||||||
using namespace std::chrono;
|
using namespace std::chrono;
|
||||||
const auto now = steady_clock::now();
|
const auto now = steady_clock::now();
|
||||||
|
|
||||||
@@ -271,6 +295,96 @@ private:
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}*/
|
||||||
|
static bool should_emit(const std::string& key, uint64_t& suppressed_before_emit) {
|
||||||
|
using namespace std::chrono;
|
||||||
|
|
||||||
|
const auto now = steady_clock::now();
|
||||||
|
suppressed_before_emit = 0;
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lk(s_rate_mutex);
|
||||||
|
RateState& st = s_rate_map[key];
|
||||||
|
|
||||||
|
const int RESET_SEC = G_LOG_RATE_RESET_SEC ; // 1小时重置
|
||||||
|
const int LIMIT_SEC = G_LOG_RATE_LIMIT_SEC ; // 进入限流后:多久发1条 主要控制中频和高频那些,低频的都直接放行了
|
||||||
|
|
||||||
|
// 初始化 / 强制每小时重置
|
||||||
|
if (st.last_reset.time_since_epoch().count() == 0) {
|
||||||
|
st.last_reset = now;
|
||||||
|
} else {
|
||||||
|
auto since_reset = duration_cast<seconds>(now - st.last_reset).count();
|
||||||
|
if (since_reset >= RESET_SEC) { //重置周期
|
||||||
|
st = RateState();
|
||||||
|
st.last_reset = now;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 计算当前频率档位(按“本次与上次看到该key的间隔”判断)
|
||||||
|
// >=60秒/条:全部保留
|
||||||
|
// [1秒, 60秒):保留前60条,然后1分钟1条
|
||||||
|
// <1秒:保留前10条,然后1分钟1条
|
||||||
|
int allow_burst = 0;
|
||||||
|
|
||||||
|
if (st.last_seen.time_since_epoch().count() == 0) {
|
||||||
|
// 第一次看到,先按“全部保留”处理
|
||||||
|
allow_burst = -1;
|
||||||
|
} else {
|
||||||
|
auto gap_ms = duration_cast<milliseconds>(now - st.last_seen).count();
|
||||||
|
|
||||||
|
if (gap_ms >= G_LOG_RATE_KEEP_ALL_MS) { //什么时候不需要限流 //低频 //如果这里设置的很低,就不会限流
|
||||||
|
allow_burst = -1; // 全部保留
|
||||||
|
} else if (gap_ms >= G_LOG_RATE_KEEP_BURST_MS) {
|
||||||
|
allow_burst = G_LOG_RATE_KEEP_BURST_COUNT; // 前60条 //中频 //如果这里设置的比低频低,也不会生效
|
||||||
|
} else {
|
||||||
|
allow_burst = G_LOG_RATE_KEEP_HIGHFREQ_COUNT; // 前10条 //高频
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
st.last_seen = now;
|
||||||
|
|
||||||
|
// 档位1:全部保留
|
||||||
|
if (allow_burst == -1) {
|
||||||
|
suppressed_before_emit = st.suppressed_count;
|
||||||
|
st.suppressed_count = 0;
|
||||||
|
st.pass_count++;
|
||||||
|
st.last_emit = now;
|
||||||
|
st.has_emit = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 档位2/3:先放前N条
|
||||||
|
if (st.pass_count < (uint64_t)allow_burst) {
|
||||||
|
suppressed_before_emit = st.suppressed_count;
|
||||||
|
st.suppressed_count = 0;
|
||||||
|
st.pass_count++;
|
||||||
|
st.last_emit = now;
|
||||||
|
st.has_emit = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 超过前N条后:进入 1分钟1条
|
||||||
|
if (!st.has_emit) {
|
||||||
|
suppressed_before_emit = st.suppressed_count;
|
||||||
|
st.suppressed_count = 0;
|
||||||
|
st.pass_count++;
|
||||||
|
st.last_emit = now;
|
||||||
|
st.has_emit = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto elapsed = duration_cast<seconds>(now - st.last_emit).count();
|
||||||
|
if (elapsed >= LIMIT_SEC) {
|
||||||
|
suppressed_before_emit = st.suppressed_count;
|
||||||
|
st.suppressed_count = 0;
|
||||||
|
st.pass_count++;
|
||||||
|
st.last_emit = now;
|
||||||
|
st.has_emit = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 本条被抑制
|
||||||
|
st.suppressed_count++;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -396,8 +510,17 @@ protected:
|
|||||||
|
|
||||||
// ★新增:限频判断(同一条日志前 5 次 1 秒一次;之后 300 秒一次)
|
// ★新增:限频判断(同一条日志前 5 次 1 秒一次;之后 300 秒一次)
|
||||||
const std::string key = make_key(logger_name, level, code, msg);
|
const std::string key = make_key(logger_name, level, code, msg);
|
||||||
if (!should_emit(key)) {
|
uint64_t suppressed_before_emit = 0;
|
||||||
return;
|
if (!should_emit(key, suppressed_before_emit)) return;
|
||||||
|
|
||||||
|
// 如果本次输出前压掉过日志,则在 log 文本后追加统计
|
||||||
|
std::string final_msg = msg;
|
||||||
|
if (suppressed_before_emit > 0) {
|
||||||
|
std::ostringstream suppressed_oss;
|
||||||
|
suppressed_oss << msg << " 【已过滤重复同类日志 "
|
||||||
|
<< suppressed_before_emit
|
||||||
|
<< " 条】";
|
||||||
|
final_msg = suppressed_oss.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::ostringstream oss;
|
std::ostringstream oss;
|
||||||
@@ -409,7 +532,7 @@ protected:
|
|||||||
<< "\",\"grade\":\"" << get_level_str(level)
|
<< "\",\"grade\":\"" << get_level_str(level)
|
||||||
// ★建议:code 用数字(不是字符串)
|
// ★建议:code 用数字(不是字符串)
|
||||||
<< "\",\"code\":" << code
|
<< "\",\"code\":" << code
|
||||||
<< ",\"log\":\"" << escape_json(msg) << "\"}";
|
<< ",\"log\":\"" << escape_json(final_msg) << "\"}";
|
||||||
|
|
||||||
queue_data_t connect_info;
|
queue_data_t connect_info;
|
||||||
connect_info.strTopic = G_LOG_TOPIC;
|
connect_info.strTopic = G_LOG_TOPIC;
|
||||||
|
|||||||
@@ -244,6 +244,8 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
//装置主动上送报文 暂态事件报文/暂态波形文件报文
|
//装置主动上送报文 暂态事件报文/暂态波形文件报文
|
||||||
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_Event)) {
|
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_Event)) {
|
||||||
//处理主动上送的暂态事件报文
|
//处理主动上送的暂态事件报文
|
||||||
|
std::cout << "GET: MsgResponseType::Response_Event";
|
||||||
|
DIY_INFOLOG_CODE(id, 1, static_cast<int>(LogCode::LOG_CODE_TRANSIENT), "收到装置主动上送的暂态事件信息报文");
|
||||||
NewTaglogbuffer event = NewTaglogbuffer::createFromData(parser.RecvData.data(), parser.RecvData.size());
|
NewTaglogbuffer event = NewTaglogbuffer::createFromData(parser.RecvData.data(), parser.RecvData.size());
|
||||||
|
|
||||||
//获取测点id
|
//获取测点id
|
||||||
@@ -365,6 +367,8 @@ void process_received_message(string mac, string id,const char* data, size_t len
|
|||||||
}
|
}
|
||||||
else if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_ActiveSOEInfo)) {
|
else if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_ActiveSOEInfo)) {
|
||||||
//处理主动上送的波形文件信息报文
|
//处理主动上送的波形文件信息报文
|
||||||
|
std::cout << "GET: MsgResponseType::Response_ActiveSOEInfo";
|
||||||
|
DIY_INFOLOG_CODE(id, 1, static_cast<int>(LogCode::LOG_CODE_TRANSIENT), "收到装置主动上送的暂态波形文件信息报文");
|
||||||
unsigned char file_type = udata[12];//录波文件类型数 cfg dat hdr 1-3
|
unsigned char file_type = udata[12];//录波文件类型数 cfg dat hdr 1-3
|
||||||
unsigned char line_id = udata[13];//录波测点 1-6
|
unsigned char line_id = udata[13];//录波测点 1-6
|
||||||
const uint8_t* data_ptr = parser.RecvData.data() + 2;//数据体去除前两位
|
const uint8_t* data_ptr = parser.RecvData.data() + 2;//数据体去除前两位
|
||||||
|
|||||||
Reference in New Issue
Block a user