multi front process

This commit is contained in:
lnk
2025-06-27 16:33:41 +08:00
parent 747d6c757c
commit cc909101df
11 changed files with 105 additions and 90 deletions

View File

@@ -1,3 +1,6 @@
#ifndef CLIENT_H
#define CLIENT_H
#include <uv.h> #include <uv.h>
#include <string> #include <string>
#include <vector> #include <vector>
@@ -102,3 +105,5 @@ void on_connect(uv_connect_t* req, int status);
void on_close(uv_handle_t* handle); void on_close(uv_handle_t* handle);
void init_clients(uv_loop_t* loop, const std::vector<DeviceInfo>& devices); void init_clients(uv_loop_t* loop, const std::vector<DeviceInfo>& devices);
void stop_all_clients(); void stop_all_clients();
#endif

View File

@@ -11,7 +11,7 @@
QTDIR=/qt-4.8.4 QTDIR=/qt-4.8.4
export QTDIR export QTDIR
FEP_ENV=/FeProject FEP_ENV=/home/pq/zwproject/LFtid1056
export FEP_ENV export FEP_ENV
PATH=$FEP_ENV/bin:$QTDIR/bin:$PATH PATH=$FEP_ENV/bin:$QTDIR/bin:$PATH

View File

@@ -527,7 +527,7 @@ void init_config() {
} }
//测试进程端口 //测试进程端口
if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集 /*if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集
TEST_PORT = TEST_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index; TEST_PORT = TEST_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index;
else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召 else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召
TEST_PORT = TEST_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index; TEST_PORT = TEST_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index;
@@ -537,8 +537,8 @@ void init_config() {
} }
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波 else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波
TEST_PORT = TEST_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index; TEST_PORT = TEST_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index;
} }*/
TEST_PORT = TEST_PORT + g_front_seg_index;
} }
////////////////////////////////////////////////////////////////////////////////////////////获取当前时间 ////////////////////////////////////////////////////////////////////////////////////////////获取当前时间
@@ -817,7 +817,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, const std::string& id) {
DIR* dir = opendir(cfg_dir.c_str()); DIR* dir = opendir(cfg_dir.c_str());
if (!dir) { if (!dir) {
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法解析补招文件补招文件路径FRONT_PATH + /etc/recall/不存在", get_front_msg_from_subdir(), g_front_seg_index); DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法解析补招文件补招文件路径FRONT_PATH + /etc/recall/不存在", g_front_seg_index);
return false; return false;
} }
@@ -829,7 +829,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, const std::string& id) {
std::string filepath = cfg_dir + "/" + filename; std::string filepath = cfg_dir + "/" + filename;
tinyxml2::XMLDocument doc; tinyxml2::XMLDocument doc;
if (doc.LoadFile(filepath.c_str()) != tinyxml2::XML_SUCCESS) { if (doc.LoadFile(filepath.c_str()) != tinyxml2::XML_SUCCESS) {
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法解析补招文件%s,补招内容无效", get_front_msg_from_subdir(), g_front_seg_index, filepath.c_str()); DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法解析补招文件%s,补招内容无效", g_front_seg_index, filepath.c_str());
continue; continue;
} }
@@ -871,20 +871,18 @@ void process_recall_config(recall_xml_t* recall_xml)
//根据监测点id来获取补招数据补招时调用这个 //根据监测点id来获取补招数据补招时调用这个
void Check_Recall_Config(const std::string& id) { void Check_Recall_Config(const std::string& id) {
if (g_node_id == HIS_DATA_BASE_NODE_ID || /*if (g_node_id == HIS_DATA_BASE_NODE_ID ||
g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID ||
g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID ||
g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) { g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) {*/
recall_xml_t recall_xml; recall_xml_t recall_xml;
std::memset(&recall_xml, 0, sizeof(recall_xml_t)); std::memset(&recall_xml, 0, sizeof(recall_xml_t));
// 解析补招文件
// 解析补招文件 parse_recall_xml(&recall_xml, id);
parse_recall_xml(&recall_xml, id); // 将补招数据赋值到全局变量
process_recall_config(&recall_xml);
// 将补招数据赋值到全局变量 //}
process_recall_config(&recall_xml);
}
} }
//补招成功后删除补招文件,补招后调用这个 //补招成功后删除补招文件,补招后调用这个
@@ -925,7 +923,7 @@ void DeletcRecallXml() {
DIR* dir = opendir(cfg_dir.c_str()); DIR* dir = opendir(cfg_dir.c_str());
if (!dir) { if (!dir) {
std::cerr << "folder does not exist!" << std::endl; std::cerr << "folder does not exist!" << std::endl;
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 删除旧的补招文件失败,补招文件路径FRONT_PATH + /etc/recall/不存在",get_front_msg_from_subdir(), g_front_seg_index); DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 删除旧的补招文件失败,补招文件路径FRONT_PATH + /etc/recall/不存在", g_front_seg_index);
return; return;
} }
@@ -944,7 +942,7 @@ void DeletcRecallXml() {
if (stat(fullpath.c_str(), &file_stat) == 0) { if (stat(fullpath.c_str(), &file_stat) == 0) {
if (file_stat.st_mtime < cutoff) { if (file_stat.st_mtime < cutoff) {
if (remove(fullpath.c_str()) == 0) { if (remove(fullpath.c_str()) == 0) {
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程 删除超过两天的补招文件",get_front_msg_from_subdir(), g_front_seg_index); DIY_INFOLOG("process", "【NORMAL】前置的%d号进程 删除超过两天的补招文件", g_front_seg_index);
} else { } else {
std::cerr << "Failed to remove file: " << fullpath << std::endl; std::cerr << "Failed to remove file: " << fullpath << std::endl;
} }
@@ -966,7 +964,7 @@ void CreateRecallXml() {
g_StatisticLackList_list_mutex.lock(); g_StatisticLackList_list_mutex.lock();
if (!g_StatisticLackList.empty()) { if (!g_StatisticLackList.empty()) {
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程 开始写入补招文件", get_front_msg_from_subdir(), g_front_seg_index); DIY_INFOLOG("process", "【NORMAL】前置的%d号进程 开始写入补招文件", g_front_seg_index);
std::map<std::string, std::list<JournalRecall>> id_map; std::map<std::string, std::list<JournalRecall>> id_map;
for (const auto& jr : g_StatisticLackList) { for (const auto& jr : g_StatisticLackList) {
@@ -1006,7 +1004,7 @@ void CreateRecallXml() {
tinyxml2::XMLError save_result = doc.SaveFile(path.str().c_str()); tinyxml2::XMLError save_result = doc.SaveFile(path.str().c_str());
if (save_result != tinyxml2::XML_SUCCESS) { if (save_result != tinyxml2::XML_SUCCESS) {
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法将补招文件写入路径: %s",get_front_msg_from_subdir(), g_front_seg_index, path.str().c_str()); DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法将补招文件写入路径: %s", g_front_seg_index, path.str().c_str());
continue; continue;
} }
} }
@@ -1019,10 +1017,10 @@ void CreateRecallXml() {
//生成待补招xml文件 //生成待补招xml文件
void create_recall_xml() void create_recall_xml()
{ {
if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)) { //if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)) {
DeletcRecallXml(); DeletcRecallXml();
CreateRecallXml(); CreateRecallXml();
} //}
} }
// 工具函数:将时间字符串转为 time_t秒级 // 工具函数:将时间字符串转为 time_t秒级

View File

@@ -534,7 +534,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
{ {
if (inputstring.empty()) { if (inputstring.empty()) {
std::cerr << "Error: inputstring is empty\n"; std::cerr << "Error: inputstring is empty\n";
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程调用web台账接口的入参为空", get_front_msg_from_subdir(), g_front_seg_index); DIY_ERRORLOG("process","【ERROR】前置的%d号进程调用web台账接口的入参为空", g_front_seg_index);
return 1; return 1;
} }
@@ -672,7 +672,8 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
} }
// 5. 主进程保存台账 // 5. 主进程保存台账
if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
if (g_front_seg_index == 1) {
save_ledger_json(responseStr); save_ledger_json(responseStr);
} }
@@ -691,7 +692,7 @@ int parse_device_cfg_web()
input_jstr += "}"; input_jstr += "}";
std::cout << "input_jstr: " << input_jstr << std::endl; std::cout << "input_jstr: " << input_jstr << std::endl;
DIY_DEBUGLOG("process","【DEBUG】前置的%s%d号进程调用web接口获取台账使用的请求输入为:%s",get_front_msg_from_subdir(), g_front_seg_index, input_jstr.c_str()); DIY_DEBUGLOG("process","【DEBUG】前置的%d号进程调用web接口获取台账使用的请求输入为:%s", g_front_seg_index, input_jstr.c_str());
// 2. 调用接口 // 2. 调用接口
std::map<std::string, terminal_dev> terminal_dev_map; std::map<std::string, terminal_dev> terminal_dev_map;
@@ -702,8 +703,9 @@ int parse_device_cfg_web()
// 3. 调试打印 // 3. 调试打印
printTerminalDevMap(terminal_dev_map); printTerminalDevMap(terminal_dev_map);
// 4. 看门狗配置校验(仅主进程稳态 // 4. 看门狗配置校验(仅主进程)
if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
if (g_front_seg_index == 1) {
int max_index = get_max_stat_data_index(FRONT_PATH + "/etc/runtime.cf"); int max_index = get_max_stat_data_index(FRONT_PATH + "/etc/runtime.cf");
std::cout << "max_index = " << max_index << std::endl; std::cout << "max_index = " << max_index << std::endl;
@@ -730,13 +732,13 @@ int parse_device_cfg_web()
// 5. 台账数量与配置比对 // 5. 台账数量与配置比对
int count_cfg = static_cast<int>(terminal_dev_map.size()); int count_cfg = static_cast<int>(terminal_dev_map.size());
std::cout << "terminal_ledger_num: " << count_cfg << std::endl; std::cout << "terminal_ledger_num: " << count_cfg << std::endl;
DIY_DEBUGLOG("process", "【DEBUG】前置的%s%d号进程调用获取到的台账的数量为:%d",get_front_msg_from_subdir(), g_front_seg_index, count_cfg); DIY_DEBUGLOG("process", "【DEBUG】前置的%d号进程调用获取到的台账的数量为:%d", g_front_seg_index, count_cfg);
if (IED_COUNT < count_cfg) { if (IED_COUNT < count_cfg) {
std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!" << std::endl; std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!" << std::endl;
DIY_WARNLOG("process","【WARN】前置的%s%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账",get_front_msg_from_subdir(), g_front_seg_index, IED_COUNT); DIY_WARNLOG("process","【WARN】前置的%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账", g_front_seg_index, IED_COUNT);
} else { } else {
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间",get_front_msg_from_subdir(), g_front_seg_index, IED_COUNT); DIY_INFOLOG("process","【NORMAL】前置的%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间", g_front_seg_index, IED_COUNT);
} }
///////////////////////////////////////////////////////////////////////////////用例这里将局部的map拷贝到全局map后续根据协议台账修改 ///////////////////////////////////////////////////////////////////////////////用例这里将局部的map拷贝到全局map后续根据协议台账修改
@@ -883,7 +885,7 @@ int parse_model_cfg_web()
// 3. 调用接口 // 3. 调用接口
std::map<std::string, icd_model*> icd_model_map; std::map<std::string, icd_model*> icd_model_map;
if (parse_model_web(&icd_model_map, input_jstr)) { if (parse_model_web(&icd_model_map, input_jstr)) {
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置",get_front_msg_from_subdir(), g_front_seg_index); DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置", g_front_seg_index);
// 确保释放 map // 确保释放 map
for (auto& kv : icd_model_map) delete kv.second; for (auto& kv : icd_model_map) delete kv.second;
return 0; return 0;
@@ -937,7 +939,7 @@ std::string parse_model_cfg_web_one(const std::string& terminal_type)
// 2. 拉取并解析 // 2. 拉取并解析
if (parse_model_web(&icd_model_map, input_jstr) != 0) { if (parse_model_web(&icd_model_map, input_jstr) != 0) {
std::cerr << "parse_model_web failed for type: " << terminal_type << std::endl; std::cerr << "parse_model_web failed for type: " << terminal_type << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置",get_front_msg_from_subdir(), g_front_seg_index); DIY_ERRORLOG("process","【ERROR】前置的%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置", g_front_seg_index);
// 清理(即使 map 为空,也安全) // 清理(即使 map 为空,也安全)
for (auto& kv : icd_model_map) delete kv.second; for (auto& kv : icd_model_map) delete kv.second;
return ""; return "";

View File

@@ -20,13 +20,13 @@ class Front;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
#define STAT_DATA_BASE_NODE_ID 100 /*#define STAT_DATA_BASE_NODE_ID 100
#define THREE_SECS_DATA_BASE_NODE_ID 200 #define THREE_SECS_DATA_BASE_NODE_ID 200
#define SOE_COMTRADE_BASE_NODE_ID 300 #define SOE_COMTRADE_BASE_NODE_ID 300
#define HIS_DATA_BASE_NODE_ID 400 #define HIS_DATA_BASE_NODE_ID 400
#define NEW_HIS_DATA_BASE_NODE_ID 500 #define NEW_HIS_DATA_BASE_NODE_ID 500
#define RECALL_HIS_DATA_BASE_NODE_ID 600 #define RECALL_HIS_DATA_BASE_NODE_ID 600
#define RECALL_ALL_DATA_BASE_NODE_ID 700 #define RECALL_ALL_DATA_BASE_NODE_ID 700*/
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@@ -340,11 +340,14 @@ extern std::list<queue_data_t> queue_data_list;
/////////////////////////////////////////////////////////////////////////////////主函数类声明 /////////////////////////////////////////////////////////////////////////////////主函数类声明
std::string get_front_msg_from_subdir(); //std::string get_front_msg_from_subdir();
extern std::string FRONT_PATH; extern std::string FRONT_PATH;
extern int g_front_seg_index;
extern int g_front_seg_num;
void* cloudfrontthread(void* arg); void* cloudfrontthread(void* arg);
bool parse_param(int argc, char* argv[]);
struct ThreadArgs { struct ThreadArgs {
int argc; int argc;

View File

@@ -51,7 +51,7 @@ extern std::string subdir;
extern std::string G_LOG_TOPIC; extern std::string G_LOG_TOPIC;
////////////////////////////////////////////////////////辅助函数 ////////////////////////////////////////////////////////辅助函数
std::string get_front_type_from_subdir() { /*std::string get_front_type_from_subdir() {
if (subdir == "cfg_3s_data") if (subdir == "cfg_3s_data")
return "realTime"; return "realTime";
else if (subdir == "cfg_soe_comtrade") else if (subdir == "cfg_soe_comtrade")
@@ -62,7 +62,7 @@ std::string get_front_type_from_subdir() {
return "stat"; return "stat";
else else
return "unknown"; return "unknown";
} }*/
// 递归创建目录 // 递归创建目录
bool create_directory_recursive(const std::string& path) { bool create_directory_recursive(const std::string& path) {
@@ -153,7 +153,7 @@ protected:
<< "\",\"level\":\"" << level_str << "\",\"level\":\"" << level_str
<< "\",\"grade\":\"" << get_level_str(level) << "\",\"grade\":\"" << get_level_str(level)
<< "\",\"logtype\":\"" << (logtype == LOGTYPE_COM ? "com" : "data") << "\",\"logtype\":\"" << (logtype == LOGTYPE_COM ? "com" : "data")
<< "\",\"frontType\":\"" << get_front_type_from_subdir() << "\",\"frontType\":\"" << "cloudfront"
<< "\",\"log\":\"" << escape_json(msg) << "\"}"; << "\",\"log\":\"" << escape_json(msg) << "\"}";
std::string jsonString = oss.str(); std::string jsonString = oss.str();

View File

@@ -54,7 +54,7 @@ extern DebugSwitch g_debug_switch;
extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result); extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result);
std::string get_front_type_from_subdir(); //std::string get_front_type_from_subdir();
// 不带 Appender 的版本 // 不带 Appender 的版本

View File

@@ -54,7 +54,7 @@ extern DebugSwitch g_debug_switch;
extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result); extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result);
std::string get_front_type_from_subdir(); //std::string get_front_type_from_subdir();
// 不带 Appender 的版本 // 不带 Appender 的版本

View File

@@ -52,7 +52,7 @@ std::string FRONT_PATH;
int INITFLAG = 0; int INITFLAG = 0;
//前置标置 //前置标置
std::string subdir = "cfg_stat_data"; //默认稳态 std::string subdir = "cloudfrontproc"; //子目录
uint32_t g_node_id = 0; uint32_t g_node_id = 0;
int g_front_seg_index = 0; //默认单进程 int g_front_seg_index = 0; //默认单进程
int g_front_seg_num = 0; //默认单进程 int g_front_seg_num = 0; //默认单进程
@@ -139,7 +139,7 @@ bool parse_param(int argc, char* argv[]) {
} }
//获取前置类型 //获取前置类型
void init_global_function_enable() { /*void init_global_function_enable() {
if (subdir == "cfg_stat_data") { // 历史稳态 if (subdir == "cfg_stat_data") { // 历史稳态
g_node_id = STAT_DATA_BASE_NODE_ID; g_node_id = STAT_DATA_BASE_NODE_ID;
auto_register_report_enabled = 1; auto_register_report_enabled = 1;
@@ -151,10 +151,10 @@ void init_global_function_enable() {
} else if (subdir == "cfg_recallhis_data") { // 补招 } else if (subdir == "cfg_recallhis_data") { // 补招
g_node_id = RECALL_HIS_DATA_BASE_NODE_ID; g_node_id = RECALL_HIS_DATA_BASE_NODE_ID;
} }
} }*/
//获取功能名称 //获取功能名称
std::string get_front_msg_from_subdir() { /*std::string get_front_msg_from_subdir() {
if (subdir.find("cfg_3s_data") != std::string::npos) if (subdir.find("cfg_3s_data") != std::string::npos)
return "实时数据进程"; return "实时数据进程";
else if (subdir.find("cfg_soe_comtrade") != std::string::npos) else if (subdir.find("cfg_soe_comtrade") != std::string::npos)
@@ -165,7 +165,7 @@ std::string get_front_msg_from_subdir() {
return "稳态统计进程"; return "稳态统计进程";
else else
return "unknown"; return "unknown";
} }*/
//获取前置路径 //获取前置路径
std::string get_parent_directory() { std::string get_parent_directory() {
@@ -199,14 +199,14 @@ std::string get_parent_directory() {
{ {
//初始化g_node_id //初始化g_node_id
init_global_function_enable(); //init_global_function_enable();
//配置初始化 //配置初始化
init_config(); init_config();
//启动进程日志 //启动进程日志
init_logger_process(); init_logger_process();
DIY_WARNLOG("process","【WARN】前置的%s%d号进程 进程级日志初始化完毕", get_front_msg_from_subdir(), g_front_seg_index); DIY_WARNLOG("process","【WARN】前置的%d号进程 进程级日志初始化完毕", g_front_seg_index);
//读取台账 //读取台账
parse_device_cfg_web(); parse_device_cfg_web();
@@ -384,13 +384,13 @@ void Front::mqconsumerThread()
std::string nameServer = G_MQCONSUMER_IPPORT; std::string nameServer = G_MQCONSUMER_IPPORT;
std::vector<rocketmq::Subscription> subscriptions; std::vector<rocketmq::Subscription> subscriptions;
if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) { //if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata); subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata);
} //}
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate); subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate);
if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) { //if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall); subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall);
} //}
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset); subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset);
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog); subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog);

View File

@@ -278,7 +278,7 @@ void rocketmq_producer_send(rocketmq::RocketMQProducer* producer,
producer->sendMessage(body, topic, tags, keys); producer->sendMessage(body, topic, tags, keys);
} catch (const std::exception& e) { } catch (const std::exception& e) {
std::cerr << "[rocketmq_producer_send] 发送失败: " << e.what() << std::endl; std::cerr << "[rocketmq_producer_send] 发送失败: " << e.what() << std::endl;
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 MQ发送失败", get_front_msg_from_subdir(), g_front_seg_index); DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 MQ发送失败", g_front_seg_index);
} }
} }
@@ -540,7 +540,7 @@ bool parseJsonMessageSET(const std::string& json_str) {
std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl; std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl;
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理topic:%s_%s的进程控制消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的进程控制消息", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
if (code_str == "set_process") { if (code_str == "set_process") {
if (!messageBody.contains("processNum")) { if (!messageBody.contains("processNum")) {
@@ -559,13 +559,14 @@ bool parseJsonMessageSET(const std::string& json_str) {
// 校验参数并执行 // 校验参数并执行
if ((fun == "reset" || fun == "add") && if ((fun == "reset" || fun == "add") &&
(processNum >= 1 && processNum < 10) && (processNum >= 1 && processNum < 10) &&
(frontType == "stat" || frontType == "recall" || frontType == "all")) { (frontType == "cloudfront" || frontType == "all")) {
if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
if (g_front_seg_index == 1) {
execute_bash(fun, processNum, frontType); execute_bash(fun, processNum, frontType);
DIY_WARNLOG("process", "【WARN】前置的%s%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str()); DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程", g_front_seg_index, fun.c_str());
send_reply_to_queue(guid, "1", "收到重置进程指令,重启所有进程!"); send_reply_to_queue(guid, "1", "收到重置进程指令,重启所有进程!");
std::cout << "this msg should only execute once" << std::endl; std::cout << "this msg should only execute once" << std::endl;
@@ -577,7 +578,7 @@ bool parseJsonMessageSET(const std::string& json_str) {
send_reply_to_queue(guid, "1", "收到删除进程指令,这个进程将会重启 "); send_reply_to_queue(guid, "1", "收到删除进程指令,这个进程将会重启 ");
DIY_WARNLOG("process", "【WARN】前置的%s%d号进程执行指令:%s,即将重启",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str()); DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启", g_front_seg_index, fun.c_str());
std::this_thread::sleep_for(std::chrono::seconds(10)); std::this_thread::sleep_for(std::chrono::seconds(10));
::_exit(-1039); // 进程退出 ::_exit(-1039); // 进程退出
@@ -658,15 +659,15 @@ bool parseJsonMessageLOG(const std::string& json_str) {
} }
// 判断 frontType 是否匹配 // 判断 frontType 是否匹配
if (frontType != subdir) { /*if (frontType != subdir) {
std::cout << "msg frontType: " << frontType << " doesn't match self frontType: " << subdir << std::endl; std::cout << "msg frontType: " << frontType << " doesn't match self frontType: " << subdir << std::endl;
return true; return true;
} }*/
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理日志上送消息", get_front_msg_from_subdir(), g_front_seg_index); DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理日志上送消息", g_front_seg_index);
std::cout << "msg index: " << processNo << " self index: " << g_front_seg_index << std::endl; std::cout << "msg index: " << processNo << " self index: " << g_front_seg_index << std::endl;
std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl; /*std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;*/
// 回复消息 // 回复消息
send_reply_to_queue(guid, "1", "收到实时日志指令"); send_reply_to_queue(guid, "1", "收到实时日志指令");
@@ -683,7 +684,7 @@ bool parseJsonMessageLOG(const std::string& json_str) {
process_log_command(id, level, grade, logtype); process_log_command(id, level, grade, logtype);
} else { } else {
std::cout << "type doesn't match" << std::endl; std::cout << "type doesn't match" << std::endl;
DIY_WARNLOG("process", "【WARN】前置的%s%d号进程处理日志上送消息,格式不正确", get_front_msg_from_subdir(), g_front_seg_index); DIY_WARNLOG("process", "【WARN】前置的%d号进程处理日志上送消息,格式不正确", g_front_seg_index);
} }
std::cout << "this msg should only execute once" << std::endl; std::cout << "this msg should only execute once" << std::endl;
@@ -737,8 +738,8 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
std::cout << "msg index: " << process_No << " self index: " << g_front_seg_index << std::endl; std::cout << "msg index: " << process_No << " self index: " << g_front_seg_index << std::endl;
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理topic:%s_%s的台账更新消息", DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的台账更新消息",
get_front_msg_from_subdir(), g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
send_reply_to_queue(guid, "1", "收到台账更新指令"); send_reply_to_queue(guid, "1", "收到台账更新指令");
@@ -865,7 +866,7 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
} }
else{ else{
std::cerr << "rtdata is NULL." << std::endl; std::cerr << "rtdata is NULL." << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
} }
@@ -909,7 +910,7 @@ rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& ms
// 调用业务逻辑处理函数 // 调用业务逻辑处理函数
std::string updatefilepath = FRONT_PATH + "/etc/ledgerupdate"; std::string updatefilepath = FRONT_PATH + "/etc/ledgerupdate";
if (!parseJsonMessageUD(body, updatefilepath)) { if (!parseJsonMessageUD(body, updatefilepath)) {
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
} }
return rocketmq::CONSUME_SUCCESS; return rocketmq::CONSUME_SUCCESS;
@@ -939,7 +940,7 @@ rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg)
// 调用业务处理逻辑 // 调用业务处理逻辑
if (!parseJsonMessageSET(body)) { if (!parseJsonMessageSET(body)) {
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
} }
return rocketmq::CONSUME_SUCCESS; return rocketmq::CONSUME_SUCCESS;
@@ -969,7 +970,7 @@ rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg)
// 执行日志上送处理 // 执行日志上送处理
if (!parseJsonMessageLOG(body)) { if (!parseJsonMessageLOG(body)) {
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str()); DIY_ERRORLOG("process", "【ERROR】前置的%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str());
} }
return rocketmq::CONSUME_SUCCESS; return rocketmq::CONSUME_SUCCESS;
@@ -1012,7 +1013,7 @@ rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& ms
} else { } else {
std::cerr << "recall data is NULL." << std::endl; std::cerr << "recall data is NULL." << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str());
} }
return rocketmq::CONSUME_SUCCESS; return rocketmq::CONSUME_SUCCESS;
@@ -1317,10 +1318,10 @@ void connect_status_to_queue(const std::string& id, const std::string& datetime,
data.strTopic = G_CONNECT_TOPIC; data.strTopic = G_CONNECT_TOPIC;
data.strText = jsonObject.dump(); // 转换为字符串 data.strText = jsonObject.dump(); // 转换为字符串
if (g_node_id == STAT_DATA_BASE_NODE_ID) { //if (g_node_id == STAT_DATA_BASE_NODE_ID) {
std::lock_guard<std::mutex> lock(queue_data_list_mutex); std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data); queue_data_list.push_back(data);
} //}
} }
catch (const std::exception& e) { catch (const std::exception& e) {
std::cerr << "connect_status_to_queue exception: " << e.what() << std::endl; std::cerr << "connect_status_to_queue exception: " << e.what() << std::endl;
@@ -1337,7 +1338,7 @@ void send_reply_to_queue(const std::string& guid, const std::string& step, const
obj["step"] = step; obj["step"] = step;
obj["result"] = result; obj["result"] = result;
obj["processNo"] = g_front_seg_index; obj["processNo"] = g_front_seg_index;
obj["frontType"] = get_front_type_from_subdir(); obj["frontType"] = "cloudfront";
obj["nodeId"] = FRONT_INST; obj["nodeId"] = FRONT_INST;
// 构造 queue 消息 // 构造 queue 消息
@@ -1360,7 +1361,7 @@ void send_heartbeat_to_queue(const std::string& status) {
try{ try{
nlohmann::json obj; nlohmann::json obj;
obj["nodeId"] = FRONT_INST; obj["nodeId"] = FRONT_INST;
obj["frontType"] = get_front_type_from_subdir(); obj["frontType"] = "cloudfront";
obj["processNo"] = g_front_seg_index; obj["processNo"] = g_front_seg_index;
obj["status"] = status; obj["status"] = status;
@@ -1432,8 +1433,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) {
if (type == 0) { if (type == 0) {
std::cout << "use ledger send msg" << std::endl; std::cout << "use ledger send msg" << std::endl;
//根据台账模式下每个进程都会发送
for (size_t i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < terminal_devlist.size(); ++i) { for (size_t i = 0; total_messages > 0 && i < terminal_devlist.size(); ++i) {
const auto& dev = terminal_devlist[i]; const auto& dev = terminal_devlist[i];
if (shouldSkipTerminal(dev.terminal_id)) { if (shouldSkipTerminal(dev.terminal_id)) {
@@ -1478,8 +1479,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) {
} }
} else { } else {
std::cout << "use monitor + number send msg" << std::endl; std::cout << "use monitor + number send msg" << std::endl;
//根据虚构监测点模式下只有进程1发送
for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < total_messages; ++i) { for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 ) && i < total_messages; ++i) {
std::string monitor_id = "testmonitor" + std::to_string(i); std::string monitor_id = "testmonitor" + std::to_string(i);
data.mp_id = monitor_id; data.mp_id = monitor_id;
data.monitor_no = i; data.monitor_no = i;

View File

@@ -9,6 +9,7 @@
#include "dealMsg.h" #include "dealMsg.h"
#include "cloudfront/code/interface.h" #include "cloudfront/code/interface.h"
#include <iostream>
using namespace std; using namespace std;
#if 0 #if 0
@@ -242,8 +243,8 @@ void restart_thread(int index) {
} }
else if (index == 2) { else if (index == 2) {
// <20>ӿڣ<D3BF>mq // <20>ӿڣ<D3BF>mq
char* argv[] = { (char*)new_index ,(char*)"-dcfg_stat_data", (char*)"-s1_1" }; char* argv[] = { (char*)new_index };//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҫ<EFBFBD><D2AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̺Ų<CCBA><C5B2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ThreadArgs* args = new ThreadArgs{3, argv}; ThreadArgs* args = new ThreadArgs{1, argv};
if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) { if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) {
pthread_mutex_lock(&global_lock); pthread_mutex_lock(&global_lock);
printf("Failed to restart message processor thread %d\n", index); printf("Failed to restart message processor thread %d\n", index);
@@ -265,7 +266,12 @@ int is_thread_alive(pthread_t tid) {
} }
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */ /* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
int main() { int main(int argc ,char** argv) {//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ӳ<EFBFBD><D3B2><EFBFBD>
if(!parse_param(argc,argv)){
std::cerr << "process param error,exit" << std::endl;
return 1;
}
srand(time(NULL)); // <20><>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> srand(time(NULL)); // <20><>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
// <20><>ʼ<EFBFBD><CABC><EFBFBD>߳<EFBFBD><DFB3><EFBFBD><EFBFBD><EFBFBD> // <20><>ʼ<EFBFBD><CABC><EFBFBD>߳<EFBFBD><DFB3><EFBFBD><EFBFBD><EFBFBD>
@@ -296,8 +302,8 @@ int main() {
} }
else if (i == 2){ else if (i == 2){
//<2F>ӿں<D3BF>mq //<2F>ӿں<D3BF>mq
char* argv[] = { (char*)index,(char*)"-dcfg_stat_data", (char*)"-s1_1" }; char* argv[] = { (char*)index };//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҫ<EFBFBD><D2AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̺Ų<CCBA><C5B2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ThreadArgs* args = new ThreadArgs{3, argv}; ThreadArgs* args = new ThreadArgs{1, argv};
if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) { if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) {
printf("Failed to create message processor thread %d\n", i); printf("Failed to create message processor thread %d\n", i);
delete args; // <20><><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>û<EFBFBD><C3BB><EFBFBD><EFBFBD><EFBFBD>ɹ<EFBFBD><C9B9><EFBFBD><EFBFBD>ֶ<EFBFBD><D6B6>ͷ<EFBFBD> delete args; // <20><><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>û<EFBFBD><C3BB><EFBFBD><EFBFBD><EFBFBD>ɹ<EFBFBD><C9B9><EFBFBD><EFBFBD>ֶ<EFBFBD><D6B6>ͷ<EFBFBD>