From c55f2ab1afd277aacde5a34547b1c6bc6365b755 Mon Sep 17 00:00:00 2001 From: lnk Date: Wed, 15 Oct 2025 16:29:54 +0800 Subject: [PATCH] modify recall --- LFtid1056/cloudfront/code/cfg_parser.cpp | 359 +++++++++++------------ LFtid1056/cloudfront/code/interface.h | 4 +- LFtid1056/cloudfront/code/rocketmq.cpp | 237 +++++++++++---- 3 files changed, 352 insertions(+), 248 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 4d43efa..48accf5 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -205,7 +205,10 @@ std::vector TESTARRAY; //解析的列表数组 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////其他文件定义的函数引用声明 - +bool enqueue_direct_download(const std::string& dev_id, + const std::string& monitor_id, + const std::string& filename, + const std::vector& dir_candidates); /////////////////////////////////////////////////////////////////////////////////////////////////////////////////当前文件函数声明 @@ -1060,6 +1063,49 @@ void create_recall_xml() } /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////补招部分 // 工具函数:将时间字符串转为 time_t(秒级) +// ▲新增:从 monitorId 提取结尾数字(不含前导非数字部分),失败返回空串 +static std::string extract_monitor_digits(const std::string& monitorId) { + // 例: "00B78D0171091" -> "171091"(按你的“对应的数字”的规则可定制) + // 这里实现:取 monitorId 中最后一段连续数字 + std::string digits; + for (int i = static_cast(monitorId.size()) - 1; i >= 0; --i) { + if (std::isdigit(static_cast(monitorId[i]))) { + digits.push_back(monitorId[i]); + } else if (!digits.empty()) { + break; + } + } + std::reverse(digits.begin(), digits.end()); + return digits; +} + +// ▲新增:把 "YYYY-MM-DD HH:MM:SS[.ffffff]" -> "YYYYMMDDHHMMSS"(忽略小数部分) +static std::string compact_ts_for_filename(const std::string& ts) { + // 允许 "2025-10-10 14:38:07.000000" + // 输出 "20251010143807" + std::string ymdhms; + ymdhms.reserve(14); + for (size_t i = 0; i < ts.size(); ++i) { + char c = ts[i]; + if (std::isdigit(static_cast(c))) { + ymdhms.push_back(c); + if (ymdhms.size() == 14) break; + } + } + return (ymdhms.size() == 14) ? ymdhms : std::string(); +} + +// ▲新增:按“数字_时间后缀.后缀”拼直下文件名(返回{*.cfg, *.dat}两种) +static std::vector build_direct_filenames(const std::string& monitorDigits, + const std::string& ts_compact) +{ + std::vector out; + if (monitorDigits.empty() || ts_compact.empty()) return out; + out.push_back(monitorDigits + "_" + ts_compact + ".cfg"); + out.push_back(monitorDigits + "_" + ts_compact + ".dat"); + return out; +} + static long long parse_time_to_epoch(const std::string& time_str) { std::tm tm = {}; std::istringstream ss(time_str); @@ -1138,9 +1184,9 @@ int recall_json_handle_from_mq(const std::string& body) } // 解析 messageBody 内层 JSON - nlohmann::json messageBody; + nlohmann::json mb; try { - messageBody = nlohmann::json::parse(messageBodyStr); + mb = nlohmann::json::parse(messageBodyStr); } catch (const std::exception& e) { std::cerr << "Failed to parse 'messageBody' JSON: " << e.what() << std::endl; DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,messageBody的json结构不正确", @@ -1148,210 +1194,148 @@ int recall_json_handle_from_mq(const std::string& body) return 10004; } - // 提取 guid 并立即回执 - if (!messageBody.contains("guid") || !messageBody["guid"].is_string()) { - std::cerr << "'guid' is missing or is not a string" << std::endl; - DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,没有guid字段", - g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); - return 10004; - } - std::string guid = messageBody["guid"].get(); - //send_reply_to_queue(guid, static_cast(ResponseCode::OK), "收到补招指令"); + if (mb.is_array()) { + // ====== 新格式(数组):支持 dataType=0/1 的区间补招 & dataType=2 的直下文件 ====== + for (const auto& rec : mb) { + if (!rec.is_object()) continue; - // 提取 data 数组 - if (!messageBody.contains("data") || !messageBody["data"].is_array()) { - std::cerr << "'data' is missing or is not an array" << std::endl; - DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,没有data字段", - g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); - return 10004; - } - // 仅用于保留你原先的调试输出 - std::string data_dump; - try { data_dump = messageBody["data"].dump(); } catch (...) { data_dump.clear(); } - std::cout << "parseJsonMessageRC: " << data_dump << std::endl; + // 必要字段 + std::string guid = rec.value("guid", ""); + std::string terminalId = rec.value("terminalId", ""); + if (terminalId.empty()) continue; - // 不指定稳态/暂态则全部补招 - int stat = 0; - int voltage = 0; - - // 2. 遍历每个补招项(这里直接用已解析的 messageBody["data"]) - for (auto& item : messageBody["data"]) { - // 获取必需字段 - // ★修改:强制要求 terminalId; - if (!item.contains("terminalId") || - !item.contains("monitor") || - !item.contains("timeInterval") || - !item.contains("dataType")) - { - std::cout << "json内容解析错误" << std::endl; - return 10000; - } - - // ★新增:读取 terminalId(必填) - std::string terminalId = item["terminalId"].get(); - if (terminalId.empty()) { - std::cout << "terminalId为空" << std::endl; - continue; - } - - // 2.1 解析 dataType(仅保留稳态/暂态) - std::string datatype = item["dataType"].get(); - if (!datatype.empty()) { - if (datatype == "0" || datatype == "稳态" || datatype == "steady" || datatype == "STEADY") { - stat = 1; voltage = 0; // 稳态 - } else if (datatype == "1" || datatype == "暂态" || datatype == "voltage" || datatype == "VOLTAGE") { - stat = 0; voltage = 1; // 暂态 - } else { - stat = voltage = 1; // 其他情况按全补 - } - } else { - stat = voltage = 1; // 全补 - } - - // ★新增:定位并校验该 terminal 是否归属当前进程 - std::lock_guard lock(ledgermtx); - - const terminal_dev* targetDev = NULL; - for (std::vector::const_iterator it = terminal_devlist.begin(); - it != terminal_devlist.end(); ++it) - { - if (it->terminal_id == terminalId) { - targetDev = &(*it); - break; - } - } - if (!targetDev) { - std::cout << "terminalId未匹配当前进程内装置: " << terminalId << std::endl; - continue; - } - - // 添加判断装置在线,不注册guid,异步补招 - if (ClientManager::instance().get_dev_status(targetDev->terminal_id) != 1) { - std::cout << "terminalId对应装置不在线: " << targetDev->terminal_id << std::endl; - - // 响应 web - std::string msg = std::string("装置:") + targetDev->terminal_name + " 不在线,无法补招"; - send_reply_to_kafka_recall("12345", "2", static_cast(ResponseCode::INTERNAL_ERROR), msg, targetDev->terminal_id, "", "", ""); - continue;//处理下一个装置的补招记录 - } - - // ★新增:按新结构解析 monitor 层级 - nlohmann::json& monitors = item["monitor"]; - if (!monitors.is_array() || monitors.empty()) { - std::cout << "monitor数组为空或非数组类型" << std::endl; - continue; - } - - for (auto& mobj : monitors) { - if (!mobj.contains("monitorId") || !mobj.contains("timeInterval")) { - std::cout << "monitor项缺少 monitorId 或 timeInterval" << std::endl; - continue; + // ▲dataType 可能是字符串"0"/"1"或数字2 + int dt = -1; + if (rec["dataType"].is_number_integer()) { + dt = rec["dataType"].get(); + } else if (rec["dataType"].is_string()) { + std::string s = rec["dataType"].get(); + if (s == "0") dt = 0; else if (s == "1") dt = 1; } - std::string monitorId = mobj["monitorId"].get(); - if (monitorId.empty()) continue; - - // 不只是判断存在,还要拿到指针 lm 以便后续 push_back - ledger_monitor* lm = NULL; - // 注意:这里需要非常量指针,取 const_cast 后遍历 - terminal_dev* dev_nc = const_cast(targetDev); - for (std::vector::iterator itLm = dev_nc->line.begin(); - itLm != dev_nc->line.end(); ++itLm) - { - if (!itLm->monitor_id.empty() && itLm->monitor_id == monitorId) { - lm = &(*itLm); - break; + // 统一 monitorId 为数组形式 + std::vector monitors; + if (rec.contains("monitorId")) { + if (rec["monitorId"].is_array()) { + for (auto& m : rec["monitorId"]) if (m.is_string()) monitors.push_back(m.get()); + } else if (rec["monitorId"].is_string()) { + monitors.push_back(rec["monitorId"].get()); } } - if (!lm) { - std::cout << "monitorId未在terminal内找到: " << monitorId - << " @ " << terminalId << std::endl; - continue; - } + if (monitors.empty()) continue; - nlohmann::json& tiArr = mobj["timeInterval"]; - if (!tiArr.is_array() || tiArr.empty()) { - std::cout << "timeInterval为空或非数组类型: monitorId=" << monitorId << std::endl; - continue; - } - - // 这里拆分时间段并 push 到 lm->recall_list / lm->recall_list_static - for (auto& timeItem : tiArr) { - std::string ti = timeItem.get(); - std::string::size_type pos = ti.find('~'); - if (pos == std::string::npos) { - std::cout << "timeInterval格式错误: " << ti << std::endl; + // ▲沿用:校验终端归属 + 在线性 + { + std::lock_guard lock(ledgermtx); + const terminal_dev* targetDev = NULL; + for (std::vector::const_iterator it = terminal_devlist.begin(); it != terminal_devlist.end(); ++it) { + if (it->terminal_id == terminalId) { targetDev = &(*it); break; } + } + if (!targetDev) { std::cout << "terminalId未匹配当前进程内装置: " << terminalId << std::endl; continue; } + if (ClientManager::instance().get_dev_status(targetDev->terminal_id) != 1) { + std::cout << "terminalId对应装置不在线: " << targetDev->terminal_id << std::endl; + std::string msg = std::string("装置:") + targetDev->terminal_name + " 不在线,无法补招"; + send_reply_to_kafka_recall("12345", "2", static_cast(ResponseCode::INTERNAL_ERROR), msg, targetDev->terminal_id, "", "", ""); continue; } - std::string start = ti.substr(0, pos); - std::string end = ti.substr(pos + 1); + } - // 仅对 recall_list(事件)进行 1 小时拆分;recall_list_static(稳态)不拆分 - { - // 公共字段(整体区间,不拆分)用于 recall_list_static - RecallFile rm_all; // ★类型正确:稳态列表的元素 - rm_all.recall_status = 0; // 初始状态:未补招 - rm_all.StartTime = start; // 直接使用字符串 - rm_all.EndTime = end; - rm_all.STEADY = std::to_string(stat); - rm_all.VOLTAGE = std::to_string(voltage); + if (dt == 2) { + // ▲直下文件:timeList -> fun1/fun2 -> enqueue_direct_download + if (!rec.contains("timeList") || !rec["timeList"].is_array()) continue; - // 仅当需要事件补招(voltage==1)时,才进行 1 小时拆分并压入 recall_list - if (voltage == 1) { - // 拆分时间段为 1 小时一段,并存入 recall_list - std::vector recallinfo_list_hour; - Get_Recall_Time_Char(start, end, recallinfo_list_hour); + for (const auto& monId : monitors) { + // fun1:提取 monitor 数字 + std::string digits = extract_monitor_digits(monId); + if (digits.empty()) { std::cout << "monitorId数字提取失败: " << monId << std::endl; continue; } - for (std::size_t i = 0; i < recallinfo_list_hour.size(); ++i) { - const RecallInfo& info = recallinfo_list_hour[i]; + for (const auto& t : rec["timeList"]) { + if (!t.is_string()) continue; + std::string ts_compact = compact_ts_for_filename(t.get()); + if (ts_compact.empty()) { std::cout << "时间解析失败: " << t << std::endl; continue; } - RecallMonitor rm; // ★类型正确:事件列表的元素 - rm.recall_status = 0; // 初始状态:未补招 - rm.StartTime = epoch_to_datetime_str(info.starttime); - rm.EndTime = epoch_to_datetime_str(info.endtime); - rm.STEADY = std::to_string(stat); - rm.VOLTAGE = std::to_string(voltage); - - lm->recall_list.push_back(rm); - - // 事件补招列表(recall_list)调试打印 - std::cout << "[recall_json_handle] terminal=" << terminalId - << " monitor=" << monitorId - << " [recall_list] start=" << lm->recall_list.back().StartTime - << " end=" << lm->recall_list.back().EndTime - << " steady="<< lm->recall_list.back().STEADY - << " voltage="<< lm->recall_list.back().VOLTAGE - << std::endl; + // fun2:生成 *.cfg/*.dat 两个文件名 + std::vector fns = build_direct_filenames(digits, ts_compact); + // 加入候选目录(使用 RecallFile 缺省的四个;如需定制可在此传自定义列表) + static const std::vector DIRS { + "/cf/COMTRADE", "/bd0/COMTRADE", "/sd0/COMTRADE", "/sd0:1/COMTRADE" + }; + for (const auto& fn : fns) { + bool ok = enqueue_direct_download(terminalId, monId, fn, DIRS); + std::cout << "[direct] enqueue " << (ok ? "ok " : "fail ") + << "dev=" << terminalId << " mon=" << monId + << " file=" << fn << std::endl; } } + } + } else if (dt == 0 || dt == 1) { + // ▲保持老逻辑(与“对象+data”一致):timeInterval 数组 + if (!rec.contains("timeInterval") || !rec["timeInterval"].is_array()) continue; - // 仅当需要稳态补招(stat==1)时,不拆分,直接压入 recall_list_static - if (stat == 1) { - lm->recall_list_static.push_back(rm_all); // 不拆分,整体区间 + // 解析 dataType-> stat/voltage + int stat = (dt == 0) ? 1 : 0; + int voltage = (dt == 1) ? 1 : 0; - // 稳态补招列表(recall_list_static)调试打印 - std::cout << "[recall_json_handle] terminal=" << terminalId - << " monitor=" << monitorId - << " [recall_list_static] start=" << lm->recall_list_static.back().StartTime - << " end=" << lm->recall_list_static.back().EndTime - << " steady="<< lm->recall_list_static.back().STEADY - << " voltage="<< lm->recall_list_static.back().VOLTAGE - << std::endl; + // 把每个 monitor 的区间写入 recall_list / recall_list_static + std::lock_guard lock(ledgermtx); + // 找终端 + terminal_dev* dev_nc = NULL; + for (auto& d : terminal_devlist) if (d.terminal_id == terminalId) { dev_nc = &d; break; } + if (!dev_nc) continue; + + for (const auto& monId : monitors) { + // 找监测点 + ledger_monitor* lm = NULL; + for (auto itLm = dev_nc->line.begin(); itLm != dev_nc->line.end(); ++itLm) { + if (!itLm->monitor_id.empty() && itLm->monitor_id == monId) { lm = &(*itLm); break; } } + if (!lm) { std::cout << "monitorId未在terminal内找到: " << monId << " @ " << terminalId << std::endl; continue; } - // 非法输入保护(保留你原来的保护与返回码) - if (stat == 0 && voltage == 0) { - std::cout << "[recall_json_handle] skip: stat=0 && voltage=0, monitor=" << monitorId - << " terminal=" << terminalId - << " start=" << rm_all.StartTime - << " end=" << rm_all.EndTime - << std::endl; - return 10003; + for (const auto& ti : rec["timeInterval"]) { + if (!ti.is_string()) continue; + std::string s = ti.get(); + std::string::size_type pos = s.find('~'); + if (pos == std::string::npos) { std::cout << "timeInterval格式错误: " << s << std::endl; continue; } + std::string start = s.substr(0, pos); + std::string end = s.substr(pos + 1); + + RecallFile rm_all; + rm_all.recall_status = 0; + rm_all.StartTime = start; + rm_all.EndTime = end; + rm_all.STEADY = std::to_string(stat); + rm_all.VOLTAGE = std::to_string(voltage); + + if (voltage == 1) { + std::vector recallinfo_list_hour; + Get_Recall_Time_Char(start, end, recallinfo_list_hour); + for (size_t i = 0; i < recallinfo_list_hour.size(); ++i) { + const RecallInfo& info = recallinfo_list_hour[i]; + RecallMonitor rm; + rm.recall_status = 0; + rm.StartTime = epoch_to_datetime_str(info.starttime); + rm.EndTime = epoch_to_datetime_str(info.endtime); + rm.STEADY = std::to_string(stat); + rm.VOLTAGE = std::to_string(voltage); + lm->recall_list.push_back(rm); + } + } + if (stat == 1) { + lm->recall_list_static.push_back(rm_all); + } + if (stat == 0 && voltage == 0) return 10003; } } + } else { + // 未知 dataType,忽略 + continue; } } } + else { + // 不支持的 messageBody 形态 + return 10004; + } } catch (const std::exception& e) { std::cout << "处理客户端发送的消息错误,原因:" << e.what() << std::endl; @@ -4355,15 +4339,14 @@ void check_recall_file() { auto it = front.dir_files.find(front.cur_dir); if (it != front.dir_files.end()) { if (front.direct_mode) { - // ★新增:直下文件(精确匹配文件名) + // ▲直下:支持“目标名列表” + std::set want(front.target_filenames.begin(), front.target_filenames.end()); for (const auto& ent : it->second) { if (ent.flag != 1) continue; // 只要文件 - // 安全从 char[64] 转成 std::string size_t n = ::strnlen(ent.name, sizeof(ent.name)); std::string fname(ent.name, n); - if (fname == front.target_filename) { + if (want.find(fname) != want.end()) { front.download_queue.push_back(front.cur_dir + "/" + fname); - break; // 找到一个就足够 } } } else { @@ -4546,7 +4529,7 @@ bool enqueue_direct_download(const std::string& dev_id, rf.EndTime = "1970-01-01 00:00:01"; rf.dir_candidates = dir_candidates; // 传入要检索的目录列表 rf.direct_mode = true; // ★关键:直下文件 - rf.target_filename = filename; // ★关键:匹配的目标文件名 + rf.target_filenames.push_back(filename); // ▲单个文件名入“列表” lm_it->recall_list_static.push_back(std::move(rf)); diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 3c541f8..c376661 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -76,7 +76,7 @@ public: //暂态文件用 bool direct_mode = false; // 直下文件开关:true 表示不按时间窗,仅按目标文件名 - std::string target_filename; // 直下文件名(不含目录) + std::vector target_filenames; // 直下文件名(不含目录) std::list file_paths; // 已下载/要上报的完整路径(用于最终结果) @@ -118,7 +118,7 @@ public: // ★新增:按需保留直下文件开关和目标名 if (!keep_direct) { direct_mode = false; - target_filename.clear(); + target_filenames.clear(); // ▲列表清空 } } }; diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index 4fafc91..25383c8 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -88,6 +88,20 @@ bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int l std::string prepare_update(const std::string& code_str, const terminal_dev& json_data,const std::string& guid); bool writeToFile(const std::string& filePath, const std::string& xmlContent); +//////////////////////////////////////////////////////////////////////////////////////////////////////消费起始控制 + +static const int64_t G_APP_START_MS = []() -> int64_t { + using namespace std::chrono; + return duration_cast(system_clock::now().time_since_epoch()).count(); +}(); + +static const int64_t G_START_SKEW_MS = 1000; // 容错 1s,可按需调整 + +static inline bool should_process_after_start(const rocketmq::MQMessageExt& msg) { + const int64_t born_ts = static_cast(msg.getBornTimestamp()); + return born_ts >= (G_APP_START_MS - G_START_SKEW_MS); +} + ////////////////////////////////////////////////////////////////////////////////////////////////////// namespace rocketmq { @@ -423,34 +437,49 @@ bool parseJsonMessageSET(const std::string& json_str) { return false; } - if (!root.contains("messageBody") || !root["messageBody"].is_string()) { - std::cerr << "'messageBody' is missing or is not a string" << std::endl; - return false; - } - - std::string messageBodyStr = root["messageBody"]; - if (messageBodyStr.empty()) { - std::cerr << "'messageBody' is empty" << std::endl; - return false; - } - + // [MOD] 允许 messageBody 既可为字符串也可为对象;保持原有错误打印 + // ----- MOD BEGIN: messageBody 兼容 string/object ----- json messageBody; - try { - messageBody = json::parse(messageBodyStr); - } catch (const std::exception& e) { - std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl; + if (!root.contains("messageBody")) { + std::cerr << "missing 'messageBody'" << std::endl; return false; } + if (root["messageBody"].is_string()) { + std::string messageBodyStr = root["messageBody"].get(); + if (messageBodyStr.empty()) { + std::cerr << "'messageBody' is empty" << std::endl; + return false; + } + try { + messageBody = json::parse(messageBodyStr); + } catch (const std::exception& e) { + std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl; + return false; + } + } else if (root["messageBody"].is_object()) { + messageBody = root["messageBody"]; + } else { + std::cerr << "'messageBody' is neither string nor object" << std::endl; + return false; + } + // ----- MOD END: messageBody 兼容 string/object ----- - // 获取字段 - if (!messageBody.contains("guid") || !messageBody.contains("code") || - !messageBody.contains("processNo") || !messageBody.contains("fun") || - !messageBody.contains("frontType")) { + // [MOD] 基础必填字段仅校验 guid/code/processNo/fun;frontType、processNum 改为按功能分支再校验 + // ----- MOD BEGIN: 基础字段按需校验 ----- + if (!messageBody.contains("guid") || + !messageBody.contains("code") || + !messageBody.contains("processNo") || + !messageBody.contains("fun")) { std::cout << "Missing one or more required fields in messageBody." << std::endl; return false; } + // ----- MOD END: 基础字段按需校验 ----- - std::string guid, code_str, fun, frontType; + std::string guid, code_str, fun; + // [MOD] frontType 改为可选,给默认值 "all" + // ----- MOD BEGIN: frontType 可选 ----- + std::string frontType = "all"; + // ----- MOD END: frontType 可选 ----- int index_value = 0; try { @@ -458,13 +487,20 @@ bool parseJsonMessageSET(const std::string& json_str) { code_str = messageBody["code"].get(); index_value = messageBody["processNo"].get(); fun = messageBody["fun"].get(); - frontType = messageBody["frontType"].get(); + + // [MOD] 仅当存在 frontType 且为 string 时再读取,保持兼容 + // ----- MOD BEGIN: frontType 存在才解析 ----- + if (messageBody.contains("frontType") && messageBody["frontType"].is_string()) { + frontType = messageBody["frontType"].get(); + } + // ----- MOD END: frontType 存在才解析 ----- + } catch (const std::exception& e) { std::cerr << "Field parsing error: " << e.what() << std::endl; return false; } - // 判断进程号是否匹配 + // 判断进程号是否匹配(保留原逻辑) if (index_value != g_front_seg_index && g_front_seg_index != 0) { std::cout << "msg index: " << index_value << " doesn't match self index: " << g_front_seg_index << std::endl; return true; @@ -472,52 +508,67 @@ bool parseJsonMessageSET(const std::string& json_str) { std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl; - DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的进程控制消息", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的进程控制消息", + g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); if (code_str == "set_process") { - if (!messageBody.contains("processNum")) { - std::cout << "Missing 'processNum' in JSON." << std::endl; - return false; - } - int processNum = 0; - try { - processNum = messageBody["processNum"].get(); - } catch (...) { - std::cout << "'processNum' parsing failed." << std::endl; - return false; - } + // [MOD] 按功能分支分别校验参数: + // reset/add 需要 processNum(且可选 frontType,默认 all); + // delete 不需要 frontType/processNum + // ----- MOD BEGIN: 分功能按需校验与执行 ----- + if (fun == "reset" || fun == "add") { - // 校验参数并执行 - if ((fun == "reset" || fun == "add") && - (processNum >= 1 && processNum < 10) && - (frontType == "cloudfront" || frontType == "all")) { - - //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { - if (g_front_seg_index == 1) { - - execute_bash(fun, processNum, frontType); - - DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程", g_front_seg_index, fun.c_str()); - - send_reply_to_queue(guid, static_cast(ResponseCode::ACCEPTED), "收到重置进程指令,重启所有进程!"); - std::cout << "this msg should only execute once" << std::endl; - } else { - std::cout << "only cfg_stat_data index 1 can control process, this process not handle this msg" << std::endl; + if (!messageBody.contains("processNum")) { + std::cout << "Missing 'processNum' in JSON." << std::endl; + return false; } - } - else if (fun == "delete") { + int processNum = 0; + try { + processNum = messageBody["processNum"].get(); + } catch (...) { + std::cout << "'processNum' parsing failed." << std::endl; + return false; + } + + // 校验参数并执行(保留你原校验条件,frontType 允许默认 all) + if ((processNum >= 1 && processNum < 10) && + (frontType == "cloudfront" || frontType == "all")) { + + // if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + if (g_front_seg_index == 1) { + + execute_bash(fun, processNum, frontType); + + DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程", + g_front_seg_index, fun.c_str()); + + send_reply_to_queue(guid, static_cast(ResponseCode::ACCEPTED), "收到重置进程指令,重启所有进程!"); + std::cout << "this msg should only execute once" << std::endl; + } else { + std::cout << "only cfg_stat_data index 1 can control process, this process not handle this msg" << std::endl; + } + } else { + std::cout << "param is not executable" << std::endl; + } + + } else if (fun == "delete") { + + // delete 分支:不要求 frontType/processNum send_reply_to_queue(guid, static_cast(ResponseCode::ACCEPTED), "收到删除进程指令,这个进程将会重启 "); - DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启", g_front_seg_index, fun.c_str()); + DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启", + g_front_seg_index, fun.c_str()); - std::this_thread::sleep_for(std::chrono::seconds(10)); + std::this_thread::sleep_for(std::chrono::seconds(3)); ::_exit(-1039); // 进程退出 - } - else { + + } else { std::cout << "param is not executable" << std::endl; } + // ----- MOD END: 分功能按需校验与执行 ----- + } else { std::cout << "set process code str error" << std::endl; } @@ -774,7 +825,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d //删除旧的 ClientManager::instance().remove_device(json_data.terminal_id); - + init_loggers_bydevid(json_data.terminal_id); terminal_devlist.push_back(json_data); @@ -927,6 +978,20 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms return rocketmq::RECONSUME_LATER; } + // [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK(即使并发也安全) + // ----- MOD BEGIN: 启动后消息过滤 ----- + if (!should_process_after_start(msg)) { + std::cout << "[SET] skip old message: " + << "topic=" << msg.getTopic() + << ", queueId=" << msg.getQueueId() + << ", offset=" << msg.getQueueOffset() + << ", bornTs=" << msg.getBornTimestamp() + << ", appStart=" << G_APP_START_MS + << std::endl; + return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投 + } + // ----- MOD END: 启动后消息过滤 ----- + std::string body = msg.getBody(); std::string key = msg.getKeys(); @@ -992,6 +1057,20 @@ rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& ms return rocketmq::RECONSUME_LATER; } + // [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK(即使并发也安全) + // ----- MOD BEGIN: 启动后消息过滤 ----- + if (!should_process_after_start(msg)) { + std::cout << "[SET] skip old message: " + << "topic=" << msg.getTopic() + << ", queueId=" << msg.getQueueId() + << ", offset=" << msg.getQueueOffset() + << ", bornTs=" << msg.getBornTimestamp() + << ", appStart=" << G_APP_START_MS + << std::endl; + return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投 + } + // ----- MOD END: 启动后消息过滤 ----- + std::string body = msg.getBody(); std::string key = msg.getKeys(); @@ -1023,6 +1102,20 @@ rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg) return rocketmq::RECONSUME_LATER; } + // [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK(即使并发也安全) + // ----- MOD BEGIN: 启动后消息过滤 ----- + if (!should_process_after_start(msg)) { + std::cout << "[SET] skip old message: " + << "topic=" << msg.getTopic() + << ", queueId=" << msg.getQueueId() + << ", offset=" << msg.getQueueOffset() + << ", bornTs=" << msg.getBornTimestamp() + << ", appStart=" << G_APP_START_MS + << std::endl; + return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投 + } + // ----- MOD END: 启动后消息过滤 ----- + std::string body = msg.getBody(); std::string key = msg.getKeys(); @@ -1041,7 +1134,7 @@ rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg) // 调用业务处理逻辑 if (!parseJsonMessageSET(body)) { - DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s - tag:%s的进程控制消息失败,消息的json结构不正确", g_front_seg_index, G_MQCONSUMER_TOPIC_SET.c_str(), FRONT_INST.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -1053,6 +1146,20 @@ rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg) return rocketmq::RECONSUME_LATER; } + // [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK(即使并发也安全) + // ----- MOD BEGIN: 启动后消息过滤 ----- + if (!should_process_after_start(msg)) { + std::cout << "[SET] skip old message: " + << "topic=" << msg.getTopic() + << ", queueId=" << msg.getQueueId() + << ", offset=" << msg.getQueueOffset() + << ", bornTs=" << msg.getBornTimestamp() + << ", appStart=" << G_APP_START_MS + << std::endl; + return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投 + } + // ----- MOD END: 启动后消息过滤 ----- + std::string body = msg.getBody(); std::string key = msg.getKeys(); @@ -1084,6 +1191,20 @@ rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& ms return rocketmq::RECONSUME_LATER; } + // [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK(即使并发也安全) + // ----- MOD BEGIN: 启动后消息过滤 ----- + if (!should_process_after_start(msg)) { + std::cout << "[SET] skip old message: " + << "topic=" << msg.getTopic() + << ", queueId=" << msg.getQueueId() + << ", offset=" << msg.getQueueOffset() + << ", bornTs=" << msg.getBornTimestamp() + << ", appStart=" << G_APP_START_MS + << std::endl; + return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投 + } + // ----- MOD END: 启动后消息过滤 ----- + // 调试输出 std::cout << "myMessageCallbackrecall" << std::endl;