From 2c48d7ae0a28e313b1ea55e0bc6cf0faf543e6c1 Mon Sep 17 00:00:00 2001 From: lnk Date: Thu, 11 Dec 2025 15:07:54 +0800 Subject: [PATCH] fix data race --- LFtid1056/cloudfront/code/cfg_parser.cpp | 3 - LFtid1056/cloudfront/code/main.cpp | 135 ++++++++++++++--------- LFtid1056/cloudfront/code/rocketmq.cpp | 2 +- LFtid1056/cloudfront/code/worker.cpp | 2 +- LFtid1056/main_thread.cpp | 2 +- 5 files changed, 86 insertions(+), 58 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 793054b..effbe3c 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -46,9 +46,6 @@ extern int g_front_seg_index; extern int g_front_seg_num; extern unsigned int g_node_id; //前置程序类型(100-600) -//初始化完成标识 -extern int INITFLAG; - //线程阻塞计数 extern uint32_t g_ontime_blocked_times; diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index 8c933a9..75f0ee8 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -37,7 +37,27 @@ #include "rocketmq/MQClientException.h" #include "front.h" -////////////////////////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////////////////////////////命名空间 +//lnk 20251211 +namespace { + std::mutex g_streamMutex; + + void safe_out_str(const std::string& s) { + std::lock_guard lk(g_streamMutex); + std::cout << s << std::flush; + } + + void safe_out_line(const std::string& s) { + std::lock_guard lk(g_streamMutex); + std::cout << s << std::endl; + } + + void safe_err_line(const std::string& s) { + std::lock_guard lk(g_streamMutex); + std::cerr << s << std::endl; + } +} + using json = nlohmann::json; @@ -49,7 +69,7 @@ std::string FRONT_PATH; //初始化标志 -int INITFLAG = 0; +std::atomic INITFLAG{0}; //前置标置 std::string subdir = "cloudfrontproc"; //子目录 @@ -434,7 +454,7 @@ std::string get_parent_directory() { ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////主功能线程 void Front::FrontThread() { - std::cout << "FrontThread::run() is called ...... \n"; + safe_out_line("FrontThread::run() is called ...... \n"); try { while (!m_bIsFrontThreadCancle) { @@ -445,9 +465,9 @@ void Front::FrontThread() { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } catch (const std::exception& e) { - std::cerr << "[FrontThread] Caught exception: " << e.what() << std::endl; + safe_err_line(std::string("[FrontThread] Caught exception: ") + e.what()); } catch (...) { - std::cerr << "[FrontThread] Caught unknown exception" << std::endl; + safe_err_line("[FrontThread] Caught unknown exception"); } // 设置重启标志 @@ -456,7 +476,7 @@ void Front::FrontThread() { m_needRestartFrontThread = true; } - std::cout << "[FrontThread] exited, will be restarted by monitor\n"; + safe_out_line("[FrontThread] exited, will be restarted by monitor\n"); } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////定时任务 @@ -465,7 +485,7 @@ void Front::OnTimerThread() { try { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - std::cout << "OnTimerThread::run() is called ...... \n"; + safe_out_line("OnTimerThread::run() is called ...... \n"); int hbCounter = 0; // 心跳计数 int backupCounter = 0; // 备份计数(分钟用) @@ -499,10 +519,10 @@ void Front::OnTimerThread() const int todayYMD = local_ymd_today(); // YYYYMMDD(本地时区) if (todayYMD != s_lastCleanupYMD) { // 说明进入了新的一天:执行清理(删除前日及更早的未配对事件) - std::cout << "[OnTimerThread] daily cleanup start, today=" << todayYMD << std::endl; + safe_out_line("[OnTimerThread] daily cleanup start, today=" + std::to_string(todayYMD) + "\n"); cleanup_old_unpaired_qvvr_events(); // 调用清理内存的暂态事件 s_lastCleanupYMD = todayYMD; - std::cout << "[OnTimerThread] daily cleanup done" << std::endl; + safe_out_line("[OnTimerThread] daily cleanup done\n"); } } @@ -513,9 +533,9 @@ void Front::OnTimerThread() std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } } catch (const std::exception& e) { - std::cerr << "[OnTimerThread] Caught exception: " << e.what() << std::endl; + safe_err_line(std::string("[OnTimerThread] Caught exception: ") + e.what()); } catch (...) { - std::cerr << "[OnTimerThread] Caught unknown exception" << std::endl; + safe_err_line("[OnTimerThread] Caught unknown exception"); } { @@ -523,7 +543,7 @@ void Front::OnTimerThread() m_needRestartTimerThread = true; } - std::cout << "[OnTimerThread] exited, will be restarted by monitor\n"; + safe_out_line("[OnTimerThread] exited, will be restarted by monitor\n"); } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////消费者线程 @@ -556,37 +576,37 @@ void Front::mqconsumerThread() std::string key = sub.topic + ":" + sub.tag; callbackMap.emplace(key, sub.callback); m_mqConsumer->subscribe(sub.topic, sub.tag); - std::cout << "[mqconsumerThread] 已订阅 Topic=\"" << sub.topic << "\", Tag=\"" << sub.tag << "\"" << std::endl; + safe_out_line("[mqconsumerThread] 已订阅 Topic=\"" + sub.topic + "\", Tag=\"" + sub.tag + "\"\n"); } m_listener = std::make_shared(callbackMap); m_mqConsumer->registerMessageListener(m_listener.get()); m_mqConsumer->start(); - std::cout << "[mqconsumerThread] Consumer 已启动,等待消息..." << std::endl; + safe_out_line("[mqconsumerThread] Consumer 已启动,等待消息...\n"); // ✳️ 保持线程不主动退出,由 RocketMQ 内部驱动执行回调 // 如果 RocketMQ 内部机制失败或意外退出线程,就走 catch } catch (const rocketmq::MQClientException& e) { - std::cerr << "[mqconsumerThread] MQClientException: " << e.what() << std::endl; + safe_err_line(std::string("[mqconsumerThread] MQClientException: ") + e.what()); std::lock_guard lock(m_threadCheckMutex); m_needRestartConsumerThread = true; return; } catch (const std::exception& e) { - std::cerr << "[mqconsumerThread] std::exception: " << e.what() << std::endl; + safe_err_line(std::string("[mqconsumerThread] std::exception: ") + e.what()); std::lock_guard lock(m_threadCheckMutex); m_needRestartConsumerThread = true; return; } catch (...) { - std::cerr << "[mqconsumerThread] Unknown exception" << std::endl; + safe_err_line("[mqconsumerThread] Unknown exception"); std::lock_guard lock(m_threadCheckMutex); m_needRestartConsumerThread = true; return; } // 程序运行中,消费者会通过回调处理消息,线程保持存活即可 - std::cout << "[mqconsumerThread] Consumer 线程正在运行,等待消息到达..." << std::endl; + safe_out_line("[mqconsumerThread] Consumer 线程正在运行,等待消息到达..."); } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////生产者线程 @@ -596,7 +616,7 @@ void Front::mqproducerThread() try { // 1. 初始化生产者 InitializeProducer(m_producer); - std::cout << "\n[mqproducerThread] is running ...... \n\n"; + safe_out_line("\n[mqproducerThread] is running ...... \n\n"); uint32_t count = 0; @@ -615,49 +635,60 @@ void Front::mqproducerThread() } if (data_gotten) { - auto now = std::chrono::system_clock::now(); - auto ms_part = std::chrono::duration_cast( - now.time_since_epoch()) % 1000; - auto time_t_part = std::chrono::system_clock::to_time_t(now); - std::tm tm_buf; - localtime_r(&time_t_part, &tm_buf); - char timeStr[32]; - std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S", &tm_buf); + { + auto now = std::chrono::system_clock::now(); + auto ms_part = std::chrono::duration_cast( + now.time_since_epoch()) % 1000; + auto time_t_part = std::chrono::system_clock::to_time_t(now); + std::tm tm_buf; + localtime_r(&time_t_part, &tm_buf); + char timeStr[32]; + std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S", &tm_buf); - std::cout << "BEGIN my_queue_send no." << count - << " >>>> " << timeStr - << "." << std::setw(3) << std::setfill('0') << ms_part.count() - << std::endl; + std::ostringstream oss; + oss << "BEGIN my_queue_send no." << count + << " >>>> " << timeStr + << "." << std::setw(3) << std::setfill('0') << ms_part.count(); + + // 线程安全输出 + safe_out_line(oss.str()); + } // 调用实际发送 my_rocketmq_send(data, m_producer); - now = std::chrono::system_clock::now(); - ms_part = std::chrono::duration_cast( - now.time_since_epoch()) % 1000; - time_t_part = std::chrono::system_clock::to_time_t(now); - localtime_r(&time_t_part, &tm_buf); - std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S", &tm_buf); + { + auto now = std::chrono::system_clock::now(); + auto ms_part = std::chrono::duration_cast( + now.time_since_epoch()) % 1000; + auto time_t_part = std::chrono::system_clock::to_time_t(now); + std::tm tm_buf; + localtime_r(&time_t_part, &tm_buf); + char timeStr[32]; + std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S", &tm_buf); - std::cout << "END my_queue_send no." << count++ - << " >>>> " << timeStr - << "." << std::setw(3) << std::setfill('0') << ms_part.count() - << "\n\n"; + std::ostringstream oss; + oss << "END my_queue_send no." << count + << " <<<< " << timeStr + << "." << std::setw(3) << std::setfill('0') << ms_part.count(); + + safe_out_line(oss.str()); + } } g_mqproducer_blocked_times = 0; std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - std::cout << "[mqproducerThread] 正常退出\n"; + safe_out_line("[mqproducerThread] 正常退出\n"); } catch (const std::exception& e) { - std::cerr << "[mqproducerThread] std::exception: " << e.what() << std::endl; + safe_err_line(std::string("[mqproducerThread] std::exception: ") + e.what()); std::lock_guard lock(m_threadCheckMutex); m_needRestartProducerThread = true; } catch (...) { - std::cerr << "[mqproducerThread] unknown exception\n"; + safe_err_line("[mqproducerThread] unknown exception\n"); std::lock_guard lock(m_threadCheckMutex); m_needRestartProducerThread = true; } @@ -704,7 +735,7 @@ void* cloudfrontthread(void* arg) { // 更新线程状态为运行中 pthread_mutex_lock(&thread_info[index].lock); - printf("cloudfrontthread %d started\n", index); + safe_out_line(std::string("cloudfrontthread") + std::to_string(index) + " started\n"); thread_info[index].state = THREAD_RUNNING; pthread_mutex_unlock(&thread_info[index].lock); @@ -720,13 +751,13 @@ void* cloudfrontthread(void* arg) { //路径获取 FRONT_PATH = get_parent_directory(); - std::cout << "FRONT_PATH:" << FRONT_PATH << std::endl; + safe_out_line("FRONT_PATH:" + FRONT_PATH + "\n"); //声明前置 std::unique_ptr FrontProcess; FrontProcess = make_unique(); - std::cout << "[Main] Program running in background.\n"; + safe_out_line("[Main] Program running in background.\n"); // 5) 主线程保持后台运行 while(running) { @@ -758,28 +789,28 @@ void* cloudfrontthread(void* arg) { }*/ if (FrontProcess->m_needRestartFrontThread) { - std::cout << "[Monitor] Restarting FrontThread..." << std::endl; + safe_out_line("[Monitor] Restarting FrontThread...\n"); FrontProcess->StopFrontThread(); FrontProcess->StartFrontThread(); FrontProcess->m_needRestartFrontThread = false; } if (FrontProcess->m_needRestartConsumerThread) { - std::cout << "[Monitor] Restarting MQConsumerThread..." << std::endl; + safe_out_line("[Monitor] Restarting MQConsumerThread...\n"); FrontProcess->StopMQConsumerThread(); FrontProcess->StartMQConsumerThread(); FrontProcess->m_needRestartConsumerThread = false; } if (FrontProcess->m_needRestartProducerThread) { - std::cout << "[Monitor] Restarting MQProducerThread..." << std::endl; + safe_out_line("[Monitor] Restarting MQProducerThread...\n"); FrontProcess->StopMQProducerThread(); FrontProcess->StartMQProducerThread(); FrontProcess->m_needRestartProducerThread = false; } if (FrontProcess->m_needRestartTimerThread) { - std::cout << "[Monitor] Restarting TimerThread..." << std::endl; + safe_out_line("[Monitor] Restarting TimerThread...\n"); FrontProcess->StopTimerThread(); // 先停 FrontProcess->StartTimerThread(); // 再启 FrontProcess->m_needRestartTimerThread = false; @@ -792,7 +823,7 @@ void* cloudfrontthread(void* arg) { // 退出前标记为 STOPPED,方便监控线程判断并重启 pthread_mutex_lock(&thread_info[index].lock); thread_info[index].state = THREAD_STOPPED; - printf("cloudfrontthread %d stopped\n", index); + safe_out_line(std::string("cloudfrontthread ") + std::to_string(index) + " stopped\n"); pthread_mutex_unlock(&thread_info[index].lock); return nullptr; diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index 60841b0..b99e770 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -72,7 +72,7 @@ extern std::string FRONT_INST; extern uint32_t g_mqproducer_blocked_times; //初始化标志 -extern int INITFLAG; +extern std::atomic INITFLAG; //测试用的终端数组 extern std::vector TESTARRAY; diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index 9842051..3fb8cbd 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -49,7 +49,7 @@ extern std::list errorList, warnList, normalList; extern std::mutex errorListMutex, warnListMutex, normalListMutex; extern int IED_COUNT; -extern int INITFLAG; +extern std::atomic INITFLAG; extern int g_front_seg_index; extern std::string subdir; diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 6dd647e..af43e85 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -34,7 +34,7 @@ typedef struct { } thread_info_t; #endif -extern int INITFLAG;//̨˵ȳʼɱ־ +extern std::atomic INITFLAG;//̨˵ȳʼɱ־ //extern void cleanup_args(ThreadArgs* args); void init_daemon(void)