diff --git a/cfg_parse/SimpleProducer.cpp b/cfg_parse/SimpleProducer.cpp index c9f7014..2650b9c 100644 --- a/cfg_parse/SimpleProducer.cpp +++ b/cfg_parse/SimpleProducer.cpp @@ -54,6 +54,7 @@ extern std::string G_MQCONSUMER_TOPIC_UD; extern std::string G_MQCONSUMER_TOPIC_RT; extern std::string FRONT_INST; +extern bool DEBUGOPEN; #ifdef __cplusplus extern "C" { @@ -417,6 +418,9 @@ public: // 设置 nameserver 地址 SetProducerNameServerAddress(producer_, nameServer.c_str()); + //lnk20260417设置数据上送消息体最大值,默认4M,调整为1M,避免过大消息导致发送失败 + SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB + SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), ""); // 启动生产者 @@ -457,6 +461,64 @@ public: // 发送消息 void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) { + + if (DEBUGOPEN) { + std::cout << "sendMessage called with topic: " << (topic ? topic : "NULL") + << ", tags: " << tags + << ", keys: " << keys + << std::endl; + + if (strbody) { + // ===== 1️⃣ 真实长度 vs strlen ===== + std::string body_str(strbody); + + std::cout << "[MQ][LEN_CHECK]" + << " strlen=" << strlen(strbody) + << ", std::string.size=" << body_str.size() + << std::endl; + + // ===== 2️⃣ 检测是否包含 \0 ===== + bool has_null = false; + for (size_t i = 0; i < body_str.size(); i++) { + if (body_str[i] == '\0') { + has_null = true; + std::cout << "[MQ][FOUND_NULL] index=" << i << std::endl; + break; + } + } + std::cout << "[MQ][HAS_NULL] " << (has_null ? "YES" : "NO") << std::endl; + + // ===== 3️⃣ 打印头部(可读)===== + size_t len = strlen(strbody); + size_t n = std::min((size_t)200, len); + + std::cout << "[MQ][BODY_HEAD] " + << std::string(strbody, n) + << std::endl; + + std::cout << "[MQ][BODY_TAIL] " + << std::string(strbody + (len - n), n) + << std::endl; + + // ===== 4️⃣ 十六进制打印前100字节 ===== + std::cout << "[MQ][HEX_HEAD] "; + for (size_t i = 0; i < std::min((size_t)100, body_str.size()); i++) { + printf("%02X ", (unsigned char)body_str[i]); + } + printf("\n"); + + // ===== 5️⃣ 十六进制打印尾部100字节 ===== + std::cout << "[MQ][HEX_TAIL] "; + size_t start = (body_str.size() > 100) ? body_str.size() - 100 : 0; + for (size_t i = start; i < body_str.size(); i++) { + printf("%02X ", (unsigned char)body_str[i]); + } + printf("\n"); + } else { + std::cout << "[MQ][ERROR] strbody is NULL" << std::endl; + } + } + CSendResult result; CMessage* msg = NULL; @@ -481,6 +543,47 @@ public: RoundRobinSelector, // 队列选择器回调函数 &queueNum // 传递给选择器的额外参数(队列数量) ); + /////////////////////////////////替换接口,性能较低但不影响 + /*CSendResult result; + memset(&result, 0, sizeof(result)); + + int sendResult = SendMessageOrderly( + producer_, + msg, + RoundRobinSelector, + &queueNum, + 0, // autoRetryTimes + &result + ); + + std::cout << "[MQ][ORDERLY_RESULT]" + << " ret=" << sendResult + << ", sendStatus=" << (int)result.sendStatus + << ", msgId=" << result.msgId + << ", offset=" << result.offset + << ", topic=" << (topic ? topic : "") + << ", body_len=" << (strbody ? strlen(strbody) : 0) + << std::endl;*/ + /////////////////////////////////替换接口,性能较低但不影响 + // 发送消息:临时改成同步发送,绕过 orderly / selector,便于定位问题 + /*CSendResult result; + memset(&result, 0, sizeof(result)); + + int sendResult = SendMessageSync( + producer_, + msg, + &result + ); + + std::cout << "[MQ][SYNC_RESULT]" + << " ret=" << sendResult + << ", sendStatus=" << (int)result.sendStatus + << ", msgId=" << result.msgId + << ", offset=" << result.offset + << ", topic=" << (topic ? topic : "") + << ", body_len=" << (strbody ? strlen(strbody) : 0) + << std::endl;*/ + // 发送消息:临时改成同步发送,绕过 orderly / selector,便于定位问题 if (sendResult == 0) { // 假设返回 0 表示成功 std::cout << "[MQ][SEND_OK]" @@ -490,13 +593,17 @@ public: << ", body_len=" << (strbody ? strlen(strbody) : 0) << std::endl; } else { + std::cout << "[MQ][SEND_FAIL]" << " ret=" << sendResult << ", topic=" << (topic ? topic : "") << ", tags=" << tags << ", keys=" << keys + << ", body_len=" << (strbody ? strlen(strbody) : 0) << std::endl; - DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index); + std::cout << "[MQ][BODY_HEAD] " << std::string(strbody, std::min((size_t)200, strlen(strbody))) << std::endl; + std::cout << "[MQ][BODY_TAIL] " << std::string(strbody + std::max((size_t)0, strlen(strbody) - std::min((size_t)200, strlen(strbody)))) << std::endl; + DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index); } // 销毁消息 diff --git a/cfg_parse/cfg_parser.cpp b/cfg_parse/cfg_parser.cpp index ab4fd16..71cf023 100644 --- a/cfg_parse/cfg_parser.cpp +++ b/cfg_parse/cfg_parser.cpp @@ -358,6 +358,14 @@ std::string WEB_EVENT = ""; std::string WEB_FILEUPLOAD = ""; std::string WEB_FILEDOWNLOAD = ""; +// 日志限流配置 +int G_LOG_RATE_RESET_SEC = 3600; // 1小时重置 +int G_LOG_RATE_LIMIT_SEC = 60; // 进入限流后:60秒1条 +int G_LOG_RATE_KEEP_ALL_MS = 60000; // 间隔 >= 60000ms,全部保留 +int G_LOG_RATE_KEEP_BURST_MS = 1000; // 间隔 >= 1000ms,按二级策略 +int G_LOG_RATE_KEEP_BURST_COUNT = 60; // 二级保留前60条 +int G_LOG_RATE_KEEP_HIGHFREQ_COUNT = 10; // 高频保留前10条 + //lnk20250115添加台账锁 extern pthread_mutex_t mtx; @@ -781,6 +789,29 @@ void init_config() { std::cout << "Read G_TEST_NUM:" << G_TEST_NUM << std::endl; std::cout << "Read G_TEST_TYPE:" << G_TEST_TYPE << std::endl; + + // 日志限流配置 + G_LOG_RATE_RESET_SEC = settings.value("LogRate/ResetSec", 3600).toInt(); + G_LOG_RATE_LIMIT_SEC = settings.value("LogRate/LimitSec", 60).toInt(); + G_LOG_RATE_KEEP_ALL_MS = settings.value("LogRate/KeepAllMs", 60000).toInt(); + G_LOG_RATE_KEEP_BURST_MS = settings.value("LogRate/KeepBurstMs", 1000).toInt(); + G_LOG_RATE_KEEP_BURST_COUNT = settings.value("LogRate/KeepBurstCount", 60).toInt(); + G_LOG_RATE_KEEP_HIGHFREQ_COUNT = settings.value("LogRate/KeepHighFreqCount", 10).toInt(); + + std::cout << "Read G_LOG_RATE_RESET_SEC:" << G_LOG_RATE_RESET_SEC << std::endl; + std::cout << "Read G_LOG_RATE_LIMIT_SEC:" << G_LOG_RATE_LIMIT_SEC << std::endl; + std::cout << "Read G_LOG_RATE_KEEP_ALL_MS:" << G_LOG_RATE_KEEP_ALL_MS << std::endl; + std::cout << "Read G_LOG_RATE_KEEP_BURST_MS:" << G_LOG_RATE_KEEP_BURST_MS << std::endl; + std::cout << "Read G_LOG_RATE_KEEP_BURST_COUNT:" << G_LOG_RATE_KEEP_BURST_COUNT << std::endl; + std::cout << "Read G_LOG_RATE_KEEP_HIGHFREQ_COUNT:" << G_LOG_RATE_KEEP_HIGHFREQ_COUNT << std::endl; + + if (G_LOG_RATE_RESET_SEC <= 0) G_LOG_RATE_RESET_SEC = 3600; + if (G_LOG_RATE_LIMIT_SEC <= 0) G_LOG_RATE_LIMIT_SEC = 60; + if (G_LOG_RATE_KEEP_ALL_MS <= 0) G_LOG_RATE_KEEP_ALL_MS = 60000; + if (G_LOG_RATE_KEEP_BURST_MS <= 0) G_LOG_RATE_KEEP_BURST_MS = 1000; + if (G_LOG_RATE_KEEP_BURST_COUNT < 0) G_LOG_RATE_KEEP_BURST_COUNT = 60; + if (G_LOG_RATE_KEEP_HIGHFREQ_COUNT < 0) G_LOG_RATE_KEEP_HIGHFREQ_COUNT = 10; + //20241212lnk添加多前置 if (g_front_seg_index != 0 && g_front_seg_num != 0) { MULTIPLE_NODE_FLAG = 1; @@ -6542,6 +6573,9 @@ void rocketmq_test_300(int mpnum,int front_index,int type) { // 循环发送 300 条消息 if(type == 0){ std::cout << " use ledger send msg " << std::endl; + + pthread_mutex_lock(&mtx); + for (int i = 0; (total_messages > 0 && g_node_id == 100) && i < g_node->n_clients; ++i) {//台账模拟不限制进程号 ied = (ied_t*)g_node->clients[i]; @@ -6611,6 +6645,13 @@ void rocketmq_test_300(int mpnum,int front_index,int type) { } } } + + + + std::cout << "Finished sending " << g_node->n_clients << " messages." << std::endl; + + pthread_mutex_unlock(&mtx); + } else{ std::cout << " use monitor + number send msg " << std::endl; @@ -6663,9 +6704,10 @@ void rocketmq_test_300(int mpnum,int front_index,int type) { apr_sleep(apr_time_from_msec(60000/total_messages)); // 添加毫秒延时 }*/ } - } - std::cout << "Finished sending " << total_messages << " messages." << std::endl; + std::cout << "Finished sending " << total_messages << " messages." << std::endl; + } + } } diff --git a/cfg_parse/log4.cpp b/cfg_parse/log4.cpp index be0d746..b77f519 100644 --- a/cfg_parse/log4.cpp +++ b/cfg_parse/log4.cpp @@ -68,6 +68,15 @@ extern std::string intToString(int number); //日志主题 extern std::string G_LOG_TOPIC; + +// 日志限流配置 +extern int G_LOG_RATE_RESET_SEC; +extern int G_LOG_RATE_LIMIT_SEC; +extern int G_LOG_RATE_KEEP_ALL_MS; +extern int G_LOG_RATE_KEEP_BURST_MS; +extern int G_LOG_RATE_KEEP_BURST_COUNT; +extern int G_LOG_RATE_KEEP_HIGHFREQ_COUNT; + ///////////////////////////////////////////////////////// //log4命名空间 using namespace log4cplus; @@ -337,7 +346,19 @@ protected: // ③ 限频:同一条日志 const std::string rkey = make_key(logger_name, level, code, msg); - if (!should_emit(rkey)) return; + + uint64_t suppressed_before_emit = 0; + if (!should_emit(rkey, suppressed_before_emit)) return; + + // 如果本次输出前压掉过日志,则在 log 文本后追加统计 + std::string final_msg = msg; + if (suppressed_before_emit > 0) { + std::ostringstream suppressed_oss; + suppressed_oss << msg << " 【中间重复抑制 " + << suppressed_before_emit + << " 条】"; + final_msg = suppressed_oss.str(); + } std::ostringstream oss; oss << "{\"processNo\":\"" << intToString(g_front_seg_index) @@ -348,7 +369,7 @@ protected: << "\",\"logtype\":\"" << safe_logtype << "\",\"frontType\":\"" << get_front_type_from_subdir() << "\",\"code\":" << code - << ",\"log\":\"" << escape_json(msg) << "\"}"; + << ",\"log\":\"" << escape_json(final_msg) << "\"}"; Ckafka_data_t connect_info; connect_info.strTopic = QString::fromStdString(G_LOG_TOPIC); @@ -386,12 +407,20 @@ public: //////////////////////////////////////////////////////////////////20260303添加日志上送控制 - 频率限制实现 private: struct RateState { - uint64_t hit_count; + uint64_t pass_count; // 当前周期内已放行条数 + uint64_t suppressed_count; // 当前被抑制条数 std::chrono::steady_clock::time_point last_emit; + std::chrono::steady_clock::time_point last_seen; std::chrono::steady_clock::time_point last_reset; bool has_emit; - RateState() : hit_count(0), last_emit(), last_reset(), has_emit(false) {} + RateState() + : pass_count(0), + suppressed_count(0), + last_emit(), + last_seen(), + last_reset(), + has_emit(false) {} }; static std::unordered_map s_rate_map; @@ -403,52 +432,94 @@ private: return oss.str(); } - static bool should_emit(const std::string& key) { + static bool should_emit(const std::string& key, uint64_t& suppressed_before_emit) { using namespace std::chrono; + const auto now = steady_clock::now(); + suppressed_before_emit = 0; std::lock_guard lk(s_rate_mutex); RateState& st = s_rate_map[key]; - const int RESET_SEC = 3600; + const int RESET_SEC = G_LOG_RATE_RESET_SEC ; // 1小时重置 + const int LIMIT_SEC = G_LOG_RATE_LIMIT_SEC ; // 进入限流后:1分钟1条 - // 🚀 强制时间窗口重置(关键) - if (st.has_emit) { + // 初始化 / 强制每小时重置 + if (st.last_reset.time_since_epoch().count() == 0) { + st.last_reset = now; + } else { auto since_reset = duration_cast(now - st.last_reset).count(); if (since_reset >= RESET_SEC) { - st.hit_count = 0; - st.has_emit = false; + st = RateState(); st.last_reset = now; } + } + + // 计算当前频率档位(按“本次与上次看到该key的间隔”判断) + // >=60秒/条:全部保留 + // [1秒, 60秒):保留前60条,然后1分钟1条 + // <1秒:保留前10条,然后1分钟1条 + int allow_burst = 0; + + if (st.last_seen.time_since_epoch().count() == 0) { + // 第一次看到,先按“全部保留”处理 + allow_burst = -1; } else { - // 第一次初始化 - st.last_reset = now; + auto gap_ms = duration_cast(now - st.last_seen).count(); + + if (gap_ms >= G_LOG_RATE_KEEP_ALL_MS) { + allow_burst = -1; // 全部保留 + } else if (gap_ms >= G_LOG_RATE_KEEP_BURST_MS) { + allow_burst = G_LOG_RATE_KEEP_BURST_COUNT; // 前60条 + } else { + allow_burst = G_LOG_RATE_KEEP_HIGHFREQ_COUNT; // 前10条 + } } - st.hit_count++; + st.last_seen = now; - // 🚀 前20条:完全放行 - if (st.hit_count <= 20) { + // 档位1:全部保留 + if (allow_burst == -1) { + suppressed_before_emit = st.suppressed_count; + st.suppressed_count = 0; + st.pass_count++; st.last_emit = now; st.has_emit = true; return true; } - // 🚀 超过20条:1分钟限1条 - const int period_sec = 60; + // 档位2/3:先放前N条 + if (st.pass_count < (uint64_t)allow_burst) { + suppressed_before_emit = st.suppressed_count; + st.suppressed_count = 0; + st.pass_count++; + st.last_emit = now; + st.has_emit = true; + return true; + } + // 超过前N条后:进入 1分钟1条 if (!st.has_emit) { + suppressed_before_emit = st.suppressed_count; + st.suppressed_count = 0; + st.pass_count++; st.last_emit = now; st.has_emit = true; return true; } - const auto elapsed = duration_cast(now - st.last_emit).count(); - if (elapsed >= period_sec) { + auto elapsed = duration_cast(now - st.last_emit).count(); + if (elapsed >= LIMIT_SEC) { + suppressed_before_emit = st.suppressed_count; + st.suppressed_count = 0; + st.pass_count++; st.last_emit = now; + st.has_emit = true; return true; } + // 本条被抑制 + st.suppressed_count++; return false; } }; diff --git a/mykafka.ini b/mykafka.ini new file mode 100644 index 0000000..3fbda81 --- /dev/null +++ b/mykafka.ini @@ -0,0 +1,170 @@ +[Kafka] +BrokerList= +EventTopic= +KafkaFlag= +KafkaListSize= + +RTDataTopic=Real_Time_Data_Topic +HisTopic=LN_Topic +PSTTopic=LN_Topic +PLTTopic=LN_Topic +AlmTopic=AlmTopic +SngTopic=SngTopic + +[Oracle] +OtlType= +OtlConnect= +OtlFlag= +OtlConnectLimit= +SqlListSize= + +[Comtrade] +NEWESTFlag= + +[SFTP] +SFtpFlag= + +[SagSource] +UpdateFlag= + +[Unit] +UnitOfTime= + +[Recall] +JournalTime= +recall_lenth= +recall_start= +recall_dailytime= +select_day= + +[screen] +ScreenFlag= +WebHost= +WebPort= +ScreenUrl= + +[AccountUpdate] +Interval= +LastUpdateTime= + +[MultiNode] +Interval= + +[CommunicationLog] +StatusRecordDuration= +AbnormalRecordDuration= + +[Postgres] +Database= +Username= +Password= +Schema= +Dnsname= +TablePrefix= + +[Web] +ClientId= +ClientSecret= +TokenUrl= +DeviceUrl= +GrantType= + +[Flag] +FileFlag=4 +FrontInst=884d132ac3a01225fcacc8c10da07d09 +FrontIP=192.168.1.167 +SendFlag=3 +RecallOnlyFlag= + +[Ledger] +TerminalStatus="[0]" +MonitorStatus="[1,2]" +IcdFlag=0 +IedCount=300 + +[Socket] +SocketEnable=0 +SocketPort=13000 + +[Http] +HttpEnable=0 +HttpIp=0.0.0.0 +HttpPort=12000 +WebDevice=http://192.168.1.68:10202/nodeDevice/nodeDeviceList +WebIcd=http://192.168.1.68:10202/icd/icdPathList +WebIntegrity=http://192.168.1.68:10202/LineIntegrityData/saveOrUpdateData +WebComflag=http://192.168.1.68:10202/dev/updateDevComFlag +WebEvent=http://192.168.1.68:10203/event/addEventDetail +WebFileupload=http://192.168.1.68:10207/file/upload +WebFiledownload=http://192.168.1.68:10207/file/download + +[Oss] +OssEndpoint= +AccessKeyID= +AccessKeySecret= +BucketName= + +[FrontNode] +Node= + +[MySql] +ConStr= + +[InfluxDb] +SelectUrl= +WriteUrl= + +[RocketMq] +producer=Group_producer +Ipport=192.168.1.68:9876 +Topic=TEST_Topic +Tag=Test_Tag +Key=Test_Keys +Queuenum=4 + +Testflag=1 +Testnum=100 +Testtype=1 +TestPort=11000 +TestList= + +consumer=Group_consumer +ConsumerIpport=192.168.1.68:9876 +ConsumerTopicRT=ask_real_data_topic +ConsumerTagRT=Test_Tag +ConsumerKeyRT=Test_Keys +ConsumerAccessKey=rmqroot +ConsumerSecretKey=001@#njcnmq +ConsumerChannel= +ConsumerTopicUD=control_Topic +ConsumerTagUD=Test_Tag +ConsumerKeyUD=Test_Keys +ConsumerTopicRC=recall_Topic +ConsumerTagRC=Test_Tag +ConsumerKeyRC=Test_Keys +ConsumerTopicSET=process_Topic +ConsumerTagSET=Test_Tag +ConsumerKeySET=Test_Keys +ConsumerTopicLOG=ask_log_Topic +ConsumerTagLOG=Test_Tag +ConsumerKeyLOG=Test_Keys +LOGTopic=log_Topic +LOGTag=Test_Tag +LOGKey=Test_Keys +CONNECTTopic=Device_Run_Flag_Topic +CONNECTTag=Test_Tag +CONNECTKey=Test_Keys +Heart_Beat_Topic=Heart_Beat_Topic +Heart_Beat_Tag=Test_Tag +Heart_Beat_Key=Test_Key +Topic_Reply_Topic=Topic_Reply_Topic +Topic_Reply_Tag=Test_Tag +Topic_Reply_Key=Test_Key + +[LogRate] +ResetSec=3600 +LimitSec=60 +KeepAllMs=60000 +KeepBurstMs=1000 +KeepBurstCount=60 +KeepHighFreqCount=10