From cc909101dffbb47140e97dd248256bd9fe3094ef Mon Sep 17 00:00:00 2001 From: lnk Date: Fri, 27 Jun 2025 16:33:41 +0800 Subject: [PATCH] multi front process --- LFtid1056/client2.h | 7 ++- LFtid1056/cloudfront/boot/start_fe.sh | 2 +- LFtid1056/cloudfront/code/cfg_parser.cpp | 46 +++++++++-------- LFtid1056/cloudfront/code/interface.cpp | 22 +++++---- LFtid1056/cloudfront/code/interface.h | 9 ++-- LFtid1056/cloudfront/code/log4.cpp | 6 +-- LFtid1056/cloudfront/code/log4.h | 2 +- LFtid1056/cloudfront/code/log4cplus/log4.h | 2 +- LFtid1056/cloudfront/code/main.cpp | 26 +++++----- LFtid1056/cloudfront/code/rocketmq.cpp | 57 +++++++++++----------- LFtid1056/main_thread.cpp | 16 ++++-- 11 files changed, 105 insertions(+), 90 deletions(-) diff --git a/LFtid1056/client2.h b/LFtid1056/client2.h index 122a91d..f04d033 100644 --- a/LFtid1056/client2.h +++ b/LFtid1056/client2.h @@ -1,3 +1,6 @@ +#ifndef CLIENT_H +#define CLIENT_H + #include #include #include @@ -101,4 +104,6 @@ void try_reconnect(uv_timer_t* timer); void on_connect(uv_connect_t* req, int status); void on_close(uv_handle_t* handle); void init_clients(uv_loop_t* loop, const std::vector& devices); -void stop_all_clients(); \ No newline at end of file +void stop_all_clients(); + +#endif \ No newline at end of file diff --git a/LFtid1056/cloudfront/boot/start_fe.sh b/LFtid1056/cloudfront/boot/start_fe.sh index bf19973..3d3f1f5 100644 --- a/LFtid1056/cloudfront/boot/start_fe.sh +++ b/LFtid1056/cloudfront/boot/start_fe.sh @@ -11,7 +11,7 @@ QTDIR=/qt-4.8.4 export QTDIR -FEP_ENV=/FeProject +FEP_ENV=/home/pq/zwproject/LFtid1056 export FEP_ENV PATH=$FEP_ENV/bin:$QTDIR/bin:$PATH diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 5a9365b..94475e6 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -527,7 +527,7 @@ void init_config() { } //测试进程端口 - if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集 + /*if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集 TEST_PORT = TEST_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index; else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召 TEST_PORT = TEST_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index; @@ -537,8 +537,8 @@ void init_config() { } else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波 TEST_PORT = TEST_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index; - } - + }*/ + TEST_PORT = TEST_PORT + g_front_seg_index; } ////////////////////////////////////////////////////////////////////////////////////////////获取当前时间 @@ -817,7 +817,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, const std::string& id) { DIR* dir = opendir(cfg_dir.c_str()); if (!dir) { - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法解析补招文件,补招文件路径FRONT_PATH + /etc/recall/不存在", get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法解析补招文件,补招文件路径FRONT_PATH + /etc/recall/不存在", g_front_seg_index); return false; } @@ -829,7 +829,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, const std::string& id) { std::string filepath = cfg_dir + "/" + filename; tinyxml2::XMLDocument doc; if (doc.LoadFile(filepath.c_str()) != tinyxml2::XML_SUCCESS) { - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法解析补招文件%s,补招内容无效", get_front_msg_from_subdir(), g_front_seg_index, filepath.c_str()); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法解析补招文件%s,补招内容无效", g_front_seg_index, filepath.c_str()); continue; } @@ -871,20 +871,18 @@ void process_recall_config(recall_xml_t* recall_xml) //根据监测点id来获取补招数据,补招时调用这个 void Check_Recall_Config(const std::string& id) { - if (g_node_id == HIS_DATA_BASE_NODE_ID || + /*if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || - g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) { + g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) {*/ - recall_xml_t recall_xml; - std::memset(&recall_xml, 0, sizeof(recall_xml_t)); - - // 解析补招文件 - parse_recall_xml(&recall_xml, id); - - // 将补招数据赋值到全局变量 - process_recall_config(&recall_xml); - } + recall_xml_t recall_xml; + std::memset(&recall_xml, 0, sizeof(recall_xml_t)); + // 解析补招文件 + parse_recall_xml(&recall_xml, id); + // 将补招数据赋值到全局变量 + process_recall_config(&recall_xml); + //} } //补招成功后删除补招文件,补招后调用这个 @@ -925,7 +923,7 @@ void DeletcRecallXml() { DIR* dir = opendir(cfg_dir.c_str()); if (!dir) { std::cerr << "folder does not exist!" << std::endl; - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 删除旧的补招文件失败,补招文件路径FRONT_PATH + /etc/recall/不存在",get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 删除旧的补招文件失败,补招文件路径FRONT_PATH + /etc/recall/不存在", g_front_seg_index); return; } @@ -944,7 +942,7 @@ void DeletcRecallXml() { if (stat(fullpath.c_str(), &file_stat) == 0) { if (file_stat.st_mtime < cutoff) { if (remove(fullpath.c_str()) == 0) { - DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程 删除超过两天的补招文件",get_front_msg_from_subdir(), g_front_seg_index); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程 删除超过两天的补招文件", g_front_seg_index); } else { std::cerr << "Failed to remove file: " << fullpath << std::endl; } @@ -966,7 +964,7 @@ void CreateRecallXml() { g_StatisticLackList_list_mutex.lock(); if (!g_StatisticLackList.empty()) { - DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程 开始写入补招文件", get_front_msg_from_subdir(), g_front_seg_index); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程 开始写入补招文件", g_front_seg_index); std::map> id_map; for (const auto& jr : g_StatisticLackList) { @@ -1006,7 +1004,7 @@ void CreateRecallXml() { tinyxml2::XMLError save_result = doc.SaveFile(path.str().c_str()); if (save_result != tinyxml2::XML_SUCCESS) { - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法将补招文件写入路径: %s",get_front_msg_from_subdir(), g_front_seg_index, path.str().c_str()); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法将补招文件写入路径: %s", g_front_seg_index, path.str().c_str()); continue; } } @@ -1019,10 +1017,10 @@ void CreateRecallXml() { //生成待补招xml文件 void create_recall_xml() { - if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)) { - DeletcRecallXml(); - CreateRecallXml(); - } + //if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)) { + DeletcRecallXml(); + CreateRecallXml(); + //} } // 工具函数:将时间字符串转为 time_t(秒级) diff --git a/LFtid1056/cloudfront/code/interface.cpp b/LFtid1056/cloudfront/code/interface.cpp index 54aa73e..edac700 100644 --- a/LFtid1056/cloudfront/code/interface.cpp +++ b/LFtid1056/cloudfront/code/interface.cpp @@ -534,7 +534,7 @@ int terminal_ledger_web(std::map& terminal_dev_map, { if (inputstring.empty()) { std::cerr << "Error: inputstring is empty\n"; - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程调用web台账接口的入参为空", get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程调用web台账接口的入参为空", g_front_seg_index); return 1; } @@ -672,7 +672,8 @@ int terminal_ledger_web(std::map& terminal_dev_map, } // 5. 主进程保存台账 - if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + if (g_front_seg_index == 1) { save_ledger_json(responseStr); } @@ -691,7 +692,7 @@ int parse_device_cfg_web() input_jstr += "}"; std::cout << "input_jstr: " << input_jstr << std::endl; - DIY_DEBUGLOG("process","【DEBUG】前置的%s%d号进程调用web接口获取台账使用的请求输入为:%s",get_front_msg_from_subdir(), g_front_seg_index, input_jstr.c_str()); + DIY_DEBUGLOG("process","【DEBUG】前置的%d号进程调用web接口获取台账使用的请求输入为:%s", g_front_seg_index, input_jstr.c_str()); // 2. 调用接口 std::map terminal_dev_map; @@ -702,8 +703,9 @@ int parse_device_cfg_web() // 3. 调试打印 printTerminalDevMap(terminal_dev_map); - // 4. 看门狗配置校验(仅主进程稳态) - if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + // 4. 看门狗配置校验(仅主进程) + //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + if (g_front_seg_index == 1) { int max_index = get_max_stat_data_index(FRONT_PATH + "/etc/runtime.cf"); std::cout << "max_index = " << max_index << std::endl; @@ -730,13 +732,13 @@ int parse_device_cfg_web() // 5. 台账数量与配置比对 int count_cfg = static_cast(terminal_dev_map.size()); std::cout << "terminal_ledger_num: " << count_cfg << std::endl; - DIY_DEBUGLOG("process", "【DEBUG】前置的%s%d号进程调用获取到的台账的数量为:%d",get_front_msg_from_subdir(), g_front_seg_index, count_cfg); + DIY_DEBUGLOG("process", "【DEBUG】前置的%d号进程调用获取到的台账的数量为:%d", g_front_seg_index, count_cfg); if (IED_COUNT < count_cfg) { std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!" << std::endl; - DIY_WARNLOG("process","【WARN】前置的%s%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账",get_front_msg_from_subdir(), g_front_seg_index, IED_COUNT); + DIY_WARNLOG("process","【WARN】前置的%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账", g_front_seg_index, IED_COUNT); } else { - DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间",get_front_msg_from_subdir(), g_front_seg_index, IED_COUNT); + DIY_INFOLOG("process","【NORMAL】前置的%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间", g_front_seg_index, IED_COUNT); } ///////////////////////////////////////////////////////////////////////////////用例这里将局部的map拷贝到全局map,后续根据协议台账修改 @@ -883,7 +885,7 @@ int parse_model_cfg_web() // 3. 调用接口 std::map icd_model_map; if (parse_model_web(&icd_model_map, input_jstr)) { - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置",get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置", g_front_seg_index); // 确保释放 map for (auto& kv : icd_model_map) delete kv.second; return 0; @@ -937,7 +939,7 @@ std::string parse_model_cfg_web_one(const std::string& terminal_type) // 2. 拉取并解析 if (parse_model_web(&icd_model_map, input_jstr) != 0) { std::cerr << "parse_model_web failed for type: " << terminal_type << std::endl; - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置",get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置", g_front_seg_index); // 清理(即使 map 为空,也安全) for (auto& kv : icd_model_map) delete kv.second; return ""; diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 5e564f5..9f74f13 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -20,13 +20,13 @@ class Front; /////////////////////////////////////////////////////////////////////////////////////////// -#define STAT_DATA_BASE_NODE_ID 100 +/*#define STAT_DATA_BASE_NODE_ID 100 #define THREE_SECS_DATA_BASE_NODE_ID 200 #define SOE_COMTRADE_BASE_NODE_ID 300 #define HIS_DATA_BASE_NODE_ID 400 #define NEW_HIS_DATA_BASE_NODE_ID 500 #define RECALL_HIS_DATA_BASE_NODE_ID 600 -#define RECALL_ALL_DATA_BASE_NODE_ID 700 +#define RECALL_ALL_DATA_BASE_NODE_ID 700*/ /////////////////////////////////////////////////////////////////////////////////////////// @@ -340,11 +340,14 @@ extern std::list queue_data_list; /////////////////////////////////////////////////////////////////////////////////主函数类声明 -std::string get_front_msg_from_subdir(); +//std::string get_front_msg_from_subdir(); extern std::string FRONT_PATH; +extern int g_front_seg_index; +extern int g_front_seg_num; void* cloudfrontthread(void* arg); +bool parse_param(int argc, char* argv[]); struct ThreadArgs { int argc; diff --git a/LFtid1056/cloudfront/code/log4.cpp b/LFtid1056/cloudfront/code/log4.cpp index cbfc1a0..0d4ed55 100644 --- a/LFtid1056/cloudfront/code/log4.cpp +++ b/LFtid1056/cloudfront/code/log4.cpp @@ -51,7 +51,7 @@ extern std::string subdir; extern std::string G_LOG_TOPIC; ////////////////////////////////////////////////////////辅助函数 -std::string get_front_type_from_subdir() { +/*std::string get_front_type_from_subdir() { if (subdir == "cfg_3s_data") return "realTime"; else if (subdir == "cfg_soe_comtrade") @@ -62,7 +62,7 @@ std::string get_front_type_from_subdir() { return "stat"; else return "unknown"; -} +}*/ // 递归创建目录 bool create_directory_recursive(const std::string& path) { @@ -153,7 +153,7 @@ protected: << "\",\"level\":\"" << level_str << "\",\"grade\":\"" << get_level_str(level) << "\",\"logtype\":\"" << (logtype == LOGTYPE_COM ? "com" : "data") - << "\",\"frontType\":\"" << get_front_type_from_subdir() + << "\",\"frontType\":\"" << "cloudfront" << "\",\"log\":\"" << escape_json(msg) << "\"}"; std::string jsonString = oss.str(); diff --git a/LFtid1056/cloudfront/code/log4.h b/LFtid1056/cloudfront/code/log4.h index 89b4d6a..ecab730 100644 --- a/LFtid1056/cloudfront/code/log4.h +++ b/LFtid1056/cloudfront/code/log4.h @@ -54,7 +54,7 @@ extern DebugSwitch g_debug_switch; extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result); -std::string get_front_type_from_subdir(); +//std::string get_front_type_from_subdir(); // 不带 Appender 的版本 diff --git a/LFtid1056/cloudfront/code/log4cplus/log4.h b/LFtid1056/cloudfront/code/log4cplus/log4.h index 89b4d6a..ecab730 100644 --- a/LFtid1056/cloudfront/code/log4cplus/log4.h +++ b/LFtid1056/cloudfront/code/log4cplus/log4.h @@ -54,7 +54,7 @@ extern DebugSwitch g_debug_switch; extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result); -std::string get_front_type_from_subdir(); +//std::string get_front_type_from_subdir(); // 不带 Appender 的版本 diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index c71546b..b75a46d 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -52,7 +52,7 @@ std::string FRONT_PATH; int INITFLAG = 0; //前置标置 -std::string subdir = "cfg_stat_data"; //默认稳态 +std::string subdir = "cloudfrontproc"; //子目录 uint32_t g_node_id = 0; int g_front_seg_index = 0; //默认单进程 int g_front_seg_num = 0; //默认单进程 @@ -139,7 +139,7 @@ bool parse_param(int argc, char* argv[]) { } //获取前置类型 -void init_global_function_enable() { +/*void init_global_function_enable() { if (subdir == "cfg_stat_data") { // 历史稳态 g_node_id = STAT_DATA_BASE_NODE_ID; auto_register_report_enabled = 1; @@ -151,10 +151,10 @@ void init_global_function_enable() { } else if (subdir == "cfg_recallhis_data") { // 补招 g_node_id = RECALL_HIS_DATA_BASE_NODE_ID; } -} +}*/ //获取功能名称 -std::string get_front_msg_from_subdir() { +/*std::string get_front_msg_from_subdir() { if (subdir.find("cfg_3s_data") != std::string::npos) return "实时数据进程"; else if (subdir.find("cfg_soe_comtrade") != std::string::npos) @@ -165,7 +165,7 @@ std::string get_front_msg_from_subdir() { return "稳态统计进程"; else return "unknown"; -} +}*/ //获取前置路径 std::string get_parent_directory() { @@ -199,14 +199,14 @@ std::string get_parent_directory() { { //初始化g_node_id - init_global_function_enable(); + //init_global_function_enable(); //配置初始化 init_config(); //启动进程日志 init_logger_process(); - DIY_WARNLOG("process","【WARN】前置的%s%d号进程 进程级日志初始化完毕", get_front_msg_from_subdir(), g_front_seg_index); + DIY_WARNLOG("process","【WARN】前置的%d号进程 进程级日志初始化完毕", g_front_seg_index); //读取台账 parse_device_cfg_web(); @@ -384,13 +384,13 @@ void Front::mqconsumerThread() std::string nameServer = G_MQCONSUMER_IPPORT; std::vector subscriptions; - if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) { - subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata); - } + //if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) { + subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata); + //} subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate); - if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) { - subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall); - } + //if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) { + subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall); + //} subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset); subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog); diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index b1c23f4..a23440b 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -278,7 +278,7 @@ void rocketmq_producer_send(rocketmq::RocketMQProducer* producer, producer->sendMessage(body, topic, tags, keys); } catch (const std::exception& e) { std::cerr << "[rocketmq_producer_send] 发送失败: " << e.what() << std::endl; - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 MQ发送失败", get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 MQ发送失败", g_front_seg_index); } } @@ -540,7 +540,7 @@ bool parseJsonMessageSET(const std::string& json_str) { std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl; - DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理topic:%s_%s的进程控制消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的进程控制消息", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); if (code_str == "set_process") { if (!messageBody.contains("processNum")) { @@ -559,13 +559,14 @@ bool parseJsonMessageSET(const std::string& json_str) { // 校验参数并执行 if ((fun == "reset" || fun == "add") && (processNum >= 1 && processNum < 10) && - (frontType == "stat" || frontType == "recall" || frontType == "all")) { + (frontType == "cloudfront" || frontType == "all")) { - if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + if (g_front_seg_index == 1) { execute_bash(fun, processNum, frontType); - DIY_WARNLOG("process", "【WARN】前置的%s%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str()); + DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程", g_front_seg_index, fun.c_str()); send_reply_to_queue(guid, "1", "收到重置进程指令,重启所有进程!"); std::cout << "this msg should only execute once" << std::endl; @@ -577,7 +578,7 @@ bool parseJsonMessageSET(const std::string& json_str) { send_reply_to_queue(guid, "1", "收到删除进程指令,这个进程将会重启 "); - DIY_WARNLOG("process", "【WARN】前置的%s%d号进程执行指令:%s,即将重启",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str()); + DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启", g_front_seg_index, fun.c_str()); std::this_thread::sleep_for(std::chrono::seconds(10)); ::_exit(-1039); // 进程退出 @@ -658,15 +659,15 @@ bool parseJsonMessageLOG(const std::string& json_str) { } // 判断 frontType 是否匹配 - if (frontType != subdir) { + /*if (frontType != subdir) { std::cout << "msg frontType: " << frontType << " doesn't match self frontType: " << subdir << std::endl; return true; - } + }*/ - DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理日志上送消息", get_front_msg_from_subdir(), g_front_seg_index); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理日志上送消息", g_front_seg_index); std::cout << "msg index: " << processNo << " self index: " << g_front_seg_index << std::endl; - std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl; + /*std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;*/ // 回复消息 send_reply_to_queue(guid, "1", "收到实时日志指令"); @@ -683,7 +684,7 @@ bool parseJsonMessageLOG(const std::string& json_str) { process_log_command(id, level, grade, logtype); } else { std::cout << "type doesn't match" << std::endl; - DIY_WARNLOG("process", "【WARN】前置的%s%d号进程处理日志上送消息,格式不正确", get_front_msg_from_subdir(), g_front_seg_index); + DIY_WARNLOG("process", "【WARN】前置的%d号进程处理日志上送消息,格式不正确", g_front_seg_index); } std::cout << "this msg should only execute once" << std::endl; @@ -737,8 +738,8 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d std::cout << "msg index: " << process_No << " self index: " << g_front_seg_index << std::endl; - DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理topic:%s_%s的台账更新消息", - get_front_msg_from_subdir(), g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的台账更新消息", + g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); send_reply_to_queue(guid, "1", "收到台账更新指令"); @@ -865,7 +866,7 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms } else{ std::cerr << "rtdata is NULL." << std::endl; - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); } @@ -909,7 +910,7 @@ rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& ms // 调用业务逻辑处理函数 std::string updatefilepath = FRONT_PATH + "/etc/ledgerupdate"; if (!parseJsonMessageUD(body, updatefilepath)) { - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -939,7 +940,7 @@ rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg) // 调用业务处理逻辑 if (!parseJsonMessageSET(body)) { - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -969,7 +970,7 @@ rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg) // 执行日志上送处理 if (!parseJsonMessageLOG(body)) { - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str()); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -1012,7 +1013,7 @@ rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& ms } else { std::cerr << "recall data is NULL." << std::endl; - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -1317,10 +1318,10 @@ void connect_status_to_queue(const std::string& id, const std::string& datetime, data.strTopic = G_CONNECT_TOPIC; data.strText = jsonObject.dump(); // 转换为字符串 - if (g_node_id == STAT_DATA_BASE_NODE_ID) { - std::lock_guard lock(queue_data_list_mutex); - queue_data_list.push_back(data); - } + //if (g_node_id == STAT_DATA_BASE_NODE_ID) { + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(data); + //} } catch (const std::exception& e) { std::cerr << "connect_status_to_queue exception: " << e.what() << std::endl; @@ -1337,7 +1338,7 @@ void send_reply_to_queue(const std::string& guid, const std::string& step, const obj["step"] = step; obj["result"] = result; obj["processNo"] = g_front_seg_index; - obj["frontType"] = get_front_type_from_subdir(); + obj["frontType"] = "cloudfront"; obj["nodeId"] = FRONT_INST; // 构造 queue 消息 @@ -1360,7 +1361,7 @@ void send_heartbeat_to_queue(const std::string& status) { try{ nlohmann::json obj; obj["nodeId"] = FRONT_INST; - obj["frontType"] = get_front_type_from_subdir(); + obj["frontType"] = "cloudfront"; obj["processNo"] = g_front_seg_index; obj["status"] = status; @@ -1432,8 +1433,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { if (type == 0) { std::cout << "use ledger send msg" << std::endl; - - for (size_t i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < terminal_devlist.size(); ++i) { + //根据台账模式下每个进程都会发送 + for (size_t i = 0; total_messages > 0 && i < terminal_devlist.size(); ++i) { const auto& dev = terminal_devlist[i]; if (shouldSkipTerminal(dev.terminal_id)) { @@ -1478,8 +1479,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { } } else { std::cout << "use monitor + number send msg" << std::endl; - - for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < total_messages; ++i) { + //根据虚构监测点模式下只有进程1发送 + for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 ) && i < total_messages; ++i) { std::string monitor_id = "testmonitor" + std::to_string(i); data.mp_id = monitor_id; data.monitor_no = i; diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 3171a60..bf81b2f 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -9,6 +9,7 @@ #include "dealMsg.h" #include "cloudfront/code/interface.h" +#include using namespace std; #if 0 @@ -242,8 +243,8 @@ void restart_thread(int index) { } else if (index == 2) { // ӿڣmq - char* argv[] = { (char*)new_index ,(char*)"-dcfg_stat_data", (char*)"-s1_1" }; - ThreadArgs* args = new ThreadArgs{3, argv}; + char* argv[] = { (char*)new_index };//Ҫ̺Ų + ThreadArgs* args = new ThreadArgs{1, argv}; if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) { pthread_mutex_lock(&global_lock); printf("Failed to restart message processor thread %d\n", index); @@ -265,7 +266,12 @@ int is_thread_alive(pthread_t tid) { } /* */ -int main() { +int main(int argc ,char** argv) {//Ӳ + if(!parse_param(argc,argv)){ + std::cerr << "process param error,exit" << std::endl; + return 1; + } + srand(time(NULL)); // ʼ // ʼ߳ @@ -296,8 +302,8 @@ int main() { } else if (i == 2){ //ӿںmq - char* argv[] = { (char*)index,(char*)"-dcfg_stat_data", (char*)"-s1_1" }; - ThreadArgs* args = new ThreadArgs{3, argv}; + char* argv[] = { (char*)index };//Ҫ̺Ų + ThreadArgs* args = new ThreadArgs{1, argv}; if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) { printf("Failed to create message processor thread %d\n", i); delete args; // ߳ûɹֶͷ