modify recall

This commit is contained in:
lnk
2025-10-15 16:29:54 +08:00
parent 24e1bf0125
commit c55f2ab1af
3 changed files with 352 additions and 248 deletions

View File

@@ -88,6 +88,20 @@ bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int l
std::string prepare_update(const std::string& code_str, const terminal_dev& json_data,const std::string& guid);
bool writeToFile(const std::string& filePath, const std::string& xmlContent);
//////////////////////////////////////////////////////////////////////////////////////////////////////消费起始控制
static const int64_t G_APP_START_MS = []() -> int64_t {
using namespace std::chrono;
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}();
static const int64_t G_START_SKEW_MS = 1000; // 容错 1s可按需调整
static inline bool should_process_after_start(const rocketmq::MQMessageExt& msg) {
const int64_t born_ts = static_cast<int64_t>(msg.getBornTimestamp());
return born_ts >= (G_APP_START_MS - G_START_SKEW_MS);
}
//////////////////////////////////////////////////////////////////////////////////////////////////////
namespace rocketmq {
@@ -423,34 +437,49 @@ bool parseJsonMessageSET(const std::string& json_str) {
return false;
}
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
std::cerr << "'messageBody' is missing or is not a string" << std::endl;
return false;
}
std::string messageBodyStr = root["messageBody"];
if (messageBodyStr.empty()) {
std::cerr << "'messageBody' is empty" << std::endl;
return false;
}
// [MOD] 允许 messageBody 既可为字符串也可为对象;保持原有错误打印
// ----- MOD BEGIN: messageBody 兼容 string/object -----
json messageBody;
try {
messageBody = json::parse(messageBodyStr);
} catch (const std::exception& e) {
std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl;
if (!root.contains("messageBody")) {
std::cerr << "missing 'messageBody'" << std::endl;
return false;
}
if (root["messageBody"].is_string()) {
std::string messageBodyStr = root["messageBody"].get<std::string>();
if (messageBodyStr.empty()) {
std::cerr << "'messageBody' is empty" << std::endl;
return false;
}
try {
messageBody = json::parse(messageBodyStr);
} catch (const std::exception& e) {
std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl;
return false;
}
} else if (root["messageBody"].is_object()) {
messageBody = root["messageBody"];
} else {
std::cerr << "'messageBody' is neither string nor object" << std::endl;
return false;
}
// ----- MOD END: messageBody 兼容 string/object -----
// 获取字段
if (!messageBody.contains("guid") || !messageBody.contains("code") ||
!messageBody.contains("processNo") || !messageBody.contains("fun") ||
!messageBody.contains("frontType")) {
// [MOD] 基础必填字段仅校验 guid/code/processNo/funfrontType、processNum 改为按功能分支再校验
// ----- MOD BEGIN: 基础字段按需校验 -----
if (!messageBody.contains("guid") ||
!messageBody.contains("code") ||
!messageBody.contains("processNo") ||
!messageBody.contains("fun")) {
std::cout << "Missing one or more required fields in messageBody." << std::endl;
return false;
}
// ----- MOD END: 基础字段按需校验 -----
std::string guid, code_str, fun, frontType;
std::string guid, code_str, fun;
// [MOD] frontType 改为可选,给默认值 "all"
// ----- MOD BEGIN: frontType 可选 -----
std::string frontType = "all";
// ----- MOD END: frontType 可选 -----
int index_value = 0;
try {
@@ -458,13 +487,20 @@ bool parseJsonMessageSET(const std::string& json_str) {
code_str = messageBody["code"].get<std::string>();
index_value = messageBody["processNo"].get<int>();
fun = messageBody["fun"].get<std::string>();
frontType = messageBody["frontType"].get<std::string>();
// [MOD] 仅当存在 frontType 且为 string 时再读取,保持兼容
// ----- MOD BEGIN: frontType 存在才解析 -----
if (messageBody.contains("frontType") && messageBody["frontType"].is_string()) {
frontType = messageBody["frontType"].get<std::string>();
}
// ----- MOD END: frontType 存在才解析 -----
} catch (const std::exception& e) {
std::cerr << "Field parsing error: " << e.what() << std::endl;
return false;
}
// 判断进程号是否匹配
// 判断进程号是否匹配(保留原逻辑)
if (index_value != g_front_seg_index && g_front_seg_index != 0) {
std::cout << "msg index: " << index_value << " doesn't match self index: " << g_front_seg_index << std::endl;
return true;
@@ -472,52 +508,67 @@ bool parseJsonMessageSET(const std::string& json_str) {
std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl;
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的进程控制消息", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的进程控制消息",
g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
if (code_str == "set_process") {
if (!messageBody.contains("processNum")) {
std::cout << "Missing 'processNum' in JSON." << std::endl;
return false;
}
int processNum = 0;
try {
processNum = messageBody["processNum"].get<int>();
} catch (...) {
std::cout << "'processNum' parsing failed." << std::endl;
return false;
}
// [MOD] 按功能分支分别校验参数:
// reset/add 需要 processNum且可选 frontType默认 all
// delete 不需要 frontType/processNum
// ----- MOD BEGIN: 分功能按需校验与执行 -----
if (fun == "reset" || fun == "add") {
// 校验参数并执行
if ((fun == "reset" || fun == "add") &&
(processNum >= 1 && processNum < 10) &&
(frontType == "cloudfront" || frontType == "all")) {
//if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
if (g_front_seg_index == 1) {
execute_bash(fun, processNum, frontType);
DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程", g_front_seg_index, fun.c_str());
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到重置进程指令,重启所有进程!");
std::cout << "this msg should only execute once" << std::endl;
} else {
std::cout << "only cfg_stat_data index 1 can control process, this process not handle this msg" << std::endl;
if (!messageBody.contains("processNum")) {
std::cout << "Missing 'processNum' in JSON." << std::endl;
return false;
}
}
else if (fun == "delete") {
int processNum = 0;
try {
processNum = messageBody["processNum"].get<int>();
} catch (...) {
std::cout << "'processNum' parsing failed." << std::endl;
return false;
}
// 校验参数并执行保留你原校验条件frontType 允许默认 all
if ((processNum >= 1 && processNum < 10) &&
(frontType == "cloudfront" || frontType == "all")) {
// if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
if (g_front_seg_index == 1) {
execute_bash(fun, processNum, frontType);
DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程",
g_front_seg_index, fun.c_str());
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到重置进程指令,重启所有进程!");
std::cout << "this msg should only execute once" << std::endl;
} else {
std::cout << "only cfg_stat_data index 1 can control process, this process not handle this msg" << std::endl;
}
} else {
std::cout << "param is not executable" << std::endl;
}
} else if (fun == "delete") {
// delete 分支:不要求 frontType/processNum
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到删除进程指令,这个进程将会重启 ");
DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启", g_front_seg_index, fun.c_str());
DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启",
g_front_seg_index, fun.c_str());
std::this_thread::sleep_for(std::chrono::seconds(10));
std::this_thread::sleep_for(std::chrono::seconds(3));
::_exit(-1039); // 进程退出
}
else {
} else {
std::cout << "param is not executable" << std::endl;
}
// ----- MOD END: 分功能按需校验与执行 -----
} else {
std::cout << "set process code str error" << std::endl;
}
@@ -774,7 +825,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
//删除旧的
ClientManager::instance().remove_device(json_data.terminal_id);
init_loggers_bydevid(json_data.terminal_id);
terminal_devlist.push_back(json_data);
@@ -927,6 +978,20 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
return rocketmq::RECONSUME_LATER;
}
// [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK即使并发也安全
// ----- MOD BEGIN: 启动后消息过滤 -----
if (!should_process_after_start(msg)) {
std::cout << "[SET] skip old message: "
<< "topic=" << msg.getTopic()
<< ", queueId=" << msg.getQueueId()
<< ", offset=" << msg.getQueueOffset()
<< ", bornTs=" << msg.getBornTimestamp()
<< ", appStart=" << G_APP_START_MS
<< std::endl;
return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投
}
// ----- MOD END: 启动后消息过滤 -----
std::string body = msg.getBody();
std::string key = msg.getKeys();
@@ -992,6 +1057,20 @@ rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& ms
return rocketmq::RECONSUME_LATER;
}
// [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK即使并发也安全
// ----- MOD BEGIN: 启动后消息过滤 -----
if (!should_process_after_start(msg)) {
std::cout << "[SET] skip old message: "
<< "topic=" << msg.getTopic()
<< ", queueId=" << msg.getQueueId()
<< ", offset=" << msg.getQueueOffset()
<< ", bornTs=" << msg.getBornTimestamp()
<< ", appStart=" << G_APP_START_MS
<< std::endl;
return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投
}
// ----- MOD END: 启动后消息过滤 -----
std::string body = msg.getBody();
std::string key = msg.getKeys();
@@ -1023,6 +1102,20 @@ rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg)
return rocketmq::RECONSUME_LATER;
}
// [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK即使并发也安全
// ----- MOD BEGIN: 启动后消息过滤 -----
if (!should_process_after_start(msg)) {
std::cout << "[SET] skip old message: "
<< "topic=" << msg.getTopic()
<< ", queueId=" << msg.getQueueId()
<< ", offset=" << msg.getQueueOffset()
<< ", bornTs=" << msg.getBornTimestamp()
<< ", appStart=" << G_APP_START_MS
<< std::endl;
return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投
}
// ----- MOD END: 启动后消息过滤 -----
std::string body = msg.getBody();
std::string key = msg.getKeys();
@@ -1041,7 +1134,7 @@ rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg)
// 调用业务处理逻辑
if (!parseJsonMessageSET(body)) {
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s - tag:%s的进程控制消息失败,消息的json结构不正确", g_front_seg_index, G_MQCONSUMER_TOPIC_SET.c_str(), FRONT_INST.c_str());
}
return rocketmq::CONSUME_SUCCESS;
@@ -1053,6 +1146,20 @@ rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg)
return rocketmq::RECONSUME_LATER;
}
// [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK即使并发也安全
// ----- MOD BEGIN: 启动后消息过滤 -----
if (!should_process_after_start(msg)) {
std::cout << "[SET] skip old message: "
<< "topic=" << msg.getTopic()
<< ", queueId=" << msg.getQueueId()
<< ", offset=" << msg.getQueueOffset()
<< ", bornTs=" << msg.getBornTimestamp()
<< ", appStart=" << G_APP_START_MS
<< std::endl;
return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投
}
// ----- MOD END: 启动后消息过滤 -----
std::string body = msg.getBody();
std::string key = msg.getKeys();
@@ -1084,6 +1191,20 @@ rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& ms
return rocketmq::RECONSUME_LATER;
}
// [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK即使并发也安全
// ----- MOD BEGIN: 启动后消息过滤 -----
if (!should_process_after_start(msg)) {
std::cout << "[SET] skip old message: "
<< "topic=" << msg.getTopic()
<< ", queueId=" << msg.getQueueId()
<< ", offset=" << msg.getQueueOffset()
<< ", bornTs=" << msg.getBornTimestamp()
<< ", appStart=" << G_APP_START_MS
<< std::endl;
return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投
}
// ----- MOD END: 启动后消息过滤 -----
// 调试输出
std::cout << "myMessageCallbackrecall" << std::endl;