Files
front_linux/LFtid1056/cloudfront/code/main.cpp
2025-09-12 17:08:25 +08:00

630 lines
22 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/////////////////////////////////////////////////////////////////////////////////////////////////////
#include <memory>
#include <queue> //任务队列
#include <csignal> //信号处理
#include <iostream> //标准输入输出
#include <string> //字符串
#include <vector> //数组型容器
#include <map> //映射
#include <mutex> //锁
#include <thread> //线程
#include <condition_variable> //线程控制
#include <atomic> //原子操作
#include <chrono> //时间
#include <ctime> //时间
#include <sstream> //流处理
#include <cstdio> //字符处理
#include <iomanip> //格式控制
#include <functional> //类消费者绑定
#include <unistd.h> //获取目录
#include <array>
#include <list>
#include <cstdio>
#include <sys/stat.h>
#include <fnmatch.h>
#include <libgen.h>
#include <cstdlib>
////////////////////////////////////////////////////////////////////////////////////////////////////////
#include "interface.h" //用于访问接口
#include "log4cplus/log4.h" //用于日志
#include "curl/curl.h" //用于访问接口
#include "nlohmann/json.hpp" //用于构造json
#include "worker.h" //shell接口
#include "rocketmq.h"
#include "rocketmq/MQClientException.h"
#include "front.h"
//////////////////////////////////////////////////////////////////////////////////////////////////////
using json = nlohmann::json;
//////////////////////////////////////////////////////////////////////////////////////////////////////全局变量
// Path of the front-end program; assigned from get_parent_directory() in cloudfrontthread().
std::string FRONT_PATH;
// Initialization flag: set to 1 at the end of the Front constructor;
// mqproducerThread only dequeues messages while it is non-zero.
// NOTE(review): plain int shared between threads without synchronization — confirm this is acceptable.
int INITFLAG = 0;
// Front-end identification
std::string subdir = "cloudfrontproc"; // sub-directory name (parse_param overrides it with -d / -D)
uint32_t g_node_id = 0;
int g_front_seg_index = 0; // defaults to single-process mode (parse_param overrides it with -s)
int g_front_seg_num = 0; // defaults to single-process mode (parse_param overrides it with -s)
// Real-time process flag
int three_secs_enabled = 0;
// Steady-state process auto-registration report flag
int auto_register_report_enabled = 0;
// Deadlock counters for the MQ producer thread and the timer thread;
// in this file each of those threads resets its own counter to 0 on every loop iteration.
uint32_t g_mqproducer_blocked_times = 0;
uint32_t g_ontime_blocked_times = 0;
// Process control: the monitor loop in cloudfrontthread() keeps running while this is true.
std::atomic<bool> running{true};
// Clears `running`; shaped like a signal handler (the int argument is ignored).
// NOTE(review): no registration (signal/sigaction) is visible in this file — confirm where it is installed.
void onSignal(int){ running = false; }
/////////////////////////////////////////////////////////////////////////////////////////////////////
extern int G_TEST_FLAG; // switch that enables the test thread
extern int TEST_PORT; // test port number
extern std::string FRONT_INST;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数
// Local stand-in for std::make_unique (not available before C++14):
// constructs an Obj from the forwarded arguments and hands back sole ownership.
template<typename Obj, typename... CtorArgs>
std::unique_ptr<Obj> make_unique(CtorArgs&&... ctor_args) {
    std::unique_ptr<Obj> owner(new Obj(std::forward<CtorArgs>(ctor_args)...));
    return owner;
}
// Parses a segment spec of the form "<index>_<count>" (for example "1_5").
// Writes to index/count only when BOTH halves parse, so a half-valid spec
// such as "3_x" can never leave the pair in an inconsistent state.
// Returns false when '_' is missing or either half is not an integer.
static bool parse_seg_spec(const std::string& spec, int& index, int& count) {
    const std::string::size_type sep = spec.find('_');
    if (sep == std::string::npos) {
        return false;
    }
    try {
        const int parsed_index = std::stoi(spec.substr(0, sep));
        const int parsed_count = std::stoi(spec.substr(sep + 1));
        index = parsed_index;
        count = parsed_count;
        return true;
    } catch (const std::exception&) {
        // std::stoi throws std::invalid_argument / std::out_of_range
        return false;
    }
}

// Parses the command-line style arguments of a front-end thread.
//   -s <idx>_<num>  or  -s<idx>_<num> : sets g_front_seg_index / g_front_seg_num
//   -d <dir> / -D <dir>  or  -d<dir> / -D<dir> : sets subdir
// argv[0] is skipped. Unknown arguments are ignored. A malformed -s value is
// reported on stderr and leaves both segment globals untouched.
// Always returns true (malformed values are treated as non-fatal, as before).
bool parse_param(int argc, char* argv[]) {
    for (int i = 1; i < argc; ++i) {
        if (!argv[i]) {
            continue; // constructing std::string from a null pointer is undefined
        }
        const std::string arg = argv[i];
        const bool has_next = (i + 1 < argc) && argv[i + 1] != nullptr;

        // -s: either "-s 1_5" (value in the next argument) or compact "-s1_5"
        const bool seg_spaced = (arg == "-s") && has_next;
        const bool seg_compact = (arg.rfind("-s", 0) == 0) && arg.length() > 2;
        if (seg_spaced || seg_compact) {
            const std::string spec = seg_spaced ? std::string(argv[++i]) : arg.substr(2);
            if (!parse_seg_spec(spec, g_front_seg_index, g_front_seg_num)) {
                std::cerr << "Invalid -s format." << std::endl;
            }
            continue;
        }

        // -d / -D: either "-d dir" (value in the next argument) or compact "-ddir"
        if ((arg == "-d" || arg == "-D") && has_next) {
            subdir = argv[++i];
            continue;
        }
        if ((arg.rfind("-d", 0) == 0 || arg.rfind("-D", 0) == 0) && arg.length() > 2) {
            subdir = arg.substr(2);
            continue;
        }
        // Further options (e.g. -x, -y) can be handled here.
    }

    // Echo the effective settings
    std::cout << "g_front_seg_index: " << g_front_seg_index << "\n";
    std::cout << "g_front_seg_num : " << g_front_seg_num << "\n";
    std::cout << "subdir : " << subdir << "\n";
    return true;
}
//获取前置类型
/*void init_global_function_enable() {
if (subdir == "cfg_stat_data") { // 历史稳态
g_node_id = STAT_DATA_BASE_NODE_ID;
auto_register_report_enabled = 1;
} else if (subdir == "cfg_3s_data") { // 实时
g_node_id = THREE_SECS_DATA_BASE_NODE_ID;
three_secs_enabled = 1;
} else if (subdir == "cfg_soe_comtrade") { // 告警、录波、暂态
g_node_id = SOE_COMTRADE_BASE_NODE_ID;
} else if (subdir == "cfg_recallhis_data") { // 补招
g_node_id = RECALL_HIS_DATA_BASE_NODE_ID;
}
}*/
//获取功能名称
/*std::string get_front_msg_from_subdir() {
if (subdir.find("cfg_3s_data") != std::string::npos)
return "实时数据进程";
else if (subdir.find("cfg_soe_comtrade") != std::string::npos)
return "暂态和告警进程";
else if (subdir.find("cfg_recallhis_data") != std::string::npos)
return "稳态补招进程";
else if (subdir.find("cfg_stat_data") != std::string::npos)
return "稳态统计进程";
else
return "unknown";
}*/
// Returns the absolute path of the parent of the current working directory,
// or an empty string when the working directory cannot be determined
// (for example when it does not fit in PATH_MAX bytes).
std::string get_parent_directory() {
    // Current working directory
    char cwd[PATH_MAX];
    if (!getcwd(cwd, sizeof(cwd))) {
        return "";
    }
    // dirname() may modify its argument. cwd is already a private, writable
    // scratch buffer, so it can be passed directly — no extra heap copy needed.
    const char* parent = dirname(cwd);
    if (!parent) return "";
    // Copy out immediately: the result may point into cwd or into static storage.
    return std::string(parent);
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////主要类结构
//------------------- Front 类(C++) -------------------
// Constructor.
// In order: loads the configuration, sets up process-level logging, reads the
// device ledger, initializes the loggers, starts the four worker threads
// (front, MQ consumer, MQ producer, timer), optionally starts the test shell
// server, then waits 3 seconds and sets INITFLAG to 1.
Front::Front():
m_worker(this),
m_threadPool(std::thread::hardware_concurrency()) // size the thread pool from the reported number of cores
// NOTE(review): hardware_concurrency() may return 0 when the value is not computable — confirm the pool handles a size of 0.
{
// Initialize g_node_id (currently disabled)
//init_global_function_enable();
// Configuration initialization
init_config();
// Start process-level logging
init_logger_process();
DIY_WARNLOG("process","【WARN】前置的%d号进程 进程级日志初始化完毕", g_front_seg_index);
// Read the device ledger
parse_device_cfg_web();
// Initialize the loggers
init_loggers();
// Read the model and download the template file (currently disabled)
//parse_model_cfg_web();
// Parse the template file (currently disabled)
//Set_xml_nodeinfo();
StartFrontThread(); // start the main worker thread
StartMQConsumerThread(); // start the MQ consumer thread
StartMQProducerThread(); // start the MQ producer thread
StartTimerThread(); // start the timer thread
// Start the worker's test shell server only when the test flag is set
if(G_TEST_FLAG){
if(!m_worker.startServer(TEST_PORT)) {
std::cerr << "[testshell] startServer failed.\n";
}
}
// Mark initialization as complete after a 3 second delay;
// mqproducerThread does not dequeue anything until INITFLAG is non-zero.
std::this_thread::sleep_for(std::chrono::seconds(3));
INITFLAG = 1;
}
// Destructor: delegates the whole teardown (server stop, thread joins,
// consumer shutdown) to FormClosing().
Front::~Front()
{
    this->FormClosing();
}
// ============ Stop all running threads ============
// Shuts down, in this order: the test shell server, the front thread, the
// timer thread, the MQ producer thread, and finally the MQ consumer object,
// its listener and the consumer thread. Called from the destructor.
void Front::FormClosing() {
// Make sure the test shell server is stopped
m_worker.stopServer();
// Make sure the front thread is stopped
if(m_FrontThread.joinable()) {
m_bIsFrontThreadCancle = true;
m_FrontThread.join(); // wait for the front thread to finish
}
// Timer thread
if (m_TimerThread.joinable()) {
m_IsTimerCancel = true;
m_TimerThread.join();
}
// Producer thread
if (m_MQProducerThread.joinable()) {
m_IsMQProducerCancel = true;
m_MQProducerThread.join();
}
// Consumer thread
// NOTE(review): m_mqConsumer / m_listener are touched here before the consumer
// thread is joined; if that thread could still be inside mqconsumerThread()
// setting them up, this would race — confirm that cannot happen at shutdown.
m_IsMQConsumerCancel = true;
if (m_mqConsumer) {
try {
m_mqConsumer->shutdown();
} catch (...) {
// Best effort: a failing shutdown must not prevent the rest of the teardown
std::cerr << "mq consumer shutdown error" << std::endl;
}
m_mqConsumer.reset();
}
m_listener.reset();
if (m_MQConsumerThread.joinable()) {
m_MQConsumerThread.join();
}
}
//============ 线程函数 ============
// Starts (or restarts) the front worker thread running Front::FrontThread.
void Front::StartFrontThread() {
    // A std::thread whose function has already returned is still joinable
    // until join() is called, and move-assigning over a joinable std::thread
    // calls std::terminate(). Stop and reap any previous thread first so the
    // monitor's restart path cannot abort the process.
    if (m_FrontThread.joinable()) {
        m_bIsFrontThreadCancle = true;
        m_FrontThread.join();
    }
    m_bIsFrontThreadCancle = false;
    m_FrontThread = std::thread(&Front::FrontThread, this);
}
// Starts (or restarts) the MQ consumer thread running Front::mqconsumerThread.
void Front::StartMQConsumerThread() {
    // mqconsumerThread() returns once the consumer is set up (or has failed),
    // but its std::thread stays joinable until joined. Move-assigning over a
    // joinable std::thread calls std::terminate(), so reap the old one first.
    if (m_MQConsumerThread.joinable()) {
        m_MQConsumerThread.join();
    }
    m_IsMQConsumerCancel = false;
    m_MQConsumerThread = std::thread(&Front::mqconsumerThread, this);
}
// Starts (or restarts) the MQ producer thread running Front::mqproducerThread.
void Front::StartMQProducerThread() {
    // A finished thread is still joinable until join() is called, and
    // move-assigning over a joinable std::thread calls std::terminate().
    // Stop and reap any previous thread before creating the new one.
    if (m_MQProducerThread.joinable()) {
        m_IsMQProducerCancel = true;
        m_MQProducerThread.join();
    }
    m_IsMQProducerCancel = false;
    m_MQProducerThread = std::thread(&Front::mqproducerThread, this);
}
// Starts (or restarts) the timer thread running Front::OnTimerThread.
void Front::StartTimerThread() {
    // A finished thread is still joinable until join() is called, and
    // move-assigning over a joinable std::thread calls std::terminate().
    // Stop and reap any previous thread before creating the new one.
    if (m_TimerThread.joinable()) {
        m_IsTimerCancel = true;
        m_TimerThread.join();
    }
    m_IsTimerCancel = false;
    m_TimerThread = std::thread(&Front::OnTimerThread, this);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////主功能线程
// Main worker loop. Every 100 ms, until cancellation is requested, it
// processes recall files, processes queued recall events and checks for
// ledger updates.
// A restart is requested from the monitor only when the loop was left through
// an exception; a clean cancel just exits (same policy as mqproducerThread).
void Front::FrontThread() {
    std::cout << "FrontThread::run() is called ...... \n";
    bool failed = false;
    try {
        while (!m_bIsFrontThreadCancle) {
            check_recall_file();   // handle recall files (steady-state and transient)
            check_recall_event();  // handle recall events read from the list; each one calls the interface directly
            check_ledger_update(); // trigger ledger updates
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    } catch (const std::exception& e) {
        failed = true;
        std::cerr << "[FrontThread] Caught exception: " << e.what() << std::endl;
    } catch (...) {
        failed = true;
        std::cerr << "[FrontThread] Caught unknown exception" << std::endl;
    }

    if (!failed) {
        // Cancelled on purpose: do not ask the monitor for a restart.
        std::cout << "[FrontThread] 正常退出\n";
        return;
    }

    // Abnormal exit: set the restart flag for the monitor loop
    {
        std::lock_guard<std::mutex> lock(m_threadCheckMutex);
        m_needRestartFrontThread = true;
    }
    std::cout << "[FrontThread] exited, will be restarted by monitor\n";
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////定时任务
// Timer loop. After an initial 1 s delay it sends a heartbeat, then once per
// second until cancellation it updates the log-entry countdown and checks
// device busy timeouts. Every 30 ticks it sends a heartbeat and every 60 ticks
// it checks/backs up the waveform (qvvr) files.
// A restart is requested from the monitor only when the loop was left through
// an exception; a clean cancel just exits (same policy as mqproducerThread).
void Front::OnTimerThread()
{
    bool failed = false;
    try {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        std::cout << "OnTimerThread::run() is called ...... \n";
        int hbCounter = 0;     // ticks since the last heartbeat
        int backupCounter = 0; // ticks since the last backup check
        send_heartbeat_to_queue("1");
        while (!m_IsTimerCancel)
        {
            update_log_entries_countdown();
            // Business timeout check
            check_device_busy_timeout();
            // Heartbeat every 30 ticks (about 30 seconds)
            if (hbCounter >= 30) {
                send_heartbeat_to_queue("1");
                hbCounter = 0;
            }
            // Waveform file check every 60 ticks (about 60 seconds)
            if (backupCounter >= 60) {
                check_and_backup_qvvr_files();
                backupCounter = 0;
            }
            hbCounter++;
            backupCounter++;
            g_ontime_blocked_times = 0; // this loop is alive: reset the blocked counter
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
    } catch (const std::exception& e) {
        failed = true;
        std::cerr << "[OnTimerThread] Caught exception: " << e.what() << std::endl;
    } catch (...) {
        failed = true;
        std::cerr << "[OnTimerThread] Caught unknown exception" << std::endl;
    }

    if (!failed) {
        // Cancelled on purpose: do not ask the monitor for a restart.
        std::cout << "[OnTimerThread] 正常退出\n";
        return;
    }

    // Abnormal exit: set the restart flag for the monitor loop
    {
        std::lock_guard<std::mutex> lock(m_threadCheckMutex);
        m_needRestartTimerThread = true;
    }
    std::cout << "[OnTimerThread] exited, will be restarted by monitor\n";
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////消费者线程
void Front::mqconsumerThread()
{
try {
std::string consumerGroup = subdir + std::to_string(g_front_seg_index);
std::string nameServer = G_MQCONSUMER_IPPORT;
std::vector<rocketmq::Subscription> subscriptions;
//if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata);
//}
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate);
//if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall);
//}
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset);
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog);
m_mqConsumer = make_unique<rocketmq::DefaultMQPushConsumer>(consumerGroup);
m_mqConsumer->setNamesrvAddr(nameServer);
m_mqConsumer->setSessionCredentials(G_MQCONSUMER_ACCESSKEY, G_MQCONSUMER_SECRETKEY, G_MQCONSUMER_CHANNEL);
m_mqConsumer->setInstanceName("inst_" + std::to_string(sGetMsTime()));
m_mqConsumer->setConsumeFromWhere(rocketmq::CONSUME_FROM_LAST_OFFSET);
std::map<std::string, rocketmq::Subscription::CallbackT> callbackMap;
for (const auto& sub : subscriptions) {
std::string key = sub.topic + ":" + sub.tag;
callbackMap.emplace(key, sub.callback);
m_mqConsumer->subscribe(sub.topic, sub.tag);
std::cout << "[mqconsumerThread] 已订阅 Topic=\"" << sub.topic << "\", Tag=\"" << sub.tag << "\"" << std::endl;
}
m_listener = std::make_shared<rocketmq::SubscriberListener>(callbackMap);
m_mqConsumer->registerMessageListener(m_listener.get());
m_mqConsumer->start();
std::cout << "[mqconsumerThread] Consumer 已启动,等待消息..." << std::endl;
// ✳️ 保持线程不主动退出,由 RocketMQ 内部驱动执行回调
// 如果 RocketMQ 内部机制失败或意外退出线程,就走 catch
}
catch (const rocketmq::MQClientException& e) {
std::cerr << "[mqconsumerThread] MQClientException: " << e.what() << std::endl;
std::lock_guard<std::mutex> lock(m_threadCheckMutex);
m_needRestartConsumerThread = true;
return;
} catch (const std::exception& e) {
std::cerr << "[mqconsumerThread] std::exception: " << e.what() << std::endl;
std::lock_guard<std::mutex> lock(m_threadCheckMutex);
m_needRestartConsumerThread = true;
return;
} catch (...) {
std::cerr << "[mqconsumerThread] Unknown exception" << std::endl;
std::lock_guard<std::mutex> lock(m_threadCheckMutex);
m_needRestartConsumerThread = true;
return;
}
// 程序运行中,消费者会通过回调处理消息,线程保持存活即可
std::cout << "[mqconsumerThread] Consumer 线程正在运行,等待消息到达..." << std::endl;
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////生产者线程
// Formats the current local time as "YYYY-MM-DD HH:MM:SS.mmm".
// The text is built in a local buffer so that no formatting state
// (fill character / width) is left behind on the shared std::cout.
static std::string mqproducer_timestamp_now() {
    const auto now = std::chrono::system_clock::now();
    const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(
                            now.time_since_epoch()) % 1000;
    const std::time_t secs = std::chrono::system_clock::to_time_t(now);
    std::tm tm_buf{};
    localtime_r(&secs, &tm_buf);
    char date_time[32];
    std::strftime(date_time, sizeof(date_time), "%Y-%m-%d %H:%M:%S", &tm_buf);
    char stamped[48];
    std::snprintf(stamped, sizeof(stamped), "%s.%03lld", date_time,
                  static_cast<long long>(millis.count()));
    return std::string(stamped);
}

// MQ producer loop. Initializes the producer, then every 10 ms (until
// cancellation) takes at most one entry from queue_data_list and sends it with
// my_rocketmq_send(), logging a timestamp before and after the send.
// Nothing is dequeued until INITFLAG is non-zero.
// Leaving the loop through an exception sets m_needRestartProducerThread;
// a clean cancel does not.
void Front::mqproducerThread()
{
    try {
        // 1. Initialize the producer
        InitializeProducer(m_producer);
        std::cout << "\n[mqproducerThread] is running ...... \n\n";
        uint32_t count = 0; // number of messages sent so far
        while (!m_IsMQProducerCancel) {
            queue_data_t data;
            bool data_gotten = false;
            if (INITFLAG)
            {
                // Hold the queue lock only while popping one entry
                std::lock_guard<std::mutex> lock(queue_data_list_mutex);
                if (!queue_data_list.empty()) {
                    data = queue_data_list.front();
                    queue_data_list.pop_front();
                    data_gotten = true;
                }
            }
            if (data_gotten) {
                const std::string began = mqproducer_timestamp_now();
                std::cout << "BEGIN my_queue_send no." << count
                          << " >>>> " << began
                          << std::endl;
                // Perform the actual send
                my_rocketmq_send(data, m_producer);
                const std::string ended = mqproducer_timestamp_now();
                std::cout << "END my_queue_send no." << count
                          << " >>>> " << ended
                          << "\n\n";
                ++count;
            }
            g_mqproducer_blocked_times = 0; // this loop is alive: reset the blocked counter
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::cout << "[mqproducerThread] 正常退出\n";
    }
    catch (const std::exception& e) {
        std::cerr << "[mqproducerThread] std::exception: " << e.what() << std::endl;
        std::lock_guard<std::mutex> lock(m_threadCheckMutex);
        m_needRestartProducerThread = true;
    }
    catch (...) {
        std::cerr << "[mqproducerThread] unknown exception\n";
        std::lock_guard<std::mutex> lock(m_threadCheckMutex);
        m_needRestartProducerThread = true;
    }
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////用例,除通讯外其他功能都可实现
//int main(int argc char** argv) //变为线程
extern thread_info_t thread_info[THREAD_CONNECTIONS];
// Releases a ThreadArgs bundle: each argv entry is released with free()
// (the entries come from strdup), then the argv array with delete[] and the
// struct itself with delete.
void cleanup_args(ThreadArgs* args) {
    char** const entries = args->argv;
    const int total = args->argc;
    int slot = 0;
    while (slot < total) {
        free(entries[slot]); // allocated by strdup
        ++slot;
    }
    delete[] entries;
    delete args;
}
void* cloudfrontthread(void* arg) {
///////////////////////////////////////
ThreadArgs* args = static_cast<ThreadArgs*>(arg);
int argc = args->argc;
char **argv = args->argv;
printf("[cloudfrontthread] argc = %d\n", argc);
for (int i = 0; i < argc; ++i) {
printf(" argv[%d] = %s\n", i, argv[i]);
}
// 动态解析线程 index
int index = 0;
if (argc > 0 && argv[0]) {
try {
index = std::stoi(argv[0]);
} catch (...) {
std::cerr << "[cloudfrontthread] Failed to parse index from argv[0]: " << argv[0] << "\n";
return nullptr;
}
}
// 更新线程状态为运行中
pthread_mutex_lock(&thread_info[index].lock);
printf("cloudfrontthread %d started\n", index);
thread_info[index].state = THREAD_RUNNING;
pthread_mutex_unlock(&thread_info[index].lock);
///////////////////////////////////////
// 解析命令行参数
if(!parse_param(argc,argv)){
std::cerr << "process param error,exit" << std::endl;
cleanup_args(args);
return nullptr;
}
// 线程使用完后清理参数
cleanup_args(args);
//路径获取
FRONT_PATH = get_parent_directory();
std::cout << "FRONT_PATH:" << FRONT_PATH << std::endl;
//声明前置
std::unique_ptr<Front> FrontProcess;
FrontProcess = make_unique<Front>();
std::cout << "[Main] Program running in background.\n";
// 5) 主线程保持后台运行
while(running) {
{
std::lock_guard<std::mutex> lock(FrontProcess->m_threadCheckMutex);
if (FrontProcess->m_needRestartFrontThread) {
std::cout << "[Monitor] Restarting FrontThread..." << std::endl;
FrontProcess->StartFrontThread();
FrontProcess->m_needRestartFrontThread = false;
}
if (FrontProcess->m_needRestartConsumerThread) {
std::cout << "[Monitor] Restarting MQConsumerThread..." << std::endl;
FrontProcess->StartMQConsumerThread();
FrontProcess->m_needRestartConsumerThread = false;
}
if (FrontProcess->m_needRestartProducerThread) {
std::cout << "[Monitor] Restarting MQProducerThread..." << std::endl;
FrontProcess->StartMQProducerThread();
FrontProcess->m_needRestartProducerThread = false;
}
if (FrontProcess->m_needRestartTimerThread) {
std::cout << "[Monitor] Restarting TimerThread..." << std::endl;
FrontProcess->StartTimerThread();
FrontProcess->m_needRestartTimerThread = false;
}
}
std::this_thread::sleep_for(std::chrono::seconds(60));//每分钟检测一次
}
return nullptr;
}