Files
front_linux/LFtid1056/cloudfront/code/main.cpp

598 lines
20 KiB
C++
Raw Normal View History

2025-06-24 17:55:34 +08:00
/////////////////////////////////////////////////////////////////////////////////////////////////////
#include <memory>
#include <queue> //任务队列
#include <csignal> //信号处理
#include <iostream> //标准输入输出
#include <string> //字符串
#include <vector> //数组型容器
#include <map> //映射
#include <mutex> //锁
#include <thread> //线程
#include <condition_variable> //线程控制
#include <atomic> //原子操作
#include <chrono> //时间
#include <ctime> //时间
#include <sstream> //流处理
#include <cstdio> //字符处理
#include <iomanip> //格式控制
#include <functional> //类消费者绑定
#include <unistd.h> //获取目录
#include <array>
#include <list>
#include <cstdio>
#include <sys/stat.h>
#include <fnmatch.h>
#include <libgen.h>
#include <cstdlib>
////////////////////////////////////////////////////////////////////////////////////////////////////////
#include "interface.h" //用于访问接口
#include "log4.h" //用于日志
#include "curl/curl.h" //用于访问接口
#include "nlohmann/json.hpp" //用于构造json
#include "worker.h" //shell接口
#include "rocketmq.h"
#include "rocketmq/MQClientException.h"
#include "front.h"
//////////////////////////////////////////////////////////////////////////////////////////////////////
using json = nlohmann::json;
////////////////////////////////////////////////////////////////////////////////////////////////////// Global variables
// Path of the front-end program; assigned from get_parent_directory() in cloudfrontthread().
std::string FRONT_PATH;
// Initialization flag: set to 1 at the end of the Front constructor. The MQ producer loop
// only dequeues messages while this is non-zero.
int INITFLAG = 0;
// Front-end identity.
std::string subdir = "cfg_stat_data"; // configuration sub-directory; defaults to steady-state, overridden by -d/-D
uint32_t g_node_id = 0; // assigned in init_global_function_enable() according to subdir
int g_front_seg_index = 0; // default: single process; overridden by -s
int g_front_seg_num = 0; // default: single process; overridden by -s
// Real-time process flag (set to 1 when subdir is "cfg_3s_data").
int three_secs_enabled = 0;
// Steady-state process auto-register-report flag (set to 1 when subdir is "cfg_stat_data").
int auto_register_report_enabled = 0;
// Deadlock ("blocked") counters for the MQ producer thread and the timer thread. Each of those
// loops resets its own counter to 0 on every iteration.
// NOTE(review): the code that increments them is not visible in this part of the file.
uint32_t g_mqproducer_blocked_times = 0;
uint32_t g_ontime_blocked_times = 0;
// Process control: cleared by onSignal(), polled by the monitor loop in cloudfrontthread().
std::atomic<bool> running{true};
void onSignal(int){ running = false; }
/////////////////////////////////////////////////////////////////////////////////////////////////////
extern int G_TEST_FLAG; // switch that enables the test thread
extern int TEST_PORT; // test port number
extern std::string FRONT_INST;
extern std::mutex queue_data_list_mutex;
extern std::list<queue_data_t> queue_data_list;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// Utility functions
// Local stand-in for std::make_unique: heap-allocates a TObject, perfectly forwarding
// the given arguments to its constructor, and hands ownership to a std::unique_ptr.
template<typename TObject, typename... CtorArgs>
std::unique_ptr<TObject> make_unique(CtorArgs&&... ctor_args) {
    TObject* raw = new TObject(std::forward<CtorArgs>(ctor_args)...);
    return std::unique_ptr<TObject>(raw);
}
// Parses a segment specification of the form "<index>_<count>" (for example "1_5").
// Both numbers are converted before anything is stored, so on failure (missing '_',
// non-numeric or out-of-range part) the outputs are left untouched and false is returned.
static bool parse_segment_spec(const std::string& spec, int& index_out, int& count_out) {
    const std::string::size_type sep = spec.find('_');
    if (sep == std::string::npos) {
        return false;
    }
    try {
        const int index = std::stoi(spec.substr(0, sep));
        const int count = std::stoi(spec.substr(sep + 1));
        index_out = index;
        count_out = count;
        return true;
    } catch (...) {
        // std::stoi reports bad input by throwing; translate that into a plain failure.
        return false;
    }
}

// Parses the command-line style arguments of the front end.
//
// Recognised options (argv[0] is skipped):
//   -s <index>_<count>  or  -s<index>_<count>   -> g_front_seg_index / g_front_seg_num
//   -d <dir> / -D <dir>  or  -d<dir> / -D<dir>  -> subdir
// Unknown arguments are ignored. A malformed -s value is reported on stderr and leaves
// BOTH segment globals unchanged (they are never updated half-way).
//
// The resulting values are echoed to stdout.
// Returns true in all cases, as before, so existing callers keep their behaviour.
bool parse_param(int argc, char* argv[]) {
    for (int i = 1; i < argc; ++i) {
        const std::string arg = argv[i];
        const bool has_value = (i + 1 < argc);

        // "-s 1_5": the value is the next argument.
        if (arg == "-s" && has_value) {
            if (!parse_segment_spec(argv[++i], g_front_seg_index, g_front_seg_num)) {
                std::cerr << "Invalid -s format." << std::endl;
            }
            continue;
        }
        // "-s1_5": compact form, the value follows the flag directly.
        if (arg.rfind("-s", 0) == 0 && arg.length() > 2) {
            if (!parse_segment_spec(arg.substr(2), g_front_seg_index, g_front_seg_num)) {
                std::cerr << "Invalid -s format." << std::endl;
            }
            continue;
        }
        // "-d dir" / "-D dir"
        if ((arg == "-d" || arg == "-D") && has_value) {
            subdir = argv[++i];
            continue;
        }
        // "-ddir" / "-Ddir"
        if ((arg.rfind("-d", 0) == 0 || arg.rfind("-D", 0) == 0) && arg.length() > 2) {
            subdir = arg.substr(2);
            continue;
        }
        // Further options (e.g. -x, -y) can be added here.
    }
    // Echo the effective settings.
    std::cout << "g_front_seg_index: " << g_front_seg_index << "\n";
    std::cout << "g_front_seg_num : " << g_front_seg_num << "\n";
    std::cout << "subdir : " << subdir << "\n";
    return true;
}
// Derives the front-end type from `subdir`: assigns the matching base node id to
// g_node_id and raises the feature flag that belongs to that type, if any.
// An unrecognised subdir leaves every global untouched.
void init_global_function_enable() {
    // Historical steady-state data.
    if (subdir == "cfg_stat_data") {
        g_node_id = STAT_DATA_BASE_NODE_ID;
        auto_register_report_enabled = 1;
        return;
    }
    // Real-time (3-second) data.
    if (subdir == "cfg_3s_data") {
        g_node_id = THREE_SECS_DATA_BASE_NODE_ID;
        three_secs_enabled = 1;
        return;
    }
    // Alarms, waveform recording, transients.
    if (subdir == "cfg_soe_comtrade") {
        g_node_id = SOE_COMTRADE_BASE_NODE_ID;
        return;
    }
    // Recall (back-fill) of historical data.
    if (subdir == "cfg_recallhis_data") {
        g_node_id = RECALL_HIS_DATA_BASE_NODE_ID;
        return;
    }
}
// Returns the display name of the front-end function selected by `subdir`.
// The first table entry whose marker occurs anywhere inside `subdir` wins;
// "unknown" is returned when none matches.
std::string get_front_msg_from_subdir() {
    struct NameEntry {
        const char* marker;
        const char* label;
    };
    // Order matters: entries are tried from top to bottom.
    static const NameEntry kNames[] = {
        {"cfg_3s_data", "实时数据进程"},
        {"cfg_soe_comtrade", "暂态和告警进程"},
        {"cfg_recallhis_data", "稳态补招进程"},
        {"cfg_stat_data", "稳态统计进程"},
    };
    for (const NameEntry& entry : kNames) {
        if (subdir.find(entry.marker) != std::string::npos) {
            return entry.label;
        }
    }
    return "unknown";
}
// Returns the parent directory of the current working directory, as reported by
// getcwd() and dirname(). Returns an empty string when the working directory
// cannot be obtained.
std::string get_parent_directory() {
    char cwd[PATH_MAX];
    if (getcwd(cwd, sizeof(cwd)) == nullptr) {
        // Could not determine the working directory.
        return "";
    }
    // dirname() is allowed to modify its argument; cwd is a private stack buffer,
    // so it can be handed over directly.
    char* parent = dirname(cwd);
    return parent ? std::string(parent) : std::string("");
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////// Main class
//------------------- Front class (C++) -------------------
// Constructor: runs the whole start-up sequence of the front end in order —
// node id, configuration, process-level logging, device ledger, per-device loggers,
// model download/parsing — then starts the worker threads and, when G_TEST_FLAG is
// set, the test shell server on TEST_PORT. INITFLAG is raised 3 seconds after the
// threads were started.
Front::Front():
    m_worker(this),
    // Size the thread pool by the number of hardware threads. hardware_concurrency()
    // may return 0 when the value cannot be determined, so fall back to one thread.
    m_threadPool(std::thread::hardware_concurrency() != 0 ? std::thread::hardware_concurrency() : 1u)
{
    // Initialise g_node_id and the feature flags.
    init_global_function_enable();
    // Load configuration.
    init_config();
    // Start the process-level log.
    init_logger_process();
    // The format string uses %s, so pass a C string; the std::string is kept in a
    // local so the pointer stays valid for the whole logging statement.
    const std::string front_msg = get_front_msg_from_subdir();
    DIY_WARNLOG("process","【WARN】前置的%s%d号进程 进程级日志初始化完毕", front_msg.c_str(), g_front_seg_index);
    // Read the device ledger.
    parse_device_cfg_web();
    // Initialise the loggers.
    init_loggers();
    // Read the model configuration and download files.
    parse_model_cfg_web();
    // Parse the downloaded files.
    Set_xml_nodeinfo();
    StartFrontThread();      // main worker thread
    StartMQConsumerThread(); // MQ consumer thread
    StartMQProducerThread(); // MQ producer thread
    StartTimerThread();      // timer thread
    // Start the test shell server only when the test flag is set.
    if (G_TEST_FLAG) {
        if (!m_worker.startServer(TEST_PORT)) {
            std::cerr << "[testshell] startServer failed.\n";
        }
    }
    // Mark initialisation as complete after a 3 second delay; the producer thread
    // starts draining the send queue once INITFLAG is non-zero.
    std::this_thread::sleep_for(std::chrono::seconds(3));
    INITFLAG = 1;
}
// Destructor: delegates the whole shutdown to FormClosing().
Front::~Front() {
    this->FormClosing();
}
// ============ Stop all running threads ============
// Shuts the front end down: stops the test shell server, then cancels and joins the
// main, timer and producer threads, and finally tears down the MQ consumer.
void Front::FormClosing() {
    // Make sure the test shell is stopped.
    m_worker.stopServer();
    // Main worker thread: request cancellation and wait for it to finish.
    if (m_FrontThread.joinable()) {
        m_bIsFrontThreadCancle = true;
        m_FrontThread.join();
    }
    // Timer thread.
    if (m_TimerThread.joinable()) {
        m_IsTimerCancel = true;
        m_TimerThread.join();
    }
    // Producer thread.
    if (m_MQProducerThread.joinable()) {
        m_IsMQProducerCancel = true;
        m_MQProducerThread.join();
    }
    // Consumer thread. mqconsumerThread() is what assigns m_mqConsumer and m_listener,
    // and it returns as soon as the consumer has been started (or has failed to start).
    // Join it BEFORE touching those members so they are never reset while that thread
    // may still be writing or using them.
    m_IsMQConsumerCancel = true;
    if (m_MQConsumerThread.joinable()) {
        m_MQConsumerThread.join();
    }
    if (m_mqConsumer) {
        try {
            m_mqConsumer->shutdown();
        } catch (...) {
            // Best effort: a failing shutdown must not prevent the remaining cleanup.
            std::cerr << "mq consumer shutdown error" << std::endl;
        }
        m_mqConsumer.reset();
    }
    // Release the listener only after the consumer that references it is gone.
    m_listener.reset();
}
//============ Thread start helpers ============
// Starts (or restarts) the main worker thread running FrontThread().
void Front::StartFrontThread() {
    // On a restart the previous thread function has returned, but its std::thread
    // handle is still joinable. Move-assigning over a joinable std::thread calls
    // std::terminate(), so reap the old handle first. If the old thread were still
    // running, this waits until it exits.
    if (m_FrontThread.joinable()) {
        m_FrontThread.join();
    }
    m_bIsFrontThreadCancle = false;
    m_FrontThread = std::thread(&Front::FrontThread, this);
}
// Starts (or restarts) the thread that sets up the MQ consumer via mqconsumerThread().
void Front::StartMQConsumerThread() {
    // A finished-but-unjoined thread is still joinable, and assigning over a joinable
    // std::thread calls std::terminate(); join the previous handle before reuse.
    if (m_MQConsumerThread.joinable()) {
        m_MQConsumerThread.join();
    }
    m_IsMQConsumerCancel = false;
    m_MQConsumerThread = std::thread(&Front::mqconsumerThread, this);
}
// Starts (or restarts) the MQ producer thread running mqproducerThread().
void Front::StartMQProducerThread() {
    // A finished-but-unjoined thread is still joinable, and assigning over a joinable
    // std::thread calls std::terminate(); join the previous handle before reuse.
    if (m_MQProducerThread.joinable()) {
        m_MQProducerThread.join();
    }
    m_IsMQProducerCancel = false;
    m_MQProducerThread = std::thread(&Front::mqproducerThread, this);
}
// Starts (or restarts) the timer thread running OnTimerThread().
void Front::StartTimerThread() {
    // A finished-but-unjoined thread is still joinable, and assigning over a joinable
    // std::thread calls std::terminate(); join the previous handle before reuse.
    if (m_TimerThread.joinable()) {
        m_TimerThread.join();
    }
    m_IsTimerCancel = false;
    m_TimerThread = std::thread(&Front::OnTimerThread, this);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// Main worker thread
// Body of the main worker thread. Repeats the three periodic checks until the cancel
// flag is set or one of them throws. On every exit path the restart flag is raised
// under m_threadCheckMutex.
void Front::FrontThread() {
    std::cout << "FrontThread::run() is called ...... \n";
    try {
        for (;;) {
            if (m_bIsFrontThreadCancle) {
                break;
            }
            check_3s_config();      // real-time data trigger
            create_recall_xml();    // generate the pending recall xml files
            check_ledger_update();  // trigger ledger updates
        }
    } catch (const std::exception& ex) {
        std::cerr << "[FrontThread] Caught exception: " << ex.what() << std::endl;
    } catch (...) {
        std::cerr << "[FrontThread] Caught unknown exception" << std::endl;
    }
    // Raise the restart flag.
    {
        std::lock_guard<std::mutex> guard(m_threadCheckMutex);
        m_needRestartFrontThread = true;
    }
    std::cout << "[FrontThread] exited, will be restarted by monitor\n";
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// Timer task
// Body of the timer thread. After an initial one-second delay it queues a heartbeat,
// then once per second updates the log-entry countdown, resets the timer's blocked
// counter and, whenever the tick counter has reached 30, queues another heartbeat.
// The restart flag is raised under m_threadCheckMutex on every exit path.
void Front::OnTimerThread()
{
    const std::chrono::milliseconds one_tick(1000);
    try {
        std::this_thread::sleep_for(one_tick);
        std::cout << "OnTimerThread::run() is called ...... \n";
        send_heartbeat_to_queue("1");
        int ticks = 0;
        while (!m_IsTimerCancel) {
            update_log_entries_countdown();
            if (ticks >= 30) {
                send_heartbeat_to_queue("1");
                ticks = 0;
            }
            ++ticks;
            g_ontime_blocked_times = 0;
            std::this_thread::sleep_for(one_tick);
        }
    } catch (const std::exception& ex) {
        std::cerr << "[OnTimerThread] Caught exception: " << ex.what() << std::endl;
    } catch (...) {
        std::cerr << "[OnTimerThread] Caught unknown exception" << std::endl;
    }
    // Raise the restart flag.
    {
        std::lock_guard<std::mutex> guard(m_threadCheckMutex);
        m_needRestartTimerThread = true;
    }
    std::cout << "[OnTimerThread] exited, will be restarted by monitor\n";
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////消费者线程
void Front::mqconsumerThread()
{
try {
std::string consumerGroup = subdir + std::to_string(g_front_seg_index);
std::string nameServer = G_MQCONSUMER_IPPORT;
std::vector<rocketmq::Subscription> subscriptions;
if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata);
}
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate);
if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall);
}
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset);
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog);
m_mqConsumer = make_unique<rocketmq::DefaultMQPushConsumer>(consumerGroup);
m_mqConsumer->setNamesrvAddr(nameServer);
m_mqConsumer->setSessionCredentials(G_MQCONSUMER_ACCESSKEY, G_MQCONSUMER_SECRETKEY, G_MQCONSUMER_CHANNEL);
m_mqConsumer->setInstanceName("inst_" + std::to_string(sGetMsTime()));
m_mqConsumer->setConsumeFromWhere(rocketmq::CONSUME_FROM_LAST_OFFSET);
std::map<std::string, rocketmq::Subscription::CallbackT> callbackMap;
for (const auto& sub : subscriptions) {
std::string key = sub.topic + ":" + sub.tag;
callbackMap.emplace(key, sub.callback);
m_mqConsumer->subscribe(sub.topic, sub.tag);
std::cout << "[mqconsumerThread] 已订阅 Topic=\"" << sub.topic << "\", Tag=\"" << sub.tag << "\"" << std::endl;
}
m_listener = std::make_shared<rocketmq::SubscriberListener>(callbackMap);
m_mqConsumer->registerMessageListener(m_listener.get());
m_mqConsumer->start();
std::cout << "[mqconsumerThread] Consumer 已启动,等待消息..." << std::endl;
// ✳️ 保持线程不主动退出,由 RocketMQ 内部驱动执行回调
// 如果 RocketMQ 内部机制失败或意外退出线程,就走 catch
}
catch (const rocketmq::MQClientException& e) {
std::cerr << "[mqconsumerThread] MQClientException: " << e.what() << std::endl;
std::lock_guard<std::mutex> lock(m_threadCheckMutex);
m_needRestartConsumerThread = true;
return;
} catch (const std::exception& e) {
std::cerr << "[mqconsumerThread] std::exception: " << e.what() << std::endl;
std::lock_guard<std::mutex> lock(m_threadCheckMutex);
m_needRestartConsumerThread = true;
return;
} catch (...) {
std::cerr << "[mqconsumerThread] Unknown exception" << std::endl;
std::lock_guard<std::mutex> lock(m_threadCheckMutex);
m_needRestartConsumerThread = true;
return;
}
// 程序运行中,消费者会通过回调处理消息,线程保持存活即可
std::cout << "[mqconsumerThread] Consumer 线程正在运行,等待消息到达..." << std::endl;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// Producer thread
// Formats the current local time as "YYYY-MM-DD HH:MM:SS.mmm".
// The text is built in a local buffer, so no stream formatting state (width/fill)
// of std::cout is modified.
static std::string format_now_with_millis() {
    const std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
    const std::chrono::milliseconds ms_part =
        std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()) % 1000;
    const std::time_t secs = std::chrono::system_clock::to_time_t(now);
    std::tm tm_buf;
    localtime_r(&secs, &tm_buf);
    char date_time[32];
    std::strftime(date_time, sizeof(date_time), "%Y-%m-%d %H:%M:%S", &tm_buf);
    char stamped[48];
    std::snprintf(stamped, sizeof(stamped), "%s.%03lld", date_time,
                  static_cast<long long>(ms_part.count()));
    return std::string(stamped);
}

// Body of the MQ producer thread.
// Initialises the producer, then polls the shared send queue every 10 ms until
// cancellation is requested. While INITFLAG is non-zero, at most one queued item is
// taken per iteration (under queue_data_list_mutex) and sent with my_rocketmq_send();
// a BEGIN/END line with a millisecond timestamp and a running number is printed
// around each send. The producer's blocked counter is reset on every iteration.
// If an exception escapes, the restart flag is raised under m_threadCheckMutex.
void Front::mqproducerThread()
{
    try {
        // 1. Initialise the producer.
        InitializeProducer(m_producer);
        std::cout << "\n[mqproducerThread] is running ...... \n\n";
        uint32_t count = 0;
        while (!m_IsMQProducerCancel) {
            queue_data_t data;
            bool data_gotten = false;
            if (INITFLAG) {
                // Hold the queue lock only while taking the front element.
                std::lock_guard<std::mutex> lock(queue_data_list_mutex);
                if (!queue_data_list.empty()) {
                    // The element is removed right away, so it can be moved out.
                    data = std::move(queue_data_list.front());
                    queue_data_list.pop_front();
                    data_gotten = true;
                }
            }
            if (data_gotten) {
                std::cout << "BEGIN my_queue_send no." << count
                          << " >>>> " << format_now_with_millis()
                          << std::endl;
                // Perform the actual send.
                my_rocketmq_send(data, m_producer);
                std::cout << "END my_queue_send no." << count++
                          << " >>>> " << format_now_with_millis()
                          << "\n\n";
            }
            g_mqproducer_blocked_times = 0;
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::cout << "[mqproducerThread] 正常退出\n";
    }
    catch (const std::exception& e) {
        std::cerr << "[mqproducerThread] std::exception: " << e.what() << std::endl;
        std::lock_guard<std::mutex> lock(m_threadCheckMutex);
        m_needRestartProducerThread = true;
    }
    catch (...) {
        std::cerr << "[mqproducerThread] unknown exception\n";
        std::lock_guard<std::mutex> lock(m_threadCheckMutex);
        m_needRestartProducerThread = true;
    }
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////用例,除通讯外其他功能都可实现
//int main(int argc char** argv) //变为线程
extern thread_info_t thread_info[THREAD_CONNECTIONS];
void* cloudfrontthread(void* arg) {
///////////////////////////////////////
ThreadArgs* args = static_cast<ThreadArgs*>(arg);
int argc = args->argc;
char **argv = args->argv;
printf("argc = %d, argv[0] = %s\n", argc, argv[0]);
//添加线程处理
int index = *(int*)argv[0];
// 更新线程状态为运行中
pthread_mutex_lock(&thread_info[index].lock);
printf("cloudfrontthread %d started\n", index);
thread_info[index].state = THREAD_RUNNING;
pthread_mutex_unlock(&thread_info[index].lock);
///////////////////////////////////////
// 解析命令行参数
if(!parse_param(argc,argv)){
std::cerr << "process param error,exit" << std::endl;
return nullptr;
}
// 线程使用完后清理参数
delete args;
//路径获取
FRONT_PATH = get_parent_directory();
std::cout << "FRONT_PATH:" << FRONT_PATH << std::endl;
//声明前置
std::unique_ptr<Front> FrontProcess;
FrontProcess = make_unique<Front>();
std::cout << "[Main] Program running in background.\n";
// 5) 主线程保持后台运行
while(running) {
{
std::lock_guard<std::mutex> lock(FrontProcess->m_threadCheckMutex);
if (FrontProcess->m_needRestartFrontThread) {
std::cout << "[Monitor] Restarting FrontThread..." << std::endl;
FrontProcess->StartFrontThread();
FrontProcess->m_needRestartFrontThread = false;
}
if (FrontProcess->m_needRestartConsumerThread) {
std::cout << "[Monitor] Restarting MQConsumerThread..." << std::endl;
FrontProcess->StartMQConsumerThread();
FrontProcess->m_needRestartConsumerThread = false;
}
if (FrontProcess->m_needRestartProducerThread) {
std::cout << "[Monitor] Restarting MQProducerThread..." << std::endl;
FrontProcess->StartMQProducerThread();
FrontProcess->m_needRestartProducerThread = false;
}
if (FrontProcess->m_needRestartTimerThread) {
std::cout << "[Monitor] Restarting TimerThread..." << std::endl;
FrontProcess->StartTimerThread();
FrontProcess->m_needRestartTimerThread = false;
}
}
std::this_thread::sleep_for(std::chrono::seconds(60));//每分钟检测一次
}
return nullptr;
}