///////////////////////////////////////////////////////////////////////////////////////////////////// #include #include //任务队列 #include //信号处理 #include //标准输入输出 #include //字符串 #include //数组型容器 #include //映射 #include //锁 #include //线程 #include //线程控制 #include //原子操作 #include //时间 #include //时间 #include //流处理 #include //字符处理 #include //格式控制 #include //类消费者绑定 #include //获取目录 #include #include #include #include #include #include #include //////////////////////////////////////////////////////////////////////////////////////////////////////// #include "interface.h" //用于访问接口 #include "log4.h" //用于日志 #include "curl/curl.h" //用于访问接口 #include "nlohmann/json.hpp" //用于构造json #include "worker.h" //shell接口 #include "rocketmq.h" #include "rocketmq/MQClientException.h" #include "front.h" //////////////////////////////////////////////////////////////////////////////////////////////////////命名空间 //lnk 20251211 namespace { std::mutex g_streamMutex; void safe_out_str(const std::string& s) { std::lock_guard lk(g_streamMutex); std::cout << s << std::flush; } void safe_out_line(const std::string& s) { std::lock_guard lk(g_streamMutex); std::cout << s << std::endl; } void safe_err_line(const std::string& s) { std::lock_guard lk(g_streamMutex); std::cerr << s << std::endl; } } using json = nlohmann::json; //////////////////////////////////////////////////////////////////////////////////////////////////////全局变量 //前置程序路径 std::string FRONT_PATH; //初始化标志 std::atomic INITFLAG{0}; //前置标置 std::string subdir = "cloudfrontproc"; //子目录 uint32_t g_node_id = 0; int g_front_seg_index = 0; //默认单进程 int g_front_seg_num = 0; //默认单进程 //实时进程标志 int three_secs_enabled = 0; //稳态进程自动注册报告标志 int auto_register_report_enabled = 0; //mq生产线程和定时线程都加上死锁计数器 uint32_t g_mqproducer_blocked_times = 0; uint32_t g_ontime_blocked_times = 0; //进程控制 std::atomic running{true}; void onSignal(int){ running = false; } ///////////////////////////////////////////////////////////////////////////////////////////////////// extern int G_TEST_FLAG; //测试线程开启开关 extern int TEST_PORT; //测试端口号 extern std::string FRONT_INST; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数 template std::unique_ptr make_unique(Args&&... args) { return std::unique_ptr(new T(std::forward(args)...)); } // 把“今天”做成年月日整数(YYYYMMDD),用于“每天只清理一次”的判定 static inline int local_ymd_today() { std::time_t now = std::time(nullptr); std::tm local_tm{}; #if defined(_WIN32) || defined(_WIN64) localtime_s(&local_tm, &now); #else local_tm = *std::localtime(&now); #endif return (local_tm.tm_year + 1900) * 10000 + (local_tm.tm_mon + 1) * 100 + local_tm.tm_mday; } //处理参数 bool parse_param(int argc, char* argv[]) { for (int i = 1; i < argc; ++i) { std::string arg = argv[i]; // 处理 -s 参数 if (arg == "-s" && i + 1 < argc) { std::string val = argv[++i]; auto pos = val.find('_'); if (pos != std::string::npos) { try { g_front_seg_index = std::stoi(val.substr(0, pos)); g_front_seg_num = std::stoi(val.substr(pos + 1)); } catch (...) { std::cerr << "Invalid -s format." << std::endl; } } continue; } // 处理 -s1_5 这种紧凑写法 if (arg.rfind("-s", 0) == 0 && arg.length() > 2) { std::string val = arg.substr(2); auto pos = val.find('_'); if (pos != std::string::npos) { try { g_front_seg_index = std::stoi(val.substr(0, pos)); g_front_seg_num = std::stoi(val.substr(pos + 1)); } catch (...) { std::cerr << "Invalid -s format." << std::endl; } } continue; } // 处理 -d 或 -D 参数 if ((arg == "-d" || arg == "-D") && i + 1 < argc) { subdir = argv[++i]; continue; } if ((arg.rfind("-d", 0) == 0 || arg.rfind("-D", 0) == 0) && arg.length() > 2) { subdir = arg.substr(2); continue; } // 这里可以继续添加其它参数解析,例如 -x, -y // if (arg == "-x" ... ) {...} } // 输出结果 std::cout << "g_front_seg_index: " << g_front_seg_index << "\n"; std::cout << "g_front_seg_num : " << g_front_seg_num << "\n"; std::cout << "subdir : " << subdir << "\n"; return true; } //获取前置路径 std::string get_parent_directory() { // 获取当前工作目录 char cwd[PATH_MAX]; if (!getcwd(cwd, sizeof(cwd))) { // 获取失败 return ""; } // dirname 可能会修改传入的字符串,需要副本 std::string current_dir(cwd); std::unique_ptr temp(new char[current_dir.size() + 1]); std::strcpy(temp.get(), current_dir.c_str()); // 获取父目录 char* parent = dirname(temp.get()); if (!parent) return ""; // 返回绝对路径 return std::string(parent); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////主要类结构 //------------------- Front 类(C++) ------------------- //构造函数 Front::Front(): m_worker(this), m_threadPool(std::thread::hardware_concurrency()) // 用系统核数初始化线程池 { //初始化g_node_id //init_global_function_enable(); //配置初始化 init_config(); //启动进程日志 init_logger_process(); DIY_WARNLOG("process","【WARN】前置的%d号进程 进程级日志初始化完毕", g_front_seg_index); //读取台账 parse_device_cfg_web(); //初始化日志 init_loggers(); //读取模型,下载模板文件 //parse_model_cfg_web(); //解析模板文件 //Set_xml_nodeinfo(); StartFrontThread(); //开启主线程 StartMQConsumerThread(); //开启消费者线程 StartMQProducerThread(); //开启生产者线程 StartTimerThread(); //开启定时线程 //启动worker 根据启动标志启动 if(G_TEST_FLAG){ if(!m_worker.startServer(TEST_PORT)) { std::cerr << "[testshell] startServer failed.\n"; } } //初始化标志 std::this_thread::sleep_for(std::chrono::seconds(3)); INITFLAG = 1; } Front::~Front() { FormClosing(); } // ============ 关闭所有运行中的线程============ /*void Front::FormClosing() { //确保testshell关闭 m_worker.stopServer(); // **确保前置线程被关闭** if(m_FrontThread.joinable()) { m_bIsFrontThreadCancle = true; m_FrontThread.join(); // **等待前置线程结束** } // 定时线程 if (m_TimerThread.joinable()) { m_IsTimerCancel = true; m_TimerThread.join(); } // 生产者线程 if (m_MQProducerThread.joinable()) { m_IsMQProducerCancel = true; m_MQProducerThread.join(); } // 消费者线程 m_IsMQConsumerCancel = true; if (m_mqConsumer) { try { m_mqConsumer->shutdown(); } catch (...) { std::cerr << "mq consumer shutdown error" << std::endl; } m_mqConsumer.reset(); } m_listener.reset(); if (m_MQConsumerThread.joinable()) { m_MQConsumerThread.join(); } }*/ void Front::FormClosing() { m_worker.stopServer(); StopFrontThread(); StopTimerThread(); StopMQProducerThread(); StopMQConsumerThread(); } //============ 线程函数 ============ /*void Front::StartFrontThread() { m_bIsFrontThreadCancle = false; m_FrontThread = std::thread(&Front::FrontThread, this); } void Front::StartMQConsumerThread() { m_IsMQConsumerCancel = false; m_MQConsumerThread = std::thread(&Front::mqconsumerThread, this); } void Front::StartMQProducerThread() { m_IsMQProducerCancel = false; m_MQProducerThread = std::thread(&Front::mqproducerThread, this); } void Front::StartTimerThread() { m_IsTimerCancel = false; m_TimerThread = std::thread(&Front::OnTimerThread, this); } */ void Front::StartFrontThread() { bool expected = false; if (!m_frontRunning.compare_exchange_strong(expected, true)) { std::cout << "[FrontThread] already running, skip\n"; return; } if (m_FrontThread.joinable()) m_FrontThread.join(); m_bIsFrontThreadCancle = false; m_FrontThread = std::thread([this]{ try { this->FrontThread(); } catch (const std::exception& e) { std::cerr << "[FrontThread] exception: " << e.what() << "\n"; } catch (...) { std::cerr << "[FrontThread] unknown exception\n"; } m_frontRunning = false; // 线程真正退出后复位 }); } void Front::StartMQConsumerThread() { bool expected = false; if (!m_consumerRunning.compare_exchange_strong(expected, true)) { std::cout << "[MQConsumer] already running, skip\n"; return; } if (m_MQConsumerThread.joinable()) m_MQConsumerThread.join(); m_IsMQConsumerCancel = false; m_MQConsumerThread = std::thread([this]{ try { this->mqconsumerThread(); } catch (const std::exception& e) { std::cerr << "[mqconsumerThread] exception: " << e.what() << "\n"; } catch (...) { std::cerr << "[mqconsumerThread] unknown exception\n"; } m_consumerRunning = false; }); } void Front::StartMQProducerThread() { bool expected = false; if (!m_producerRunning.compare_exchange_strong(expected, true)) { std::cout << "[MQProducer] already running, skip\n"; return; } if (m_MQProducerThread.joinable()) m_MQProducerThread.join(); m_IsMQProducerCancel = false; m_MQProducerThread = std::thread([this]{ try { this->mqproducerThread(); } catch (const std::exception& e) { std::cerr << "[mqproducerThread] exception: " << e.what() << "\n"; } catch (...) { std::cerr << "[mqproducerThread] unknown exception\n"; } m_producerRunning = false; }); } void Front::StartTimerThread() { bool expected = false; if (!m_timerRunning.compare_exchange_strong(expected, true)) { std::cout << "[Timer] already running, skip StartTimerThread\n"; return; // 已有定时线程在跑,直接跳过 } // 若有旧线程尚未 join,先回收 if (m_TimerThread.joinable()) { m_TimerThread.join(); } m_IsTimerCancel.store(false, std::memory_order_relaxed); m_TimerThread = std::thread([this]{ try { this->OnTimerThread(); } catch (const std::exception& e) { std::cerr << "[Timer] exception: " << e.what() << "\n"; } catch (...) { std::cerr << "[Timer] unknown exception\n"; } m_timerRunning.store(false); // 线程真正退出后复位 }); } void Front::StopFrontThread() { if (!m_frontRunning.load()) return; m_bIsFrontThreadCancle = true; if (m_FrontThread.joinable()) m_FrontThread.join(); m_frontRunning = false; } void Front::StopMQConsumerThread() { if (!m_consumerRunning.load()) return; m_IsMQConsumerCancel = true; // 你的线程函数可能不轮询此标志,但先置上 // 关闭 MQ 对象(避免内部阻塞线程仍在) if (m_mqConsumer) { try { m_mqConsumer->shutdown(); } catch (...) {} m_mqConsumer.reset(); } m_listener.reset(); if (m_MQConsumerThread.joinable()) m_MQConsumerThread.join(); m_consumerRunning = false; } void Front::StopMQProducerThread() { if (!m_producerRunning.load()) return; m_IsMQProducerCancel = true; if (m_MQProducerThread.joinable()) m_MQProducerThread.join(); m_producerRunning = false; // 如需销毁/关闭 producer,对应你的初始化方式: // if (m_producer) { ShutdownProducer(m_producer); m_producer = nullptr; } } void Front::StopTimerThread() { if (!m_timerRunning.load()) return; // 没跑就不处理 m_IsTimerCancel.store(true); if (m_TimerThread.joinable()) { m_TimerThread.join(); // 等它退出 } m_timerRunning.store(false); } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////主功能线程 void Front::FrontThread() { safe_out_line("FrontThread::run() is called ...... \n"); try { while (!m_bIsFrontThreadCancle) { check_recall_event(); // 处理补招事件,从list中读取然后直接调用接口,每一条可能都不同测点,每个测点自己做好记录 check_recall_file(); //处理补招文件-稳态和暂态 std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } catch (const std::exception& e) { safe_err_line(std::string("[FrontThread] Caught exception: ") + e.what()); } catch (...) { safe_err_line("[FrontThread] Caught unknown exception"); } // 设置重启标志 { std::lock_guard lock(m_threadCheckMutex); m_needRestartFrontThread = true; } safe_out_line("[FrontThread] exited, will be restarted by monitor\n"); } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////定时任务 void Front::OnTimerThread() { try { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); safe_out_line("OnTimerThread::run() is called ...... \n"); int hbCounter = 0; // 心跳计数 int backupCounter = 0; // 备份计数(分钟用) send_heartbeat_to_queue("1"); //记录“上次做日清理”的日期(YYYYMMDD),确保每天只做一次 static int s_lastCleanupYMD = -1; while (!m_IsTimerCancel) { update_log_entries_countdown(); //业务超时检查 check_device_busy_timeout(); // 每 30 秒发一次心跳 if (hbCounter >= 30) { send_heartbeat_to_queue("1"); hbCounter = 0; } // 每 60 秒调用一次录波文件检查 if (backupCounter >= 60) { check_and_backup_qvvr_files(); backupCounter = 0; } // 按天清理 —— 发现“日期变更”则执行一次清理 { const int todayYMD = local_ymd_today(); // YYYYMMDD(本地时区) if (todayYMD != s_lastCleanupYMD) { // 说明进入了新的一天:执行清理(删除前日及更早的未配对事件) safe_out_line("[OnTimerThread] daily cleanup start, today=" + std::to_string(todayYMD) + "\n"); cleanup_old_unpaired_qvvr_events(); // 调用清理内存的暂态事件 s_lastCleanupYMD = todayYMD; safe_out_line("[OnTimerThread] daily cleanup done\n"); } } hbCounter++; backupCounter++; g_ontime_blocked_times = 0; std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } } catch (const std::exception& e) { safe_err_line(std::string("[OnTimerThread] Caught exception: ") + e.what()); } catch (...) { safe_err_line("[OnTimerThread] Caught unknown exception"); } { std::lock_guard lock(m_threadCheckMutex); m_needRestartTimerThread = true; } safe_out_line("[OnTimerThread] exited, will be restarted by monitor\n"); } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////消费者线程 void Front::mqconsumerThread() { try { std::string consumerGroup = subdir + std::to_string(g_front_seg_index); std::string nameServer = G_MQCONSUMER_IPPORT; std::vector subscriptions; //if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) { subscriptions.emplace_back(G_MQCONSUMER_TOPIC_RT, FRONT_INST, myMessageCallbackrtdata); //} subscriptions.emplace_back(G_MQCONSUMER_TOPIC_UD, FRONT_INST, myMessageCallbackupdate); //if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) { subscriptions.emplace_back(G_MQCONSUMER_TOPIC_RC, FRONT_INST, myMessageCallbackrecall); //} subscriptions.emplace_back(G_MQCONSUMER_TOPIC_SET, FRONT_INST, myMessageCallbackset); subscriptions.emplace_back(G_MQCONSUMER_TOPIC_LOG, FRONT_INST, myMessageCallbacklog); m_mqConsumer = make_unique(consumerGroup); // ✅ 必须在 start() 之前设置 m_mqConsumer->setLogPath("/data/logs/rocketmq"); // 目录 m_mqConsumer->setLogLevel(rocketmq::eLOG_LEVEL_ERROR); // 级别(error) m_mqConsumer->setLogFileSizeAndNum(5, 50); // 5 个文件,每个 50MB m_mqConsumer->setNamesrvAddr(nameServer); m_mqConsumer->setSessionCredentials(G_MQCONSUMER_ACCESSKEY, G_MQCONSUMER_SECRETKEY, G_MQCONSUMER_CHANNEL); m_mqConsumer->setInstanceName("inst_" + std::to_string(sGetMsTime())); m_mqConsumer->setConsumeFromWhere(rocketmq::CONSUME_FROM_LAST_OFFSET); std::map callbackMap; for (const auto& sub : subscriptions) { std::string key = sub.topic + ":" + sub.tag; callbackMap.emplace(key, sub.callback); m_mqConsumer->subscribe(sub.topic, sub.tag); safe_out_line("[mqconsumerThread] 已订阅 Topic=\"" + sub.topic + "\", Tag=\"" + sub.tag + "\"\n"); } m_listener = std::make_shared(callbackMap); m_mqConsumer->registerMessageListener(m_listener.get()); m_mqConsumer->start(); safe_out_line("[mqconsumerThread] Consumer 已启动,等待消息...\n"); // ✳️ 保持线程不主动退出,由 RocketMQ 内部驱动执行回调 // 如果 RocketMQ 内部机制失败或意外退出线程,就走 catch } catch (const rocketmq::MQClientException& e) { safe_err_line(std::string("[mqconsumerThread] MQClientException: ") + e.what()); std::lock_guard lock(m_threadCheckMutex); m_needRestartConsumerThread = true; return; } catch (const std::exception& e) { safe_err_line(std::string("[mqconsumerThread] std::exception: ") + e.what()); std::lock_guard lock(m_threadCheckMutex); m_needRestartConsumerThread = true; return; } catch (...) { safe_err_line("[mqconsumerThread] Unknown exception"); std::lock_guard lock(m_threadCheckMutex); m_needRestartConsumerThread = true; return; } // 程序运行中,消费者会通过回调处理消息,线程保持存活即可 safe_out_line("[mqconsumerThread] Consumer 线程正在运行,等待消息到达..."); } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////生产者线程 void Front::mqproducerThread() { try { // 1. 初始化生产者 InitializeProducer(m_producer); safe_out_line("\n[mqproducerThread] is running ...... \n\n"); uint32_t count = 0; while (!m_IsMQProducerCancel) { queue_data_t data; bool data_gotten = false; if(INITFLAG) { std::lock_guard lock(queue_data_list_mutex); if (!queue_data_list.empty()) { data = queue_data_list.front(); queue_data_list.pop_front(); data_gotten = true; } } if (data_gotten) { { auto now = std::chrono::system_clock::now(); auto ms_part = std::chrono::duration_cast( now.time_since_epoch()) % 1000; auto time_t_part = std::chrono::system_clock::to_time_t(now); std::tm tm_buf; localtime_r(&time_t_part, &tm_buf); char timeStr[32]; std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S", &tm_buf); std::ostringstream oss; oss << "BEGIN my_queue_send no." << count << " >>>> " << timeStr << "." << std::setw(3) << std::setfill('0') << ms_part.count(); // 线程安全输出 safe_out_line(oss.str()); } // 调用实际发送 my_rocketmq_send(data, m_producer); { auto now = std::chrono::system_clock::now(); auto ms_part = std::chrono::duration_cast( now.time_since_epoch()) % 1000; auto time_t_part = std::chrono::system_clock::to_time_t(now); std::tm tm_buf; localtime_r(&time_t_part, &tm_buf); char timeStr[32]; std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S", &tm_buf); std::ostringstream oss; oss << "END my_queue_send no." << count << " <<<< " << timeStr << "." << std::setw(3) << std::setfill('0') << ms_part.count(); safe_out_line(oss.str()); } } g_mqproducer_blocked_times = 0; std::this_thread::sleep_for(std::chrono::milliseconds(10)); } safe_out_line("[mqproducerThread] 正常退出\n"); } catch (const std::exception& e) { safe_err_line(std::string("[mqproducerThread] std::exception: ") + e.what()); std::lock_guard lock(m_threadCheckMutex); m_needRestartProducerThread = true; } catch (...) { safe_err_line("[mqproducerThread] unknown exception\n"); std::lock_guard lock(m_threadCheckMutex); m_needRestartProducerThread = true; } } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////用例,除通讯外其他功能都可实现 //int main(int argc char** argv) //变为线程 extern thread_info_t thread_info[THREAD_CONNECTIONS]; /*void cleanup_args(ThreadArgs* args) { for (int i = 0; i < args->argc; ++i) { free(args->argv[i]); // strdup 分配的 } delete[] args->argv; delete args; }*/ void* cloudfrontthread(void* arg) { //不再需要入参20251208 /*ThreadArgs* args = static_cast(arg); int argc = args->argc; char **argv = args->argv; printf("[cloudfrontthread] argc = %d\n", argc); for (int i = 0; i < argc; ++i) { printf(" argv[%d] = %s\n", i, argv[i]); } // 动态解析线程 index int index = 0; if (argc > 0 && argv[0]) { try { index = std::stoi(argv[0]); } catch (...) { std::cerr << "[cloudfrontthread] Failed to parse index from argv[0]: " << argv[0] << "\n"; return nullptr; } }*/ (void)arg; const int index = 0; // 更新线程状态为运行中 pthread_mutex_lock(&thread_info[index].lock); safe_out_line(std::string("cloudfrontthread") + std::to_string(index) + " started\n"); thread_info[index].state = THREAD_RUNNING; pthread_mutex_unlock(&thread_info[index].lock); /*// 解析命令行参数 if(!parse_param(argc,argv)){ std::cerr << "process param error,exit" << std::endl; cleanup_args(args); return nullptr; } // 线程使用完后清理参数 cleanup_args(args);*/ //路径获取 FRONT_PATH = get_parent_directory(); safe_out_line("FRONT_PATH:" + FRONT_PATH + "\n"); //声明前置 std::unique_ptr FrontProcess; FrontProcess = make_unique(); safe_out_line("[Main] Program running in background.\n"); // 5) 主线程保持后台运行 while(running) { { std::lock_guard lock(FrontProcess->m_threadCheckMutex); /*if (FrontProcess->m_needRestartFrontThread) { std::cout << "[Monitor] Restarting FrontThread..." << std::endl; FrontProcess->StartFrontThread(); FrontProcess->m_needRestartFrontThread = false; } if (FrontProcess->m_needRestartConsumerThread) { std::cout << "[Monitor] Restarting MQConsumerThread..." << std::endl; FrontProcess->StartMQConsumerThread(); FrontProcess->m_needRestartConsumerThread = false; } if (FrontProcess->m_needRestartProducerThread) { std::cout << "[Monitor] Restarting MQProducerThread..." << std::endl; FrontProcess->StartMQProducerThread(); FrontProcess->m_needRestartProducerThread = false; } if (FrontProcess->m_needRestartTimerThread) { std::cout << "[Monitor] Restarting TimerThread..." << std::endl; FrontProcess->StartTimerThread(); FrontProcess->m_needRestartTimerThread = false; }*/ if (FrontProcess->m_needRestartFrontThread) { safe_out_line("[Monitor] Restarting FrontThread...\n"); FrontProcess->StopFrontThread(); FrontProcess->StartFrontThread(); FrontProcess->m_needRestartFrontThread = false; } if (FrontProcess->m_needRestartConsumerThread) { safe_out_line("[Monitor] Restarting MQConsumerThread...\n"); FrontProcess->StopMQConsumerThread(); FrontProcess->StartMQConsumerThread(); FrontProcess->m_needRestartConsumerThread = false; } if (FrontProcess->m_needRestartProducerThread) { safe_out_line("[Monitor] Restarting MQProducerThread...\n"); FrontProcess->StopMQProducerThread(); FrontProcess->StartMQProducerThread(); FrontProcess->m_needRestartProducerThread = false; } if (FrontProcess->m_needRestartTimerThread) { safe_out_line("[Monitor] Restarting TimerThread...\n"); FrontProcess->StopTimerThread(); // 先停 FrontProcess->StartTimerThread(); // 再启 FrontProcess->m_needRestartTimerThread = false; } } std::this_thread::sleep_for(std::chrono::seconds(60));//每分钟检测一次 } // 退出前标记为 STOPPED,方便监控线程判断并重启 pthread_mutex_lock(&thread_info[index].lock); thread_info[index].state = THREAD_STOPPED; safe_out_line(std::string("cloudfrontthread ") + std::to_string(index) + " stopped\n"); pthread_mutex_unlock(&thread_info[index].lock); return nullptr; }