fix mq config and log control

This commit is contained in:
lnk
2026-04-17 16:35:35 +08:00
parent cf94a99cad
commit 54c97ad103
4 changed files with 412 additions and 22 deletions

View File

@@ -54,6 +54,7 @@ extern std::string G_MQCONSUMER_TOPIC_UD;
extern std::string G_MQCONSUMER_TOPIC_RT;
extern std::string FRONT_INST;
extern bool DEBUGOPEN;
#ifdef __cplusplus
extern "C" {
@@ -417,6 +418,9 @@ public:
// 设置 nameserver 地址
SetProducerNameServerAddress(producer_, nameServer.c_str());
//lnk20260417设置数据上送消息体最大值默认4M调整为1M避免过大消息导致发送失败
SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB
SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), "");
// 启动生产者
@@ -457,6 +461,64 @@ public:
// 发送消息
void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) {
if (DEBUGOPEN) {
std::cout << "sendMessage called with topic: " << (topic ? topic : "NULL")
<< ", tags: " << tags
<< ", keys: " << keys
<< std::endl;
if (strbody) {
// ===== 1⃣ 真实长度 vs strlen =====
std::string body_str(strbody);
std::cout << "[MQ][LEN_CHECK]"
<< " strlen=" << strlen(strbody)
<< ", std::string.size=" << body_str.size()
<< std::endl;
// ===== 2⃣ 检测是否包含 \0 =====
bool has_null = false;
for (size_t i = 0; i < body_str.size(); i++) {
if (body_str[i] == '\0') {
has_null = true;
std::cout << "[MQ][FOUND_NULL] index=" << i << std::endl;
break;
}
}
std::cout << "[MQ][HAS_NULL] " << (has_null ? "YES" : "NO") << std::endl;
// ===== 3⃣ 打印头部(可读)=====
size_t len = strlen(strbody);
size_t n = std::min((size_t)200, len);
std::cout << "[MQ][BODY_HEAD] "
<< std::string(strbody, n)
<< std::endl;
std::cout << "[MQ][BODY_TAIL] "
<< std::string(strbody + (len - n), n)
<< std::endl;
// ===== 4⃣ 十六进制打印前100字节 =====
std::cout << "[MQ][HEX_HEAD] ";
for (size_t i = 0; i < std::min((size_t)100, body_str.size()); i++) {
printf("%02X ", (unsigned char)body_str[i]);
}
printf("\n");
// ===== 5⃣ 十六进制打印尾部100字节 =====
std::cout << "[MQ][HEX_TAIL] ";
size_t start = (body_str.size() > 100) ? body_str.size() - 100 : 0;
for (size_t i = start; i < body_str.size(); i++) {
printf("%02X ", (unsigned char)body_str[i]);
}
printf("\n");
} else {
std::cout << "[MQ][ERROR] strbody is NULL" << std::endl;
}
}
CSendResult result;
CMessage* msg = NULL;
@@ -481,6 +543,47 @@ public:
RoundRobinSelector, // 队列选择器回调函数
&queueNum // 传递给选择器的额外参数(队列数量)
);
/////////////////////////////////替换接口,性能较低但不影响
/*CSendResult result;
memset(&result, 0, sizeof(result));
int sendResult = SendMessageOrderly(
producer_,
msg,
RoundRobinSelector,
&queueNum,
0, // autoRetryTimes
&result
);
std::cout << "[MQ][ORDERLY_RESULT]"
<< " ret=" << sendResult
<< ", sendStatus=" << (int)result.sendStatus
<< ", msgId=" << result.msgId
<< ", offset=" << result.offset
<< ", topic=" << (topic ? topic : "")
<< ", body_len=" << (strbody ? strlen(strbody) : 0)
<< std::endl;*/
/////////////////////////////////替换接口,性能较低但不影响
// 发送消息:临时改成同步发送,绕过 orderly / selector便于定位问题
/*CSendResult result;
memset(&result, 0, sizeof(result));
int sendResult = SendMessageSync(
producer_,
msg,
&result
);
std::cout << "[MQ][SYNC_RESULT]"
<< " ret=" << sendResult
<< ", sendStatus=" << (int)result.sendStatus
<< ", msgId=" << result.msgId
<< ", offset=" << result.offset
<< ", topic=" << (topic ? topic : "")
<< ", body_len=" << (strbody ? strlen(strbody) : 0)
<< std::endl;*/
// 发送消息:临时改成同步发送,绕过 orderly / selector便于定位问题
if (sendResult == 0) { // 假设返回 0 表示成功
std::cout << "[MQ][SEND_OK]"
@@ -490,13 +593,17 @@ public:
<< ", body_len=" << (strbody ? strlen(strbody) : 0)
<< std::endl;
} else {
std::cout << "[MQ][SEND_FAIL]"
<< " ret=" << sendResult
<< ", topic=" << (topic ? topic : "")
<< ", tags=" << tags
<< ", keys=" << keys
<< ", body_len=" << (strbody ? strlen(strbody) : 0)
<< std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index);
std::cout << "[MQ][BODY_HEAD] " << std::string(strbody, std::min((size_t)200, strlen(strbody))) << std::endl;
std::cout << "[MQ][BODY_TAIL] " << std::string(strbody + std::max((size_t)0, strlen(strbody) - std::min((size_t)200, strlen(strbody)))) << std::endl;
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index);
}
// 销毁消息

View File

@@ -358,6 +358,14 @@ std::string WEB_EVENT = "";
std::string WEB_FILEUPLOAD = "";
std::string WEB_FILEDOWNLOAD = "";
// 日志限流配置
int G_LOG_RATE_RESET_SEC = 3600; // 1小时重置
int G_LOG_RATE_LIMIT_SEC = 60; // 进入限流后60秒1条
int G_LOG_RATE_KEEP_ALL_MS = 60000; // 间隔 >= 60000ms全部保留
int G_LOG_RATE_KEEP_BURST_MS = 1000; // 间隔 >= 1000ms按二级策略
int G_LOG_RATE_KEEP_BURST_COUNT = 60; // 二级保留前60条
int G_LOG_RATE_KEEP_HIGHFREQ_COUNT = 10; // 高频保留前10条
//lnk20250115添加台账锁
extern pthread_mutex_t mtx;
@@ -781,6 +789,29 @@ void init_config() {
std::cout << "Read G_TEST_NUM:" << G_TEST_NUM << std::endl;
std::cout << "Read G_TEST_TYPE:" << G_TEST_TYPE << std::endl;
// 日志限流配置
G_LOG_RATE_RESET_SEC = settings.value("LogRate/ResetSec", 3600).toInt();
G_LOG_RATE_LIMIT_SEC = settings.value("LogRate/LimitSec", 60).toInt();
G_LOG_RATE_KEEP_ALL_MS = settings.value("LogRate/KeepAllMs", 60000).toInt();
G_LOG_RATE_KEEP_BURST_MS = settings.value("LogRate/KeepBurstMs", 1000).toInt();
G_LOG_RATE_KEEP_BURST_COUNT = settings.value("LogRate/KeepBurstCount", 60).toInt();
G_LOG_RATE_KEEP_HIGHFREQ_COUNT = settings.value("LogRate/KeepHighFreqCount", 10).toInt();
std::cout << "Read G_LOG_RATE_RESET_SEC:" << G_LOG_RATE_RESET_SEC << std::endl;
std::cout << "Read G_LOG_RATE_LIMIT_SEC:" << G_LOG_RATE_LIMIT_SEC << std::endl;
std::cout << "Read G_LOG_RATE_KEEP_ALL_MS:" << G_LOG_RATE_KEEP_ALL_MS << std::endl;
std::cout << "Read G_LOG_RATE_KEEP_BURST_MS:" << G_LOG_RATE_KEEP_BURST_MS << std::endl;
std::cout << "Read G_LOG_RATE_KEEP_BURST_COUNT:" << G_LOG_RATE_KEEP_BURST_COUNT << std::endl;
std::cout << "Read G_LOG_RATE_KEEP_HIGHFREQ_COUNT:" << G_LOG_RATE_KEEP_HIGHFREQ_COUNT << std::endl;
if (G_LOG_RATE_RESET_SEC <= 0) G_LOG_RATE_RESET_SEC = 3600;
if (G_LOG_RATE_LIMIT_SEC <= 0) G_LOG_RATE_LIMIT_SEC = 60;
if (G_LOG_RATE_KEEP_ALL_MS <= 0) G_LOG_RATE_KEEP_ALL_MS = 60000;
if (G_LOG_RATE_KEEP_BURST_MS <= 0) G_LOG_RATE_KEEP_BURST_MS = 1000;
if (G_LOG_RATE_KEEP_BURST_COUNT < 0) G_LOG_RATE_KEEP_BURST_COUNT = 60;
if (G_LOG_RATE_KEEP_HIGHFREQ_COUNT < 0) G_LOG_RATE_KEEP_HIGHFREQ_COUNT = 10;
//20241212lnk添加多前置
if (g_front_seg_index != 0 && g_front_seg_num != 0) {
MULTIPLE_NODE_FLAG = 1;
@@ -6542,6 +6573,9 @@ void rocketmq_test_300(int mpnum,int front_index,int type) {
// 循环发送 300 条消息
if(type == 0){
std::cout << " use ledger send msg " << std::endl;
pthread_mutex_lock(&mtx);
for (int i = 0; (total_messages > 0 && g_node_id == 100) && i < g_node->n_clients; ++i) {//台账模拟不限制进程号
ied = (ied_t*)g_node->clients[i];
@@ -6611,6 +6645,13 @@ void rocketmq_test_300(int mpnum,int front_index,int type) {
}
}
}
std::cout << "Finished sending " << g_node->n_clients << " messages." << std::endl;
pthread_mutex_unlock(&mtx);
}
else{
std::cout << " use monitor + number send msg " << std::endl;
@@ -6663,9 +6704,10 @@ void rocketmq_test_300(int mpnum,int front_index,int type) {
apr_sleep(apr_time_from_msec(60000/total_messages)); // 添加毫秒延时
}*/
}
}
std::cout << "Finished sending " << total_messages << " messages." << std::endl;
std::cout << "Finished sending " << total_messages << " messages." << std::endl;
}
}
}

View File

@@ -68,6 +68,15 @@ extern std::string intToString(int number);
//日志主题
extern std::string G_LOG_TOPIC;
// 日志限流配置
extern int G_LOG_RATE_RESET_SEC;
extern int G_LOG_RATE_LIMIT_SEC;
extern int G_LOG_RATE_KEEP_ALL_MS;
extern int G_LOG_RATE_KEEP_BURST_MS;
extern int G_LOG_RATE_KEEP_BURST_COUNT;
extern int G_LOG_RATE_KEEP_HIGHFREQ_COUNT;
/////////////////////////////////////////////////////////
//log4命名空间
using namespace log4cplus;
@@ -337,7 +346,19 @@ protected:
// ③ 限频:同一条日志
const std::string rkey = make_key(logger_name, level, code, msg);
if (!should_emit(rkey)) return;
uint64_t suppressed_before_emit = 0;
if (!should_emit(rkey, suppressed_before_emit)) return;
// 如果本次输出前压掉过日志,则在 log 文本后追加统计
std::string final_msg = msg;
if (suppressed_before_emit > 0) {
std::ostringstream suppressed_oss;
suppressed_oss << msg << " 【中间重复抑制 "
<< suppressed_before_emit
<< " 条】";
final_msg = suppressed_oss.str();
}
std::ostringstream oss;
oss << "{\"processNo\":\"" << intToString(g_front_seg_index)
@@ -348,7 +369,7 @@ protected:
<< "\",\"logtype\":\"" << safe_logtype
<< "\",\"frontType\":\"" << get_front_type_from_subdir()
<< "\",\"code\":" << code
<< ",\"log\":\"" << escape_json(msg) << "\"}";
<< ",\"log\":\"" << escape_json(final_msg) << "\"}";
Ckafka_data_t connect_info;
connect_info.strTopic = QString::fromStdString(G_LOG_TOPIC);
@@ -386,12 +407,20 @@ public:
//////////////////////////////////////////////////////////////////20260303添加日志上送控制 - 频率限制实现
private:
struct RateState {
uint64_t hit_count;
uint64_t pass_count; // 当前周期内已放行条数
uint64_t suppressed_count; // 当前被抑制条数
std::chrono::steady_clock::time_point last_emit;
std::chrono::steady_clock::time_point last_seen;
std::chrono::steady_clock::time_point last_reset;
bool has_emit;
RateState() : hit_count(0), last_emit(), last_reset(), has_emit(false) {}
RateState()
: pass_count(0),
suppressed_count(0),
last_emit(),
last_seen(),
last_reset(),
has_emit(false) {}
};
static std::unordered_map<std::string, RateState> s_rate_map;
@@ -403,52 +432,94 @@ private:
return oss.str();
}
static bool should_emit(const std::string& key) {
static bool should_emit(const std::string& key, uint64_t& suppressed_before_emit) {
using namespace std::chrono;
const auto now = steady_clock::now();
suppressed_before_emit = 0;
std::lock_guard<std::mutex> lk(s_rate_mutex);
RateState& st = s_rate_map[key];
const int RESET_SEC = 3600;
const int RESET_SEC = G_LOG_RATE_RESET_SEC ; // 1小时重置
const int LIMIT_SEC = G_LOG_RATE_LIMIT_SEC ; // 进入限流后1分钟1条
// 🚀 强制时间窗口重置(关键)
if (st.has_emit) {
// 初始化 / 强制每小时重置
if (st.last_reset.time_since_epoch().count() == 0) {
st.last_reset = now;
} else {
auto since_reset = duration_cast<seconds>(now - st.last_reset).count();
if (since_reset >= RESET_SEC) {
st.hit_count = 0;
st.has_emit = false;
st = RateState();
st.last_reset = now;
}
}
// 计算当前频率档位按“本次与上次看到该key的间隔”判断
// >=60秒/条:全部保留
// [1秒, 60秒)保留前60条然后1分钟1条
// <1秒保留前10条然后1分钟1条
int allow_burst = 0;
if (st.last_seen.time_since_epoch().count() == 0) {
// 第一次看到,先按“全部保留”处理
allow_burst = -1;
} else {
// 第一次初始化
st.last_reset = now;
auto gap_ms = duration_cast<milliseconds>(now - st.last_seen).count();
if (gap_ms >= G_LOG_RATE_KEEP_ALL_MS) {
allow_burst = -1; // 全部保留
} else if (gap_ms >= G_LOG_RATE_KEEP_BURST_MS) {
allow_burst = G_LOG_RATE_KEEP_BURST_COUNT; // 前60条
} else {
allow_burst = G_LOG_RATE_KEEP_HIGHFREQ_COUNT; // 前10条
}
}
st.hit_count++;
st.last_seen = now;
// 🚀 前20条完全放行
if (st.hit_count <= 20) {
// 档位1全部保留
if (allow_burst == -1) {
suppressed_before_emit = st.suppressed_count;
st.suppressed_count = 0;
st.pass_count++;
st.last_emit = now;
st.has_emit = true;
return true;
}
// 🚀 超过20条1分钟限1
const int period_sec = 60;
// 档位2/3先放前N
if (st.pass_count < (uint64_t)allow_burst) {
suppressed_before_emit = st.suppressed_count;
st.suppressed_count = 0;
st.pass_count++;
st.last_emit = now;
st.has_emit = true;
return true;
}
// 超过前N条后进入 1分钟1条
if (!st.has_emit) {
suppressed_before_emit = st.suppressed_count;
st.suppressed_count = 0;
st.pass_count++;
st.last_emit = now;
st.has_emit = true;
return true;
}
const auto elapsed = duration_cast<seconds>(now - st.last_emit).count();
if (elapsed >= period_sec) {
auto elapsed = duration_cast<seconds>(now - st.last_emit).count();
if (elapsed >= LIMIT_SEC) {
suppressed_before_emit = st.suppressed_count;
st.suppressed_count = 0;
st.pass_count++;
st.last_emit = now;
st.has_emit = true;
return true;
}
// 本条被抑制
st.suppressed_count++;
return false;
}
};

170
mykafka.ini Normal file
View File

@@ -0,0 +1,170 @@
[Kafka]
BrokerList=
EventTopic=
KafkaFlag=
KafkaListSize=
RTDataTopic=Real_Time_Data_Topic
HisTopic=LN_Topic
PSTTopic=LN_Topic
PLTTopic=LN_Topic
AlmTopic=AlmTopic
SngTopic=SngTopic
[Oracle]
OtlType=
OtlConnect=
OtlFlag=
OtlConnectLimit=
SqlListSize=
[Comtrade]
NEWESTFlag=
[SFTP]
SFtpFlag=
[SagSource]
UpdateFlag=
[Unit]
UnitOfTime=
[Recall]
JournalTime=
recall_lenth=
recall_start=
recall_dailytime=
select_day=
[screen]
ScreenFlag=
WebHost=
WebPort=
ScreenUrl=
[AccountUpdate]
Interval=
LastUpdateTime=
[MultiNode]
Interval=
[CommunicationLog]
StatusRecordDuration=
AbnormalRecordDuration=
[Postgres]
Database=
Username=
Password=
Schema=
Dnsname=
TablePrefix=
[Web]
ClientId=
ClientSecret=
TokenUrl=
DeviceUrl=
GrantType=
[Flag]
FileFlag=4
FrontInst=884d132ac3a01225fcacc8c10da07d09
FrontIP=192.168.1.167
SendFlag=3
RecallOnlyFlag=
[Ledger]
TerminalStatus="[0]"
MonitorStatus="[1,2]"
IcdFlag=0
IedCount=300
[Socket]
SocketEnable=0
SocketPort=13000
[Http]
HttpEnable=0
HttpIp=0.0.0.0
HttpPort=12000
WebDevice=http://192.168.1.68:10202/nodeDevice/nodeDeviceList
WebIcd=http://192.168.1.68:10202/icd/icdPathList
WebIntegrity=http://192.168.1.68:10202/LineIntegrityData/saveOrUpdateData
WebComflag=http://192.168.1.68:10202/dev/updateDevComFlag
WebEvent=http://192.168.1.68:10203/event/addEventDetail
WebFileupload=http://192.168.1.68:10207/file/upload
WebFiledownload=http://192.168.1.68:10207/file/download
[Oss]
OssEndpoint=
AccessKeyID=
AccessKeySecret=
BucketName=
[FrontNode]
Node=
[MySql]
ConStr=
[InfluxDb]
SelectUrl=
WriteUrl=
[RocketMq]
producer=Group_producer
Ipport=192.168.1.68:9876
Topic=TEST_Topic
Tag=Test_Tag
Key=Test_Keys
Queuenum=4
Testflag=1
Testnum=100
Testtype=1
TestPort=11000
TestList=
consumer=Group_consumer
ConsumerIpport=192.168.1.68:9876
ConsumerTopicRT=ask_real_data_topic
ConsumerTagRT=Test_Tag
ConsumerKeyRT=Test_Keys
ConsumerAccessKey=rmqroot
ConsumerSecretKey=001@#njcnmq
ConsumerChannel=
ConsumerTopicUD=control_Topic
ConsumerTagUD=Test_Tag
ConsumerKeyUD=Test_Keys
ConsumerTopicRC=recall_Topic
ConsumerTagRC=Test_Tag
ConsumerKeyRC=Test_Keys
ConsumerTopicSET=process_Topic
ConsumerTagSET=Test_Tag
ConsumerKeySET=Test_Keys
ConsumerTopicLOG=ask_log_Topic
ConsumerTagLOG=Test_Tag
ConsumerKeyLOG=Test_Keys
LOGTopic=log_Topic
LOGTag=Test_Tag
LOGKey=Test_Keys
CONNECTTopic=Device_Run_Flag_Topic
CONNECTTag=Test_Tag
CONNECTKey=Test_Keys
Heart_Beat_Topic=Heart_Beat_Topic
Heart_Beat_Tag=Test_Tag
Heart_Beat_Key=Test_Key
Topic_Reply_Topic=Topic_Reply_Topic
Topic_Reply_Tag=Test_Tag
Topic_Reply_Key=Test_Key
[LogRate]
ResetSec=3600
LimitSec=60
KeepAllMs=60000
KeepBurstMs=1000
KeepBurstCount=60
KeepHighFreqCount=10