@@ -37,7 +37,27 @@
# include "rocketmq/MQClientException.h"
# include "front.h"
//////////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////////////命名空间
//lnk 20251211
namespace {
std : : mutex g_streamMutex ;
void safe_out_str ( const std : : string & s ) {
std : : lock_guard < std : : mutex > lk ( g_streamMutex ) ;
std : : cout < < s < < std : : flush ;
}
void safe_out_line ( const std : : string & s ) {
std : : lock_guard < std : : mutex > lk ( g_streamMutex ) ;
std : : cout < < s < < std : : endl ;
}
void safe_err_line ( const std : : string & s ) {
std : : lock_guard < std : : mutex > lk ( g_streamMutex ) ;
std : : cerr < < s < < std : : endl ;
}
}
using json = nlohmann : : json ;
@@ -49,7 +69,7 @@ std::string FRONT_PATH;
//初始化标志
int INITFLAG = 0 ;
std : : atomic < int > INITFLAG { 0 } ;
//前置标置
std : : string subdir = " cloudfrontproc " ; //子目录
@@ -434,7 +454,7 @@ std::string get_parent_directory() {
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////主功能线程
void Front : : FrontThread ( ) {
std : : cout < < " FrontThread::run() is called ...... \n " ;
safe_out_line ( " FrontThread::run() is called ...... \n " ) ;
try {
while ( ! m_bIsFrontThreadCancle ) {
@@ -445,9 +465,9 @@ void Front::FrontThread() {
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 100 ) ) ;
}
} catch ( const std : : exception & e ) {
std : : cerr < < " [FrontThread] Caught exception: " < < e . what ( ) < < std : : endl ;
safe_err_line ( std : : string ( " [FrontThread] Caught exception: " ) + e . what ( ) ) ;
} catch ( . . . ) {
std : : cerr < < " [FrontThread] Caught unknown exception " < < std : : endl ;
safe_err_line ( " [FrontThread] Caught unknown exception " ) ;
}
// 设置重启标志
@@ -456,7 +476,7 @@ void Front::FrontThread() {
m_needRestartFrontThread = true ;
}
std : : cout < < " [FrontThread] exited, will be restarted by monitor \n " ;
safe_out_line ( " [FrontThread] exited, will be restarted by monitor \n " ) ;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////定时任务
@@ -465,7 +485,7 @@ void Front::OnTimerThread()
{
try {
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1000 ) ) ;
std : : cout < < " OnTimerThread::run() is called ...... \n " ;
safe_out_line ( " OnTimerThread::run() is called ...... \n " ) ;
int hbCounter = 0 ; // 心跳计数
int backupCounter = 0 ; // 备份计数(分钟用)
@@ -499,10 +519,10 @@ void Front::OnTimerThread()
const int todayYMD = local_ymd_today ( ) ; // YYYYMMDD( 本地时区)
if ( todayYMD ! = s_lastCleanupYMD ) {
// 说明进入了新的一天:执行清理(删除前日及更早的未配对事件)
std : : cout < < " [OnTimerThread] daily cleanup start, today= " < < todayYMD < < std : : endl ;
safe_out_line ( " [OnTimerThread] daily cleanup start, today= " + std : : to_string ( todayYMD ) + " \n " ) ;
cleanup_old_unpaired_qvvr_events ( ) ; // 调用清理内存的暂态事件
s_lastCleanupYMD = todayYMD ;
std : : cout < < " [OnTimerThread] daily cleanup done " < < std : : endl ;
safe_out_line ( " [OnTimerThread] daily cleanup done \n " ) ;
}
}
@@ -513,9 +533,9 @@ void Front::OnTimerThread()
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1000 ) ) ;
}
} catch ( const std : : exception & e ) {
std : : cerr < < " [OnTimerThread] Caught exception: " < < e . what ( ) < < std : : endl ;
safe_err_line ( std : : string ( " [OnTimerThread] Caught exception: " ) + e . what ( ) ) ;
} catch ( . . . ) {
std : : cerr < < " [OnTimerThread] Caught unknown exception " < < std : : endl ;
safe_err_line ( " [OnTimerThread] Caught unknown exception " ) ;
}
{
@@ -523,7 +543,7 @@ void Front::OnTimerThread()
m_needRestartTimerThread = true ;
}
std : : cout < < " [OnTimerThread] exited, will be restarted by monitor \n " ;
safe_out_line ( " [OnTimerThread] exited, will be restarted by monitor \n " ) ;
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////消费者线程
@@ -556,37 +576,37 @@ void Front::mqconsumerThread()
std : : string key = sub . topic + " : " + sub . tag ;
callbackMap . emplace ( key , sub . callback ) ;
m_mqConsumer - > subscribe ( sub . topic , sub . tag ) ;
std : : cout < < " [mqconsumerThread] 已订阅 Topic= \" " < < sub . topic < < " \" , Tag= \" " < < sub . tag < < " \" " < < std : : endl ;
safe_out_line ( " [mqconsumerThread] 已订阅 Topic= \" " + sub . topic + " \" , Tag= \" " + sub . tag + " \" \n " ) ;
}
m_listener = std : : make_shared < rocketmq : : SubscriberListener > ( callbackMap ) ;
m_mqConsumer - > registerMessageListener ( m_listener . get ( ) ) ;
m_mqConsumer - > start ( ) ;
std : : cout < < " [mqconsumerThread] Consumer 已启动,等待消息... " < < std : : endl ;
safe_out_line ( " [mqconsumerThread] Consumer 已启动,等待消息... \n " ) ;
// ✳️ 保持线程不主动退出,由 RocketMQ 内部驱动执行回调
// 如果 RocketMQ 内部机制失败或意外退出线程,就走 catch
}
catch ( const rocketmq : : MQClientException & e ) {
std : : cerr < < " [mqconsumerThread] MQClientException: " < < e . what ( ) < < std : : endl ;
safe_err_line ( std : : string ( " [mqconsumerThread] MQClientException: " ) + e . what ( ) ) ;
std : : lock_guard < std : : mutex > lock ( m_threadCheckMutex ) ;
m_needRestartConsumerThread = true ;
return ;
} catch ( const std : : exception & e ) {
std : : cerr < < " [mqconsumerThread] std::exception: " < < e . what ( ) < < std : : endl ;
safe_err_line ( std : : string ( " [mqconsumerThread] std::exception: " ) + e . what ( ) ) ;
std : : lock_guard < std : : mutex > lock ( m_threadCheckMutex ) ;
m_needRestartConsumerThread = true ;
return ;
} catch ( . . . ) {
std : : cerr < < " [mqconsumerThread] Unknown exception " < < std : : endl ;
safe_err_line ( " [mqconsumerThread] Unknown exception " ) ;
std : : lock_guard < std : : mutex > lock ( m_threadCheckMutex ) ;
m_needRestartConsumerThread = true ;
return ;
}
// 程序运行中,消费者会通过回调处理消息,线程保持存活即可
std : : cout < < " [mqconsumerThread] Consumer 线程正在运行,等待消息到达... " < < std : : endl ;
safe_out_line ( " [mqconsumerThread] Consumer 线程正在运行,等待消息到达... " ) ;
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////生产者线程
@@ -596,7 +616,7 @@ void Front::mqproducerThread()
try {
// 1. 初始化生产者
InitializeProducer ( m_producer ) ;
std : : cout < < " \n [mqproducerThread] is running ...... \n \n " ;
safe_out_line ( " \n [mqproducerThread] is running ...... \n \n " ) ;
uint32_t count = 0 ;
@@ -615,49 +635,60 @@ void Front::mqproducerThread()
}
if ( data_gotten ) {
auto now = std : : chrono : : system_clock : : now ( ) ;
auto ms_part = std : : chrono : : duration_cast < std : : chrono : : milliseconds > (
now . time_since_epoch ( ) ) % 1000 ;
auto time_t_part = std : : chrono : : system_clock : : to_time_t ( now ) ;
std : : tm tm_buf ;
localtime_r ( & time_t_part , & tm_buf ) ;
char timeStr [ 32 ] ;
std : : strftime ( timeStr , sizeof ( timeStr ) , " %Y-%m-%d %H:%M:%S " , & tm_buf ) ;
{
auto now = std : : chrono : : system_clock : : now ( ) ;
auto ms_part = std : : chrono : : duration_cast < std : : chrono : : milliseconds > (
now . time_since_epoch ( ) ) % 1000 ;
auto time_t_part = std : : chrono : : system_clock : : to_time_t ( now ) ;
std : : tm tm_buf ;
localtime_r ( & time_t_part , & tm_buf ) ;
char timeStr [ 32 ] ;
std : : strftime ( timeStr , sizeof ( timeStr ) , " %Y-%m-%d %H:%M:%S " , & tm_buf ) ;
std : : cout < < " BEGIN my_queue_send no. " < < count
< < " >>>> " < < timeStr
< < " . " < < std : : setw ( 3 ) < < std : : setfill ( ' 0 ' ) < < ms_part . count ( )
< < std : : endl ;
std : : ostringstream oss ;
oss < < " BEGIN my_queue_send no. " < < count
< < " >>>> " < < timeStr
< < " . " < < std : : setw ( 3 ) < < std : : setfill ( ' 0 ' ) < < ms_part . count ( ) ;
// 线程安全输出
safe_out_line ( oss . str ( ) ) ;
}
// 调用实际发送
my_rocketmq_send ( data , m_producer ) ;
now = std : : chrono : : system_clock : : now ( ) ;
ms_part = std : : chrono : : duration_cast < std : : chrono : : milliseconds > (
now . time_since_epoch ( ) ) % 1000 ;
time_t_part = std : : chrono : : system_clock : : to_time_t ( now ) ;
localtime_r ( & time_t_part , & tm_buf ) ;
std : : strftime ( timeStr , sizeof ( timeStr ) , " %Y-%m-%d %H:%M:%S " , & tm_buf ) ;
{
auto now = std : : chrono : : system_clock : : now ( ) ;
auto ms_part = std : : chrono : : duration_cast < std : : chrono : : milliseconds > (
now . time_since_epoch ( ) ) % 1000 ;
auto time_t_part = std : : chrono : : system_clock : : to_time_t ( now ) ;
std : : tm tm_buf ;
localtime_r ( & time_t_part , & tm_buf ) ;
char timeStr [ 32 ] ;
std : : strftime ( timeStr , sizeof ( timeStr ) , " %Y-%m-%d %H:%M:%S " , & tm_buf ) ;
std : : cout < < " END my_queue_send no. " < < count + +
< < " >>>> " < < timeStr
< < " . " < < std : : setw ( 3 ) < < std : : setfill ( ' 0 ' ) < < ms_part . count ( )
< < " \n \n " ;
std : : ostringstream oss ;
oss < < " END my_queue_send no. " < < count
< < " <<<< " < < timeStr
< < " . " < < std : : setw ( 3 ) < < std : : setfill ( ' 0 ' ) < < ms_part . count ( ) ;
safe_out_line ( oss . str ( ) ) ;
}
}
g_mqproducer_blocked_times = 0 ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 10 ) ) ;
}
std : : cout < < " [mqproducerThread] 正常退出 \n " ;
safe_out_line ( " [mqproducerThread] 正常退出 \n " ) ;
}
catch ( const std : : exception & e ) {
std : : cerr < < " [mqproducerThread] std::exception: " < < e . what ( ) < < std : : endl ;
safe_err_line ( std : : string ( " [mqproducerThread] std::exception: " ) + e . what ( ) ) ;
std : : lock_guard < std : : mutex > lock ( m_threadCheckMutex ) ;
m_needRestartProducerThread = true ;
}
catch ( . . . ) {
std : : cerr < < " [mqproducerThread] unknown exception \n " ;
safe_err_line ( " [mqproducerThread] unknown exception \n " ) ;
std : : lock_guard < std : : mutex > lock ( m_threadCheckMutex ) ;
m_needRestartProducerThread = true ;
}
@@ -704,7 +735,7 @@ void* cloudfrontthread(void* arg) {
// 更新线程状态为运行中
pthread_mutex_lock ( & thread_info [ index ] . lock ) ;
printf ( " cloudfrontthread %d started\n " , index );
safe_out_line ( std : : string ( " cloudfrontthread " ) + std : : to_string ( index ) + " started\n " ) ;
thread_info [ index ] . state = THREAD_RUNNING ;
pthread_mutex_unlock ( & thread_info [ index ] . lock ) ;
@@ -720,13 +751,13 @@ void* cloudfrontthread(void* arg) {
//路径获取
FRONT_PATH = get_parent_directory ( ) ;
std : : cout < < " FRONT_PATH: " < < FRONT_PATH < < std : : endl ;
safe_out_line ( " FRONT_PATH: " + FRONT_PATH + " \n " ) ;
//声明前置
std : : unique_ptr < Front > FrontProcess ;
FrontProcess = make_unique < Front > ( ) ;
std : : cout < < " [Main] Program running in background. \n " ;
safe_out_line ( " [Main] Program running in background. \n " ) ;
// 5) 主线程保持后台运行
while ( running ) {
@@ -758,28 +789,28 @@ void* cloudfrontthread(void* arg) {
}*/
if ( FrontProcess - > m_needRestartFrontThread ) {
std : : cout < < " [Monitor] Restarting FrontThread... " < < std : : endl ;
safe_out_line ( " [Monitor] Restarting FrontThread... \n " ) ;
FrontProcess - > StopFrontThread ( ) ;
FrontProcess - > StartFrontThread ( ) ;
FrontProcess - > m_needRestartFrontThread = false ;
}
if ( FrontProcess - > m_needRestartConsumerThread ) {
std : : cout < < " [Monitor] Restarting MQConsumerThread... " < < std : : endl ;
safe_out_line ( " [Monitor] Restarting MQConsumerThread... \n " ) ;
FrontProcess - > StopMQConsumerThread ( ) ;
FrontProcess - > StartMQConsumerThread ( ) ;
FrontProcess - > m_needRestartConsumerThread = false ;
}
if ( FrontProcess - > m_needRestartProducerThread ) {
std : : cout < < " [Monitor] Restarting MQProducerThread... " < < std : : endl ;
safe_out_line ( " [Monitor] Restarting MQProducerThread... \n " ) ;
FrontProcess - > StopMQProducerThread ( ) ;
FrontProcess - > StartMQProducerThread ( ) ;
FrontProcess - > m_needRestartProducerThread = false ;
}
if ( FrontProcess - > m_needRestartTimerThread ) {
std : : cout < < " [Monitor] Restarting TimerThread... " < < std : : endl ;
safe_out_line ( " [Monitor] Restarting TimerThread... \n " ) ;
FrontProcess - > StopTimerThread ( ) ; // 先停
FrontProcess - > StartTimerThread ( ) ; // 再启
FrontProcess - > m_needRestartTimerThread = false ;
@@ -792,7 +823,7 @@ void* cloudfrontthread(void* arg) {
// 退出前标记为 STOPPED, 方便监控线程判断并重启
pthread_mutex_lock ( & thread_info [ index ] . lock ) ;
thread_info [ index ] . state = THREAD_STOPPED ;
printf ( " cloudfrontthread %d stopped\n " , index );
safe_out_line ( std : : string ( " cloudfrontthread " ) + std : : to_string ( index ) + " stopped\n " ) ;
pthread_mutex_unlock ( & thread_info [ index ] . lock ) ;
return nullptr ;