diff --git a/LFtid1056/client2.cpp b/LFtid1056/client2.cpp index 5c0a128..5cf0faa 100644 --- a/LFtid1056/client2.cpp +++ b/LFtid1056/client2.cpp @@ -10,13 +10,17 @@ // 配置参数 #define INITIAL_DATA_SIZE 128 // 初始数据0.1KB -#define CONNECTIONS 5 // 并发连接数 设备连接数 +#define CONNECTIONS 2 // 并发连接数 设备连接数 #define SERVER_IP "101.132.39.45" // 目标服务器IP 阿里云服务器 "101.132.39.45""172.27.208.1" #define SERVER_PORT 1056 // 目标服务器端口 #define BASE_RECONNECT_DELAY 5000 // 基础重连延迟(ms) #define MAX_RECONNECT_DELAY 60000 // 最大重连延迟(ms) #define MAX_RECONNECT_ATTEMPTS 10 // 最大重连次数 +// 全局变量用于管理客户端线程 +static pthread_t client_threads[CONNECTIONS]; +static client_context_t* client_contexts[CONNECTIONS] = { NULL }; + /* 缓冲区分配回调 */ void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void* buffer = malloc(suggested_size); @@ -162,26 +166,28 @@ void on_close(uv_handle_t* handle) { uv_timer_stop(&ctx->timer); uv_timer_stop(&ctx->reconnect_timer); - // 取消读操作(如果有的话) + // 取消读操作 uv_read_stop((uv_stream_t*)&ctx->client); - // 指数退避算法 - int delay = BASE_RECONNECT_DELAY * pow(2, ctx->reconnect_attempts); - delay = delay > MAX_RECONNECT_DELAY ? MAX_RECONNECT_DELAY : delay; + // 如果不是主动关闭,尝试重连 + if (!ctx->shutdown) { + int delay = BASE_RECONNECT_DELAY * pow(2, ctx->reconnect_attempts); + delay = delay > MAX_RECONNECT_DELAY ? MAX_RECONNECT_DELAY : delay; - fprintf(stdout, "[Client %d] Will reconnect after %dms (attempt %d/%d)\n", - ctx->index, delay, ctx->reconnect_attempts + 1, MAX_RECONNECT_ATTEMPTS); + fprintf(stdout, "[Client %d] Will reconnect after %dms (attempt %d/%d)\n", + ctx->index, delay, ctx->reconnect_attempts + 1, MAX_RECONNECT_ATTEMPTS); - uv_timer_start(&ctx->reconnect_timer, (uv_timer_cb)try_reconnect, delay, 0); - ctx->reconnect_attempts++; + uv_timer_start(&ctx->reconnect_timer, (uv_timer_cb)try_reconnect, delay, 0); + ctx->reconnect_attempts++; + } } /* 尝试重连函数 */ void try_reconnect(uv_timer_t* timer) { client_context_t* ctx = (client_context_t*)timer->data; - if (ctx->state != STATE_DISCONNECTED) { - fprintf(stdout, "[Client %d] Reconnect skipped: Invalid state\n", ctx->index); + if (ctx->state != STATE_DISCONNECTED || ctx->shutdown) { + fprintf(stdout, "[Client %d] Reconnect skipped: Invalid state or shutdown\n", ctx->index); return; } @@ -191,7 +197,7 @@ void try_reconnect(uv_timer_t* timer) { return; } - // 重新初始化TCP句柄(重要!) + // 重新初始化TCP句柄 uv_tcp_init(ctx->loop, &ctx->client); ctx->client.data = ctx; @@ -237,13 +243,12 @@ void on_connect(uv_connect_t* req, int status) { //send_initial_data(ctx); // 启动定时器 - uv_timer_start(&ctx->timer, on_timer, 6000, 6000); + //uv_timer_start(&ctx->timer, on_timer, 6000, 6000); } free(req); } - /* 客户端线程函数 */ void* run_client(void* arg) { int index = *(int*)arg; @@ -259,6 +264,7 @@ void* run_client(void* arg) { ctx->index = index; ctx->state = STATE_DISCONNECTED; ctx->reconnect_attempts = 0; + ctx->shutdown = 0; // 初始化关闭标志 // 初始化libuv句柄 uv_tcp_init(loop, &ctx->client); @@ -273,41 +279,124 @@ void* run_client(void* arg) { // 首次连接尝试 try_reconnect(&ctx->reconnect_timer); - // 运行事件循环 - uv_run(loop, UV_RUN_DEFAULT); + // 运行事件循环,定期检查关闭标志 + while (uv_loop_alive(loop) && !ctx->shutdown) { + uv_run(loop, UV_RUN_NOWAIT); + usleep(10000); // 10ms + } + fprintf(stdout, "[Client %d] thread closed\n", ctx->index); + // 主动关闭所有资源 + if (ctx->shutdown) { + uv_close((uv_handle_t*)&ctx->client, on_close); + uv_close((uv_handle_t*)&ctx->timer, NULL); + uv_close((uv_handle_t*)&ctx->reconnect_timer, NULL); + + // 确保所有关闭回调执行 + while (uv_loop_alive(loop)) { + uv_run(loop, UV_RUN_ONCE); + } + } // 资源清理 uv_loop_close(loop); - free(loop); + uv_loop_delete(loop); free(ctx); return NULL; } -/* 主函数 */ -int main() { - pthread_t threads[CONNECTIONS]; +/* 停止所有客户端线程 (新增函数) */ +void stop_all_clients() { + for (int i = 0; i < CONNECTIONS; i++) { + if (client_contexts[i]) { + client_contexts[i]->shutdown = 1; // 设置关闭标志 + } + } - // 创建客户端线程 + // 等待所有客户端线程退出 + for (int i = 0; i < CONNECTIONS; i++) { + if (client_threads[i]) { + pthread_join(client_threads[i], NULL); + client_threads[i] = 0; + } + } +} + +/* 启动所有客户端线程 (新增函数) */ +void start_all_clients() { for (int i = 0; i < CONNECTIONS; i++) { int* index = (int*)malloc(sizeof(int)); *index = i; - if (pthread_create(&threads[i], NULL, run_client, index) != 0) { - fprintf(stderr, "Failed to create thread %d\n", i); + if (pthread_create(&client_threads[i], NULL, run_client, index) != 0) { + fprintf(stderr, "Failed to create client thread %d\n", i); free(index); } + else { + // 上下文会在run_client中创建,这里初始化为NULL + client_contexts[i] = NULL; + } + } +} + +/* 获取活动客户端数量 (新增函数) */ +int get_active_client_count() { + int active_count = 0; + for (int i = 0; i < CONNECTIONS; i++) { + if (client_threads[i] && pthread_tryjoin_np(client_threads[i], NULL) == EBUSY) { + active_count++; + } + } + return active_count; +} + +/* 客户端连接监控函数 (新增函数) */ +void monitor_client_connections() { + static int monitor_counter = 0; + monitor_counter++; + + // 每5次调用(约5秒)检查一次状态 + if (monitor_counter >= 5) { + monitor_counter = 0; + + int active_clients = get_active_client_count(); + printf("Active client connections: %d/%d\n", active_clients, CONNECTIONS); + + // 如果所有客户端线程都退出了,自动重启 + if (active_clients == 0) { + printf("All clients stopped, restarting...\n"); + start_all_clients(); + } } - printf("Started %d client connections\n", CONNECTIONS); - - while (1) { - printf("sleep 1\n"); - sleep(10); - } - // 等待所有线程结束 - for (int i = 0; i < CONNECTIONS; i++) { - pthread_join(threads[i], NULL); - } - - return 0; + /*static int monitor_temp = 0; + monitor_temp++; + if (monitor_temp >= 30) { + monitor_temp = 0; + printf("30 second to stop all client\n"); + stop_all_clients(); + }*/ } +///* 主函数 */ +//int main() { +// pthread_t threads[CONNECTIONS]; +// +// // 创建客户端线程 +// for (int i = 0; i < CONNECTIONS; i++) { +// int* index = (int*)malloc(sizeof(int)); +// *index = i; +// +// if (pthread_create(&threads[i], NULL, run_client, index) != 0) { +// fprintf(stderr, "Failed to create thread %d\n", i); +// free(index); +// } +// } +// +// printf("Started %d client connections\n", CONNECTIONS); +// +// // 等待所有线程结束 +// for (int i = 0; i < CONNECTIONS; i++) { +// pthread_join(threads[i], NULL); +// } +// +// return 0; +//} diff --git a/LFtid1056/client2.h b/LFtid1056/client2.h index 85e6c69..5ce3a71 100644 --- a/LFtid1056/client2.h +++ b/LFtid1056/client2.h @@ -22,6 +22,7 @@ typedef struct { uv_timer_t reconnect_timer;// ʱ connection_state_t state; // ǰ״̬ int reconnect_attempts; // ǰ + volatile int shutdown; // رձ־ () } client_context_t; // @@ -29,3 +30,6 @@ void try_reconnect(uv_timer_t* timer);// void on_connect(uv_connect_t* req, int status);//ͻӻص void on_close(uv_handle_t* handle);//ͻ˶Ͽص void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size);//װƱIJ ͻӣݣݴС +void stop_all_clients(); // ֹͣпͻ +void start_all_clients(); // пͻ +void monitor_client_connections();//ؿͻ diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 1e2fcc2..11ebbfb 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -4,6 +4,8 @@ #include #include #include +#include "PQSMsg.h" +#include "client2.h" /* */ #define THREAD_CONNECTIONS 10 // ߳ @@ -61,33 +63,37 @@ void* work_thread(void* arg) { return NULL; } /* ̹߳ 1߳*/ -void* work_thread_1(void* arg) { - int index = *(int*)arg; // ȡ߳ - free(arg); // ͷŶ̬ڴ +/* ͻӹ̺߳*/ +void* client_manager_thread(void* arg) { + int index = *(int*)arg; + free(arg); // ߳״̬Ϊ pthread_mutex_lock(&thread_info[index].lock); - printf("work_thread_1 %d started\n", index); + printf("Client Manager Thread %d started\n", index); thread_info[index].state = THREAD_RUNNING; pthread_mutex_unlock(&thread_info[index].lock); - // 1߳ѭ - while (1) { - sleep(5); + // пͻ߳ + start_all_clients(); + printf("Started client connections\n"); - // 10%ģ̹߳ - if (rand() % 10 == 0) { - pthread_mutex_lock(&thread_info[index].lock); - printf("Thread %d simulated failure\n", index); - pthread_mutex_unlock(&thread_info[index].lock); - break; - } + // ѭ + while (thread_info[index].state == THREAD_RUNNING) { + sleep(1); + + // ؿͻ״̬ÿͻӲֵĺ + monitor_client_connections(); } + // ֹͣпͻ߳ + stop_all_clients(); + printf("Stopped all client connections\n"); + // ֹ߳ pthread_mutex_lock(&thread_info[index].lock); thread_info[index].state = THREAD_STOPPED; - printf("work_thread_1 %d stopped\n", index); + printf("Client Manager Thread %d stopped\n", index); pthread_mutex_unlock(&thread_info[index].lock); return NULL; } @@ -108,26 +114,19 @@ void restart_thread(int index) { *new_index = index; if (index == 0) { - //߳1 - if (pthread_create(&thread_info[index].tid, NULL, work_thread_1, new_index) != 0) { + // ͻ˹߳ + if (pthread_create(&thread_info[index].tid, NULL, client_manager_thread, new_index) != 0) { pthread_mutex_lock(&global_lock); - printf("Failed to restart work_thread_1 %d\n", index); + printf("Failed to restart client manager thread %d\n", index); thread_info[index].state = THREAD_CRASHED; pthread_mutex_unlock(&global_lock); free(new_index); } } else { - //̳߳ - if (pthread_create(&thread_info[index].tid, NULL, work_thread, new_index) != 0) { - pthread_mutex_lock(&global_lock); - printf("Failed to restart thread %d\n", index); - thread_info[index].state = THREAD_CRASHED; - pthread_mutex_unlock(&global_lock); - free(new_index); - } + // ߳ + // ΪգʵӦп߳ } - } /* ̴߳ */ @@ -136,7 +135,7 @@ int is_thread_alive(pthread_t tid) { } /* */ -int main1() { +int main() { srand(time(NULL)); // ʼ // ʼ߳ @@ -152,18 +151,16 @@ int main1() { *index = i; if (i == 0) { - //߳߳1 - if (pthread_create(&thread_info[i].tid, NULL, work_thread_1, index) != 0) { - printf("Failed to create work_thread_1 %d\n", i); + // ͻ˹߳ + if (pthread_create(&thread_info[i].tid, NULL, client_manager_thread, index) != 0) { + printf("Failed to create client manager thread %d\n", i); free(index); } } else { - //̳߳ - if (pthread_create(&thread_info[i].tid, NULL, work_thread, index) != 0) { - printf("Failed to create thread %d\n", i); - free(index); - } + // ߳ + // ΪգʵӦп߳ + free(index); } }