#ifndef FRONT_H #define FRONT_H #include #include #include #include #include //任务队列 /////////////////////////////////////////////////////////////////////////////////////////////////////////////// #include "rocketmq.h" // mq #include "worker.h" ////////////////////////////////////////////////////////////////////////////////////////////////////////////////命名空间 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////线程池类 class ThreadPool { public: explicit ThreadPool(size_t numThreads) : stop(false) { for (size_t i = 0; i < numThreads; ++i) { workers.emplace_back([this]() { for (;;) { std::function task; { std::unique_lock lock(queueMutex); condition.wait(lock, [this]() { return stop || !tasks.empty(); }); if (stop && tasks.empty()) return; task = std::move(tasks.front()); tasks.pop(); } task(); } }); } } // 添加任务 template void enqueue(F&& f) { { std::unique_lock lock(queueMutex); if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace(std::function(std::forward(f))); } condition.notify_one(); } // 析构:停止所有线程 ~ThreadPool() { { std::unique_lock lock(queueMutex); stop = true; } condition.notify_all(); for (std::thread &worker : workers) worker.join(); } private: std::vector workers; // 工作线程 std::queue> tasks; // 任务队列 std::mutex queueMutex; // 队列锁 std::condition_variable condition; // 条件变量 bool stop; // 停止标志 }; /////////////////////////////////////////////////////////////////////////////////////////front类 class Front { public: Worker m_worker; std::unique_ptr m_mqConsumer; std::shared_ptr m_listener; rocketmq::RocketMQProducer* m_producer = nullptr; std::thread m_FrontThread; std::thread m_MQConsumerThread; std::thread m_MQProducerThread; std::thread m_TimerThread; std::atomic m_IsMQProducerCancel{false}; std::atomic m_IsTimerCancel{false}; std::atomic m_bIsFrontThreadCancle{false}; std::atomic m_IsMQConsumerCancel{false}; std::atomic m_frontRunning{false}; std::atomic m_consumerRunning{false}; std::atomic m_producerRunning{false}; std::atomic m_timerRunning{false}; std::mutex m_threadCheckMutex; std::atomic m_needRestartFrontThread{false}; std::atomic m_needRestartConsumerThread{false}; std::atomic m_needRestartProducerThread{false}; std::atomic m_needRestartTimerThread{false}; Front(); ~Front(); void FormClosing(); void StartFrontThread(); void StartMQConsumerThread(); void StartMQProducerThread(); void StartTimerThread(); // [ADD] 统一的停止接口(便于重启前先停干净) void StopFrontThread(); void StopMQConsumerThread(); void StopMQProducerThread(); void StopTimerThread(); void FrontThread(); void mqconsumerThread(); void mqproducerThread(); void OnTimerThread(); private: ThreadPool m_threadPool; }; #endif // FRONT_H