#include #include #include #include #include #include #include "PQSMsg.h" #include "client2.h" #include "dealMsg.h" #include "cloudfront/code/interface.h" using namespace std; #if 0 /* 常量定义 */ #define THREAD_CONNECTIONS 10 // 最大线程数 #define MONITOR_INTERVAL 1 // 监控间隔(秒) /* 线程状态枚举 */ typedef enum { THREAD_RUNNING, // 0:运行中 THREAD_STOPPED, // 1:正常停止 THREAD_RESTARTING, // 2:重启中 THREAD_CRASHED // 3:异常崩溃 } thread_state_t; /* 线程控制结构体 */ typedef struct { pthread_t tid; // 线程ID int index; // 线程编号(0~CONNECTIONS-1) thread_state_t state; // 当前状态 pthread_mutex_t lock; // 线程专用互斥锁 } thread_info_t; #endif /* 全局变量 */ thread_info_t thread_info[THREAD_CONNECTIONS]; // 线程信息数组 pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; // 全局互斥锁 extern SafeMessageQueue message_queue; // 生成测试装置 std::vector generate_test_devices(int count) { std::vector devices; for (int i = 1; i <= count; ++i) { // 生成装置ID和名称 std::string dev_id = "D" + std::to_string(1000 + i).substr(1); // D001, D002, ..., D100 std::string dev_name = "Device " + std::to_string(i); // 生成测点 std::vector points = { { "P" + dev_id.substr(1) + "01", // 测点ID如 P00101 "Voltage " + dev_name, dev_id, 0.0, // 随机电压值 0.0, 100.0, 80.0 }, { "P" + dev_id.substr(1) + "02", // 测点ID如 P00102 "Current " + dev_name, dev_id, 0.0, // 随机电流值 0.0, 20.0, 15.0 } }; // 添加装置 devices.push_back({ dev_id, dev_name, (i % 2 == 0) ? "Model-X" : "Model-Y", // 交替使用两种型号 "00-B7-8D-A8-00-D6", // 随机MAC地址 1, // 状态 (1=在线) points }); } return devices; } /* 线程工作函数 待分配线程池*/ void* work_thread(void* arg) { int index = *(int*)arg; // 获取线程索引 free(arg); // 释放动态分配的索引内存 // 更新线程状态为运行中 pthread_mutex_lock(&thread_info[index].lock); printf("Thread %d started\n", index); thread_info[index].state = THREAD_RUNNING; pthread_mutex_unlock(&thread_info[index].lock); // 模拟工作循环(5秒间隔) while (1) { sleep(5); // 10%概率模拟线程故障 if (rand() % 10 == 0) { pthread_mutex_lock(&thread_info[index].lock); printf("Thread %d simulated failure\n", index); pthread_mutex_unlock(&thread_info[index].lock); break; } } // 线程终止处理 pthread_mutex_lock(&thread_info[index].lock); thread_info[index].state = THREAD_STOPPED; printf("Thread %d stopped\n", index); pthread_mutex_unlock(&thread_info[index].lock); return NULL; } /* 线程工作函数 0号子线程*/ /* 客户端连接管理线程函数*/ void* client_manager_thread(void* arg) { int index = *(int*)arg; free(arg); // 更新线程状态为运行中 pthread_mutex_lock(&thread_info[index].lock); printf("Client Manager Thread %d started\n", index); thread_info[index].state = THREAD_RUNNING; pthread_mutex_unlock(&thread_info[index].lock); printf("Started client connections\n"); // 创建测点数据 std::vector points1 = { {"P001", "Main Voltage", "D001", 10.0, 0.0, 100.0, 0.0}, {"P002", "Backup Voltage", "D001", 5.0, 0.0, 50.0, 0.0} }; std::vector points2 = { {"P101", "Generator Output", "D002", 20.0, 0.0, 200.0, 0.0} }; // 创建装置列表 std::vector devices = { { "D001", "Primary Device", "Model-X", "00-B7-8D-A8-00-D9", 1, points1 }, { "D002", "Backup Device", "Model-Y", "00-B7-8D-A8-00-D6", 1, points2 } }; // 生成100个测试装置 std::vector test_devices = generate_test_devices(100); //std::vector devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk添加 // 启动客户端连接 start_client_connect(devices); printf("Stopped all client connections\n"); // 线程终止处理 pthread_mutex_lock(&thread_info[index].lock); thread_info[index].state = THREAD_STOPPED; printf("Client Manager Thread %d stopped\n", index); pthread_mutex_unlock(&thread_info[index].lock); return NULL; } /* 线程工作函数 1号子线程*/ /* 消息处理线程函数 */ void* message_processor_thread(void* arg) { int index = *(int*)arg; free(arg); // 更新线程状态为运行中 pthread_mutex_lock(&thread_info[index].lock); printf("Message Processor Thread %d started\n", index); thread_info[index].state = THREAD_RUNNING; pthread_mutex_unlock(&thread_info[index].lock); // 消息处理循环 while (1) { deal_message_t msg; if (message_queue.pop(msg)) { // 实际消息处理逻辑 // 注意:这里需要根据msg.client_index区分客户端 // 处理完成后释放内存 // 调用实际的消息处理函数 process_received_message(msg.mac, msg.device_id, msg.data, msg.length); free(msg.data); } else { // 队列为空时短暂休眠(100微秒 = 0.1毫秒) usleep(100); } } // 线程终止处理 pthread_mutex_lock(&thread_info[index].lock); thread_info[index].state = THREAD_STOPPED; printf("Message Processor Thread %d stopped\n", index); pthread_mutex_unlock(&thread_info[index].lock); return NULL; } /* 线程重启函数 */ void restart_thread(int index) { pthread_mutex_lock(&global_lock); if (thread_info[index].state == THREAD_RESTARTING) { pthread_mutex_unlock(&global_lock); return; // 避免重复重启 } thread_info[index].state = THREAD_RESTARTING; printf("Restarting thread %d\n", index); pthread_mutex_unlock(&global_lock); // 创建新线程 int* new_index = (int*)malloc(sizeof(int)); *new_index = index; if (index == 0) { // 客户端管理线程 if (pthread_create(&thread_info[index].tid, NULL, client_manager_thread, new_index) != 0) { pthread_mutex_lock(&global_lock); printf("Failed to restart client manager thread %d\n", index); thread_info[index].state = THREAD_CRASHED; pthread_mutex_unlock(&global_lock); free(new_index); } } else if (index == 1) { // 消息处理线程 if (pthread_create(&thread_info[index].tid, NULL, message_processor_thread, new_index) != 0) { pthread_mutex_lock(&global_lock); printf("Failed to restart message processor thread %d\n", index); thread_info[index].state = THREAD_CRASHED; pthread_mutex_unlock(&global_lock); free(new_index); } } else if (index == 2) { // 接口,mq char* argv[] = { (char*)new_index ,(char*)"-dcfg_stat_data", (char*)"-s1_1" }; ThreadArgs* args = new ThreadArgs{3, argv}; if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) { pthread_mutex_lock(&global_lock); printf("Failed to restart message processor thread %d\n", index); thread_info[index].state = THREAD_CRASHED; pthread_mutex_unlock(&global_lock); delete args; // 如果线程没创建成功就手动释放 free(new_index); } } else { // 其他工作线程 // 这里简化为空,实际应用中可添加其他线程 } } /* 线程存活检测 */ int is_thread_alive(pthread_t tid) { return pthread_tryjoin_np(tid, NULL) == EBUSY; // EBUSY表示线程仍在运行 } /* 主函数 */ int main() { srand(time(NULL)); // 初始化随机数种子 // 初始化线程数组 for (int i = 0; i < THREAD_CONNECTIONS; i++) { thread_info[i].index = i; thread_info[i].state = THREAD_STOPPED; pthread_mutex_init(&thread_info[i].lock, NULL); // 初始化每个线程的锁 } // 创建初始线程组 for (int i = 0; i < THREAD_CONNECTIONS; i++) { int* index = (int*)malloc(sizeof(int)); *index = i; if (i == 0) { // 客户端管理线程 if (pthread_create(&thread_info[i].tid, NULL, client_manager_thread, index) != 0) { printf("Failed to create client manager thread %d\n", i); free(index); } } else if (i == 1) { // 消息处理线程 if (pthread_create(&thread_info[i].tid, NULL, message_processor_thread, index) != 0) { printf("Failed to create message processor thread %d\n", i); free(index); } } else if (i == 2){ //接口和mq char* argv[] = { (char*)index,(char*)"-dcfg_stat_data", (char*)"-s1_1" }; ThreadArgs* args = new ThreadArgs{3, argv}; if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) { printf("Failed to create message processor thread %d\n", i); delete args; // 如果线程没创建成功就手动释放 free(index); } } else { // 其他工作线程 // 这里简化为空,实际应用中可添加其他线程 free(index); } } printf("Thread monitoring system started with %d workers\n", THREAD_CONNECTIONS); // 主监控循环 while (1) { sleep(MONITOR_INTERVAL); // 检查所有线程状态 for (int i = 0; i < THREAD_CONNECTIONS; i++) { pthread_mutex_lock(&thread_info[i].lock); // 检测运行中线程是否崩溃 if (thread_info[i].state == THREAD_RUNNING && !is_thread_alive(thread_info[i].tid)) { printf("Thread %d crashed unexpectedly\n", i); thread_info[i].state = THREAD_CRASHED; } // 处理需要重启的线程 if (thread_info[i].state == THREAD_STOPPED || thread_info[i].state == THREAD_CRASHED) { pthread_mutex_unlock(&thread_info[i].lock); restart_thread(i); // 异步重启 } else { pthread_mutex_unlock(&thread_info[i].lock); } } // 监控socket队列状态 static int queue_monitor = 0; //static int count = 3; if (++queue_monitor >= 10) { // 每10秒报告一次 printf("Message queue size: %zu\n", message_queue.size()); queue_monitor = 0; /*std::vector test_devices = generate_test_devices(count); count++; for (const auto& device : test_devices) { ClientManager::instance().add_device(device); } for (const auto& device : test_devices) { ClientManager::instance().remove_device("D001"); }*/ } } // 清理资源(理论上不会执行到这里) for (int i = 0; i < THREAD_CONNECTIONS; i++) { pthread_mutex_destroy(&thread_info[i].lock); } return 0; }