#include #include #include #include #include #include #include "client2.h" #include "cloudfront/code/interface.h" #include #include #include using namespace std; #if 0 /* 常量定义 */ #define THREAD_CONNECTIONS 10 // 最大线程数 #define MONITOR_INTERVAL 1 // 监控间隔(秒) /* 线程状态枚举 */ typedef enum { THREAD_RUNNING, // 0:运行中 THREAD_STOPPED, // 1:正常停止 THREAD_RESTARTING, // 2:重启中 THREAD_CRASHED // 3:异常崩溃 } thread_state_t; /* 线程控制结构体 */ typedef struct { pthread_t tid; // 线程ID int index; // 线程编号(0~CONNECTIONS-1) thread_state_t state; // 当前状态 pthread_mutex_t lock; // 线程专用互斥锁 } thread_info_t; #endif extern int INITFLAG;//台账等初始化完成标志 extern void cleanup_args(ThreadArgs* args); void init_daemon(void) { int pid; int i; if( pid = fork() ) exit(0); /** 是父进程,结束父进程 */ else if( pid < 0 ) exit(1); /** fork失败,退出 */ /** 是第一子进程,后台继续执行 */ setsid(); /** 第一子进程成为新的会话组长和进程组长并与控制终端分离 */ if( pid = fork() ) exit(0); /** 是第一子进程,结束第一子进程 */ else if( pid < 0) exit(1); /** fork失败,退出 */ chdir("/FeProject/bin/"); //multi process running at same time umask(0); /** 重设文件创建掩码 */ return; } /* 全局变量 */ thread_info_t thread_info[THREAD_CONNECTIONS]; // 线程信息数组 pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; // 全局互斥锁 extern SafeMessageQueue message_queue; // 生成测试装置 std::vector generate_test_devices(int count) { std::vector devices; for (int i = 1; i <= count; ++i) { // 生成装置ID和名称 std::string dev_id = "D" + std::to_string(1000 + i).substr(1); // D001, D002, ..., D100 std::string dev_name = "Device " + std::to_string(i); // 生成测点 std::vector points = { { "P" + dev_id.substr(1) + "01", // 测点ID如 P00101 "Voltage " + dev_name, dev_id, 1, 0.0, // 随机电压值 0.0, 100.0, 80.0 }, { "P" + dev_id.substr(1) + "02", // 测点ID如 P00102 "Current " + dev_name, dev_id, 2, 0.0, // 随机电流值 0.0, 20.0, 15.0 } }; // 添加装置 devices.push_back({ dev_id, dev_name, (i % 2 == 0) ? "Model-X" : "Model-Y", // 交替使用两种型号 "00-B7-8D-A8-00-D6", // 随机MAC地址 1, // 状态 (1=在线) points }); } return devices; } void PrintDevices(const std::vector& devices) { std::cout << "==== Devices List (" << devices.size() << ") ====\n"; for (const auto& dev : devices) { std::cout << "Device ID : " << dev.device_id << "\n"; std::cout << "Name : " << dev.name << "\n"; std::cout << "Model : " << dev.model << "\n"; std::cout << "MAC : " << dev.mac << "\n"; std::cout << "Status : " << dev.status << "\n"; std::cout << "Points (" << dev.points.size() << "):\n"; for (const auto& pt : dev.points) { std::cout << " Point ID : " << pt.point_id << "\n"; std::cout << " Name : " << pt.name << "\n"; std::cout << " Device ID : " << pt.device_id << "\n"; std::cout << " Cpu No : " << pt.nCpuNo << "\n"; std::cout << " PT1 : " << pt.PT1 << "\n"; std::cout << " PT2 : " << pt.PT2 << "\n"; std::cout << " CT1 : " << pt.CT1 << "\n"; std::cout << " CT2 : " << pt.CT2 << "\n"; std::cout << " Scale : " << pt.strScale << "\n"; std::cout << " PTType : " << pt.nPTType << "\n"; std::cout << "----------------------\n"; } std::cout << "==========================\n"; } } /* 线程工作函数 0号子线程*/ /* 客户端连接管理线程函数*/ void* client_manager_thread(void* arg) { int index = *(int*)arg; free(arg); // 更新线程状态为运行中 pthread_mutex_lock(&thread_info[index].lock); printf("Client Manager Thread %d started\n", index); thread_info[index].state = THREAD_RUNNING; pthread_mutex_unlock(&thread_info[index].lock); printf("Started client connections\n"); // 创建测点数据 std::vector points1 = { {"P001", "Main Voltage", "D001",1 ,1, 1, 1, 1,"0.38k",0}, {"P002", "Backup Voltage", "D001",2 ,1, 1, 1, 1,"0.38k",0} }; //00B78DA800D6 00-B7-8D-01-79-06 // 创建装置列表 std::vector devices = { { "D001", "Primary Device", "Model-X", "00-B7-8D-01-79-06", 1, points1 } }; // 生成100个测试装置 std::vector test_devices = generate_test_devices(100); //std::vector devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk添加 // 启动客户端连接 start_client_connect(devices); printf("Stopped all client connections\n"); // 线程终止处理 pthread_mutex_lock(&thread_info[index].lock); thread_info[index].state = THREAD_STOPPED; printf("Client Manager Thread %d stopped\n", index); pthread_mutex_unlock(&thread_info[index].lock); return NULL; } /* 线程工作函数 1号子线程*/ /* 消息处理线程函数 */ void* message_processor_thread(void* arg) { int index = *(int*)arg; free(arg); // 更新线程状态为运行中 pthread_mutex_lock(&thread_info[index].lock); printf("Message Processor Thread %d started\n", index); thread_info[index].state = THREAD_RUNNING; pthread_mutex_unlock(&thread_info[index].lock); // 消息处理循环 while (1) { deal_message_t msg; if (message_queue.pop(msg)) { // 实际消息处理逻辑 // 注意:这里需要根据msg.client_index区分客户端 // 处理完成后释放内存 // 调用实际的消息处理函数 process_received_message(msg.mac, msg.device_id, msg.data, msg.length); free(msg.data); } else { // 队列为空时短暂休眠(100微秒 = 0.1毫秒) usleep(100); } } // 线程终止处理 pthread_mutex_lock(&thread_info[index].lock); thread_info[index].state = THREAD_STOPPED; printf("Message Processor Thread %d stopped\n", index); pthread_mutex_unlock(&thread_info[index].lock); return NULL; } /* 线程重启函数 */ void restart_thread(int index) { pthread_mutex_lock(&global_lock); if (thread_info[index].state == THREAD_RESTARTING) { pthread_mutex_unlock(&global_lock); return; // 避免重复重启 } thread_info[index].state = THREAD_RESTARTING; printf("Restarting thread %d\n", index); pthread_mutex_unlock(&global_lock); // 创建新线程 int* new_index = (int*)malloc(sizeof(int)); *new_index = index; if (index == 1) { // 客户端管理线程 if (pthread_create(&thread_info[index].tid, NULL, client_manager_thread, new_index) != 0) { pthread_mutex_lock(&global_lock); printf("Failed to restart client manager thread %d\n", index); thread_info[index].state = THREAD_CRASHED; pthread_mutex_unlock(&global_lock); free(new_index); } } else if (index == 2) { // 消息处理线程 if (pthread_create(&thread_info[index].tid, NULL, message_processor_thread, new_index) != 0) { pthread_mutex_lock(&global_lock); printf("Failed to restart message processor thread %d\n", index); thread_info[index].state = THREAD_CRASHED; pthread_mutex_unlock(&global_lock); free(new_index); } } else if (index == 0) { // 接口,mq char* argv[] = { (char*)new_index };//这里需要构造进程号参数传入 ThreadArgs* args = new ThreadArgs{1, argv}; if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) { pthread_mutex_lock(&global_lock); printf("Failed to restart message processor thread %d\n", index); thread_info[index].state = THREAD_CRASHED; pthread_mutex_unlock(&global_lock); delete args; // 如果线程没创建成功就手动释放 free(new_index); } } else { // 其他工作线程 // 这里简化为空,实际应用中可添加其他线程 } } /* 线程存活检测 */ int is_thread_alive(pthread_t tid) { return pthread_tryjoin_np(tid, NULL) == EBUSY; // EBUSY表示线程仍在运行 } //lnk参数 ThreadArgs* make_thread_args_from_strs(const std::vector& args_vec) { char** argv = new char*[args_vec.size() + 1]; // 多一个 nullptr 结尾 for (size_t i = 0; i < args_vec.size(); ++i) { argv[i] = strdup(args_vec[i].c_str()); // strdup 是 malloc 出来的 } argv[args_vec.size()] = nullptr; return new ThreadArgs{static_cast(args_vec.size()), argv}; } /* 主函数 */ int main(int argc ,char** argv) {//多进程添加参数 if(!parse_param(argc,argv)){ std::cerr << "process param error,exit" << std::endl; return 1; } //init_daemon(); srand(time(NULL)); // 初始化随机数种子 // 初始化线程数组 for (int i = 0; i < THREAD_CONNECTIONS; i++) { thread_info[i].index = i; thread_info[i].state = THREAD_STOPPED; pthread_mutex_init(&thread_info[i].lock, NULL); // 初始化每个线程的锁 } //接口和mq ThreadArgs* args = make_thread_args_from_strs({ "0" }); if (pthread_create(&thread_info[0].tid, NULL, cloudfrontthread, args) != 0) { printf("Failed to create message processor thread 0\n"); cleanup_args(args); } while(!INITFLAG){ std::this_thread::sleep_for(std::chrono::seconds(3)); std::cout << "waiting cloudfront initialize ..." << std::endl; } // 创建初始线程组 for (int i = 1; i < THREAD_CONNECTIONS; i++) { int* index = (int*)malloc(sizeof(int)); *index = i; if (i == 1) { // 客户端管理线程 if (pthread_create(&thread_info[i].tid, NULL, client_manager_thread, index) != 0) { printf("Failed to create client manager thread %d\n", i); free(index); } } else if (i == 2) { // 消息处理线程 if (pthread_create(&thread_info[i].tid, NULL, message_processor_thread, index) != 0) { printf("Failed to create message processor thread %d\n", i); free(index); } } else if (i == 3){ /*//接口和mq char* argv[] = { (char*)index };//这里需要构造进程号参数传入 ThreadArgs* args = new ThreadArgs{1, argv}; if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) { printf("Failed to create message processor thread %d\n", i); delete args; // 如果线程没创建成功就手动释放 free(index); }*/ } else { // 其他工作线程 // 这里简化为空,实际应用中可添加其他线程 free(index); } } printf("Thread monitoring system started with %d workers\n", THREAD_CONNECTIONS); // 主监控循环 while (1) { sleep(MONITOR_INTERVAL); // 检查所有线程状态 for (int i = 0; i < THREAD_CONNECTIONS; i++) { pthread_mutex_lock(&thread_info[i].lock); // 检测运行中线程是否崩溃 if (thread_info[i].state == THREAD_RUNNING && !is_thread_alive(thread_info[i].tid)) { printf("Thread %d crashed unexpectedly\n", i); thread_info[i].state = THREAD_CRASHED; } // 处理需要重启的线程 if (thread_info[i].state == THREAD_STOPPED || thread_info[i].state == THREAD_CRASHED) { pthread_mutex_unlock(&thread_info[i].lock); restart_thread(i); // 异步重启 } else { pthread_mutex_unlock(&thread_info[i].lock); } } // 创建测点数据 std::vector points2 = { {"P101", "Generator Output", "D002",1 ,1, 1, 1, 1,"0.38k",0} }; //00B78DA800D6 00-B7-8D-01-79-06 // 创建装置列表 std::vector devices = { { "D002", "Backup Device", "Model-Y", "00-B7-8D-A8-00-D6", 1, points2 } }; // 监控socket队列状态 static int queue_monitor = 0; //static int count = 3; if (++queue_monitor >= 20) { // 尝试添加一个设备 printf("Message queue size: %zu\n", message_queue.size()); //queue_monitor = 0; for (const auto& device : devices) { ClientManager::instance().add_device(device); } /*std::vector test_devices = generate_test_devices(count); count++; for (const auto& device : test_devices) { ClientManager::instance().remove_device("D001"); }*/ } } // 清理资源(理论上不会执行到这里) for (int i = 0; i < THREAD_CONNECTIONS; i++) { pthread_mutex_destroy(&thread_info[i].lock); } return 0; }