Files
front_linux/LFtid1056/cloudfront/code/front.h
2025-06-25 10:14:22 +08:00

122 lines
3.5 KiB
C++

#ifndef FRONT_H
#define FRONT_H
#include <atomic>
#include <condition_variable>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue> // task queue
#include <stdexcept>
#include <thread>
#include <vector>
///////////////////////////////////////////////////////////////////////////////////////////////////////////////
#include "rocketmq.h" // mq
#include "worker.h"
////////////////////////////////////////////////////////////////////////////////////////////////////////////////命名空间
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////线程池类
/// Fixed-size thread pool.
///
/// Worker threads block on a condition variable and pull tasks off a shared
/// FIFO queue guarded by `queueMutex`. Destroying the pool drains every task
/// that is still queued and then joins all workers.
///
/// The pool is neither copyable nor movable: the worker lambdas capture `this`.
class ThreadPool {
public:
    /// Starts `numThreads` worker threads.
    ///
    /// With `numThreads == 0` no worker exists, so queued tasks never run.
    ///
    /// @throws whatever `std::thread` construction or vector growth throws
    ///         (e.g. std::system_error). Workers that were already started
    ///         are stopped and joined before the exception propagates;
    ///         without that, destroying the still-joinable std::thread
    ///         objects would call std::terminate.
    explicit ThreadPool(size_t numThreads) : stop(false) {
        try {
            for (size_t i = 0; i < numThreads; ++i) {
                workers.emplace_back([this]() { workerLoop(); });
            }
        } catch (...) {
            shutdown();
            throw;
        }
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;

    /// Queues a task for execution on one of the worker threads.
    ///
    /// @param f Callable invocable as `void()`; it is stored in a
    ///          std::function, so it must be copy-constructible.
    /// @throws std::runtime_error if the pool has already been stopped.
    template<class F>
    void enqueue(F&& f) {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace(std::function<void()>(std::forward<F>(f)));
        }
        // Notify outside the lock so the woken worker does not immediately
        // block on the mutex we would still be holding.
        condition.notify_one();
    }

    /// Stops the pool: remaining queued tasks are executed, then all worker
    /// threads are joined.
    ~ThreadPool() {
        shutdown();
    }

private:
    /// Body of every worker thread: wait for work, run it, repeat until the
    /// pool is stopped and the queue is empty.
    void workerLoop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queueMutex);
                condition.wait(lock, [this]() { return stop || !tasks.empty(); });
                if (stop && tasks.empty())
                    return;
                task = std::move(tasks.front());
                tasks.pop();
            }
            // An exception leaving a thread's entry function calls
            // std::terminate and takes the whole process down, so a failing
            // task is reported and the worker keeps serving the queue.
            try {
                task();
            } catch (const std::exception& e) {
                std::cerr << "ThreadPool: task threw exception: " << e.what() << '\n';
            } catch (...) {
                std::cerr << "ThreadPool: task threw unknown exception\n";
            }
        }
    }

    /// Sets the stop flag, wakes every worker and joins all started threads.
    void shutdown() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers) {
            // join() on a non-joinable thread throws; never let that escape
            // from the destructor.
            if (worker.joinable())
                worker.join();
        }
    }

    std::vector<std::thread> workers;        // worker threads
    std::queue<std::function<void()>> tasks; // pending tasks (FIFO)
    std::mutex queueMutex;                   // guards `tasks` and `stop`
    std::condition_variable condition;       // signals new work / shutdown
    bool stop;                               // set once the pool is shutting down
};
/////////////////////////////////////////////////////////////////////////////////////////front类
// Front: aggregates a Worker, the RocketMQ consumer/listener/producer handles,
// four std::thread objects with their atomic control flags, and a private
// ThreadPool. Only declarations are visible in this header; the member
// functions are defined elsewhere.
//
// Members are constructed in declaration order and destroyed in reverse order,
// so m_threadPool (declared last) is destroyed first. Keep that in mind before
// reordering or adding members.
class Front {
public:
Worker m_worker;
// RocketMQ push consumer, owned exclusively by this object.
std::unique_ptr<rocketmq::DefaultMQPushConsumer> m_mqConsumer;
// Subscriber listener with shared ownership.
// NOTE(review): presumably registered on m_mqConsumer — confirm in the .cpp.
std::shared_ptr<rocketmq::SubscriberListener> m_listener;
// Raw producer pointer, null until assigned.
// NOTE(review): ownership is not visible from this header — confirm who deletes it.
rocketmq::RocketMQProducer* m_producer = nullptr;
// Thread handles.
// NOTE(review): presumably one per *Thread() member function below — confirm.
std::thread m_FrontThread;
std::thread m_MQConsumerThread;
std::thread m_MQProducerThread;
std::thread m_TimerThread;
// Atomic flags, all initially false.
// NOTE(review): presumably polled by the matching thread loop as a
// cancellation request — confirm. The spelling "Cancle" is part of the public
// member name and must be kept.
std::atomic<bool> m_IsMQProducerCancel{false};
std::atomic<bool> m_IsTimerCancel{false};
std::atomic<bool> m_bIsFrontThreadCancle{false};
std::atomic<bool> m_IsMQConsumerCancel{false};
// NOTE(review): presumably serializes the thread health check / restart
// logic — confirm.
std::mutex m_threadCheckMutex;
// Atomic flags, all initially false.
// NOTE(review): presumably set when the corresponding thread must be
// restarted — confirm.
std::atomic<bool> m_needRestartFrontThread{false};
std::atomic<bool> m_needRestartConsumerThread{false};
std::atomic<bool> m_needRestartProducerThread{false};
std::atomic<bool> m_needRestartTimerThread{false};
Front();
~Front();
// NOTE(review): the name suggests a shutdown hook — confirm against the
// implementation.
void FormClosing();
// NOTE(review): presumably each Start*Thread() launches the matching thread
// function below on the corresponding std::thread member — confirm.
void StartFrontThread();
void StartMQConsumerThread();
void StartMQProducerThread();
void StartTimerThread();
// NOTE(review): presumably the thread entry points — confirm.
void FrontThread();
void mqconsumerThread();
void mqproducerThread();
void OnTimerThread();
private:
// Task pool private to Front; its constructor requires a thread count, so it
// has to be initialized in Front's constructor.
ThreadPool m_threadPool;
};
#endif // FRONT_H