fix recall

This commit is contained in:
lnk
2025-10-24 17:07:51 +08:00
parent 07ac84d612
commit f69a6d2105
5 changed files with 323 additions and 79 deletions

View File

@@ -246,7 +246,7 @@ std::string get_parent_directory() {
}
// ============ 关闭所有运行中的线程============
void Front::FormClosing() {
/*void Front::FormClosing() {
//确保testshell关闭
m_worker.stopServer();
@@ -286,11 +286,19 @@ std::string get_parent_directory() {
m_MQConsumerThread.join();
}
}*/
void Front::FormClosing() {
m_worker.stopServer();
StopFrontThread();
StopTimerThread();
StopMQProducerThread();
StopMQConsumerThread();
}
//============ 线程函数 ============
void Front::StartFrontThread() {
/*void Front::StartFrontThread() {
m_bIsFrontThreadCancle = false;
m_FrontThread = std::thread(&Front::FrontThread, this);
}
@@ -308,7 +316,137 @@ std::string get_parent_directory() {
void Front::StartTimerThread() {
m_IsTimerCancel = false;
m_TimerThread = std::thread(&Front::OnTimerThread, this);
}
} */
void Front::StartFrontThread() {
bool expected = false;
if (!m_frontRunning.compare_exchange_strong(expected, true)) {
std::cout << "[FrontThread] already running, skip\n";
return;
}
if (m_FrontThread.joinable()) m_FrontThread.join();
m_bIsFrontThreadCancle = false;
m_FrontThread = std::thread([this]{
try {
this->FrontThread();
} catch (const std::exception& e) {
std::cerr << "[FrontThread] exception: " << e.what() << "\n";
} catch (...) {
std::cerr << "[FrontThread] unknown exception\n";
}
m_frontRunning = false; // 线程真正退出后复位
});
}
void Front::StartMQConsumerThread() {
bool expected = false;
if (!m_consumerRunning.compare_exchange_strong(expected, true)) {
std::cout << "[MQConsumer] already running, skip\n";
return;
}
if (m_MQConsumerThread.joinable()) m_MQConsumerThread.join();
m_IsMQConsumerCancel = false;
m_MQConsumerThread = std::thread([this]{
try {
this->mqconsumerThread();
} catch (const std::exception& e) {
std::cerr << "[mqconsumerThread] exception: " << e.what() << "\n";
} catch (...) {
std::cerr << "[mqconsumerThread] unknown exception\n";
}
m_consumerRunning = false;
});
}
void Front::StartMQProducerThread() {
bool expected = false;
if (!m_producerRunning.compare_exchange_strong(expected, true)) {
std::cout << "[MQProducer] already running, skip\n";
return;
}
if (m_MQProducerThread.joinable()) m_MQProducerThread.join();
m_IsMQProducerCancel = false;
m_MQProducerThread = std::thread([this]{
try {
this->mqproducerThread();
} catch (const std::exception& e) {
std::cerr << "[mqproducerThread] exception: " << e.what() << "\n";
} catch (...) {
std::cerr << "[mqproducerThread] unknown exception\n";
}
m_producerRunning = false;
});
}
void Front::StartTimerThread() {
bool expected = false;
if (!m_timerRunning.compare_exchange_strong(expected, true)) {
std::cout << "[Timer] already running, skip StartTimerThread\n";
return; // 已有定时线程在跑,直接跳过
}
// 若有旧线程尚未 join先回收
if (m_TimerThread.joinable()) {
m_TimerThread.join();
}
m_IsTimerCancel.store(false, std::memory_order_relaxed);
m_TimerThread = std::thread([this]{
try {
this->OnTimerThread();
} catch (const std::exception& e) {
std::cerr << "[Timer] exception: " << e.what() << "\n";
} catch (...) {
std::cerr << "[Timer] unknown exception\n";
}
m_timerRunning.store(false); // 线程真正退出后复位
});
}
void Front::StopFrontThread() {
if (!m_frontRunning.load()) return;
m_bIsFrontThreadCancle = true;
if (m_FrontThread.joinable()) m_FrontThread.join();
m_frontRunning = false;
}
void Front::StopMQConsumerThread() {
if (!m_consumerRunning.load()) return;
m_IsMQConsumerCancel = true; // 你的线程函数可能不轮询此标志,但先置上
// 关闭 MQ 对象(避免内部阻塞线程仍在)
if (m_mqConsumer) {
try { m_mqConsumer->shutdown(); } catch (...) {}
m_mqConsumer.reset();
}
m_listener.reset();
if (m_MQConsumerThread.joinable()) m_MQConsumerThread.join();
m_consumerRunning = false;
}
void Front::StopMQProducerThread() {
if (!m_producerRunning.load()) return;
m_IsMQProducerCancel = true;
if (m_MQProducerThread.joinable()) m_MQProducerThread.join();
m_producerRunning = false;
// 如需销毁/关闭 producer对应你的初始化方式
// if (m_producer) { ShutdownProducer(m_producer); m_producer = nullptr; }
}
void Front::StopTimerThread() {
if (!m_timerRunning.load()) return; // 没跑就不处理
m_IsTimerCancel.store(true);
if (m_TimerThread.joinable()) {
m_TimerThread.join(); // 等它退出
}
m_timerRunning.store(false);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////主功能线程
@@ -596,7 +734,7 @@ void* cloudfrontthread(void* arg) {
{
std::lock_guard<std::mutex> lock(FrontProcess->m_threadCheckMutex);
if (FrontProcess->m_needRestartFrontThread) {
/*if (FrontProcess->m_needRestartFrontThread) {
std::cout << "[Monitor] Restarting FrontThread..." << std::endl;
FrontProcess->StartFrontThread();
FrontProcess->m_needRestartFrontThread = false;
@@ -618,6 +756,34 @@ void* cloudfrontthread(void* arg) {
std::cout << "[Monitor] Restarting TimerThread..." << std::endl;
FrontProcess->StartTimerThread();
FrontProcess->m_needRestartTimerThread = false;
}*/
if (FrontProcess->m_needRestartFrontThread) {
std::cout << "[Monitor] Restarting FrontThread..." << std::endl;
FrontProcess->StopFrontThread();
FrontProcess->StartFrontThread();
FrontProcess->m_needRestartFrontThread = false;
}
if (FrontProcess->m_needRestartConsumerThread) {
std::cout << "[Monitor] Restarting MQConsumerThread..." << std::endl;
FrontProcess->StopMQConsumerThread();
FrontProcess->StartMQConsumerThread();
FrontProcess->m_needRestartConsumerThread = false;
}
if (FrontProcess->m_needRestartProducerThread) {
std::cout << "[Monitor] Restarting MQProducerThread..." << std::endl;
FrontProcess->StopMQProducerThread();
FrontProcess->StartMQProducerThread();
FrontProcess->m_needRestartProducerThread = false;
}
if (FrontProcess->m_needRestartTimerThread) {
std::cout << "[Monitor] Restarting TimerThread..." << std::endl;
FrontProcess->StopTimerThread(); // 先停
FrontProcess->StartTimerThread(); // 再启
FrontProcess->m_needRestartTimerThread = false;
}
}