#include #include #include #include #include #include "PQSMsg.h" #include "client2.h" #include "dealMsg.h" // 配置参数 #define CONNECTIONS 10 // 支持1000个并发连接 #define SERVER_IP "101.132.39.45" // 目标服务器IP "101.132.39.45" #define SERVER_PORT 1056 // 目标服务器端口 #define BASE_RECONNECT_DELAY 5000 // 基础重连延迟(ms) #define MAX_RECONNECT_DELAY 60000 // 最大重连延迟(ms) static uv_loop_t* global_loop; // 全局事件循环 static client_context_t client_contexts[CONNECTIONS]; // 客户端上下文数组 static uv_timer_t monitor_timer; // 连接监控定时器 extern SafeMessageQueue message_queue; /* 缓冲区分配回调 */ void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void* buffer = malloc(suggested_size); if (!buffer) { *buf = uv_buf_init(NULL, 0); return; } *buf = uv_buf_init((char*)buffer, suggested_size); } /* 数据读取回调 */ void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { client_context_t* ctx = (client_context_t*)stream->data; if (nread < 0) { if (nread != UV_EOF) { fprintf(stdout, "[Client %d] RECV ERROR: %s\n", ctx->index, uv_strerror(nread)); } uv_close((uv_handle_t*)stream, on_close); free(buf->base); return; } if (nread > 0) { // 将接收到的数据放入消息队列 deal_message_t msg; msg.client_index = ctx->index; msg.data = (char*)malloc(nread); msg.length = nread; memcpy(msg.data, buf->base, nread); if (!message_queue.push(msg)) { fprintf(stderr, "[Client %d] Message queue full, dropping message\n", ctx->index); free(msg.data); } } free(buf->base); } /* 数据写入回调 */ void on_write(uv_write_t* req, int status) { client_context_t* ctx = (client_context_t*)req->handle->data; if (status < 0) { fprintf(stdout, "[Client %d] SEND ERROR: %s\n", ctx->index, uv_strerror(status)); } free(req->data); // 释放发送数据缓冲区 free(req); // 释放写入请求 } /* 定时发送回调 */ void on_timer(uv_timer_t* handle) { client_context_t* ctx = (client_context_t*)handle->data; if (ctx->state != STATE_CONNECTED) { return; } //单个装置定时消息收发机制 // 生成完整报文 装置云服务登录报文 auto binary_data = generate_frontlogin_message("00-B7-8D-A8-00-D6"); // 转换为数组形式 unsigned char* binary_array = binary_data.data(); size_t data_size = binary_data.size(); // 此处可调用发送函数 send_binary_data(ctx, binary_array, data_size); } /* 发送二进制报文函数 */ void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size) { if (ctx->state != STATE_CONNECTED) { fprintf(stderr, "[Client %d] Cannot send binary data: not connected\n", ctx->index); return; } uv_buf_t buf = uv_buf_init((char*)data, data_size); uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t)); if (!write_req) { fprintf(stderr, "[Client %d] Failed to allocate write request\n", ctx->index); return; } write_req->data = NULL; // 不需要额外数据,因为data已经传入 //fprintf(stdout, "[Client %d] Sending initial %zu bytes data\n", ctx->index, data_size); int ret = uv_write(write_req, (uv_stream_t*)&ctx->client, &buf, 1, on_write); if (ret < 0) { fprintf(stderr, "[Client %d] uv_write failed: %s\n", ctx->index, uv_strerror(ret)); free(write_req); } // 注意:这里不需要释放data,因为data是由调用者管理的 } /* 连接关闭回调 */ void on_close(uv_handle_t* handle) { client_context_t* ctx = (client_context_t*)handle->data; ctx->state = STATE_DISCONNECTED; fprintf(stderr, "[Client %d] closed\n", ctx->index); // 停止定时器 uv_timer_stop(&ctx->timer); uv_timer_stop(&ctx->reconnect_timer); // 自动重连逻辑 if (!ctx->shutdown) { int delay = BASE_RECONNECT_DELAY * pow(2, ctx->reconnect_attempts); delay = delay > MAX_RECONNECT_DELAY ? MAX_RECONNECT_DELAY : delay; fprintf(stdout, "[Client %d] Reconnecting in %dms (attempt %d)\n", ctx->index, delay, ctx->reconnect_attempts + 1); ctx->reconnect_attempts++; uv_timer_start(&ctx->reconnect_timer, try_reconnect, delay, 0); } } /* 尝试重连 */ void try_reconnect(uv_timer_t* timer) { client_context_t* ctx = (client_context_t*)timer->data; if (ctx->state != STATE_DISCONNECTED || ctx->shutdown) { return; } fprintf(stderr, "[Client %d] try_reconnect\n", ctx->index); // 重新初始化TCP句柄 uv_tcp_init(ctx->loop, &ctx->client); ctx->client.data = ctx; ctx->state = STATE_CONNECTING; struct sockaddr_in addr; uv_ip4_addr(SERVER_IP, SERVER_PORT, &addr); uv_connect_t* req = (uv_connect_t*)malloc(sizeof(uv_connect_t)); req->data = ctx; int ret = uv_tcp_connect(req, &ctx->client, (const struct sockaddr*)&addr, on_connect); if (ret < 0) { fprintf(stderr, "[Client %d] Connect error: %s\n", ctx->index, uv_strerror(ret)); free(req); uv_close((uv_handle_t*)&ctx->client, on_close); } } /* 连接建立回调 */ void on_connect(uv_connect_t* req, int status) { client_context_t* ctx = (client_context_t*)req->data; if (status < 0) { fprintf(stderr, "[Client %d] Connect failed: %s\n", ctx->index, uv_strerror(status)); // 直接关闭句柄,避免后续重复关闭 if (!uv_is_closing((uv_handle_t*)&ctx->client)) { uv_close((uv_handle_t*)&ctx->client, NULL); } free(req); return; } fprintf(stderr, "[Client %d] on_connect\n", ctx->index); ctx->state = STATE_CONNECTED; ctx->reconnect_attempts = 0; // 启动数据接收 uv_read_start((uv_stream_t*)&ctx->client, alloc_buffer, on_read); // 启动定时发送 uv_timer_start(&ctx->timer, on_timer, 6000, 6000); free(req); } /* 初始化所有客户端连接 */ void init_clients(uv_loop_t* loop) { for (int i = 0; i < CONNECTIONS; i++) { client_context_t* ctx = &client_contexts[i]; memset(ctx, 0, sizeof(client_context_t)); ctx->loop = loop; ctx->index = i; ctx->state = STATE_DISCONNECTED; // 初始化TCP句柄 uv_tcp_init(loop, &ctx->client); ctx->client.data = ctx; // 初始化定时器 uv_timer_init(loop, &ctx->timer); ctx->timer.data = ctx; // 初始化重连定时器 uv_timer_init(loop, &ctx->reconnect_timer); ctx->reconnect_timer.data = ctx; // 首次连接 try_reconnect(&ctx->reconnect_timer); } } /* 停止所有客户端 */ void stop_all_clients() { for (int i = 0; i < CONNECTIONS; i++) { client_context_t* ctx = &client_contexts[i]; ctx->shutdown = 1; // 关闭所有句柄 if (!uv_is_closing((uv_handle_t*)&ctx->client)) { uv_close((uv_handle_t*)&ctx->client, NULL); } if (!uv_is_closing((uv_handle_t*)&ctx->timer)) { uv_close((uv_handle_t*)&ctx->timer, NULL); } if (!uv_is_closing((uv_handle_t*)&ctx->reconnect_timer)) { uv_close((uv_handle_t*)&ctx->reconnect_timer, NULL); } } } /* 连接监控回调 */ void monitor_connections(uv_timer_t* handle) { // 自动恢复断开的连接 static int recovery_counter = 0; if (++recovery_counter >= 5) { // 每5次监控执行一次恢复 int active_count = 0; for (int i = 0; i < CONNECTIONS; i++) { if (client_contexts[i].state == STATE_CONNECTED) { active_count++; } } printf("Active connections: %d/%d\n", active_count, CONNECTIONS); recovery_counter = 0; } //static int monitor_temp = 0; //monitor_temp++; //if (monitor_temp >= 30) { // monitor_temp = 0; // printf("30 second to stop all client\n"); // // 停止并关闭监控定时器 // uv_timer_stop(handle); // uv_close((uv_handle_t*)handle, NULL); // // // 停止所有客户端 // stop_all_clients(); // // // 停止事件循环 // uv_stop(global_loop); //} } static void close_walk_cb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { fprintf(stderr, "Force closing leaked handle: %p (type=%d)\n", handle, handle->type); uv_close(handle, NULL); } } void start_client_connect() { // 创建全局事件循环 global_loop = uv_default_loop(); // 初始化所有客户端 init_clients(global_loop); // 启动连接监控 uv_timer_init(global_loop, &monitor_timer); uv_timer_start(&monitor_timer, monitor_connections, 1000, 1000); // 运行事件循环 uv_run(global_loop, UV_RUN_DEFAULT); // 添加资源清理阶段 while (uv_loop_alive(global_loop)) { uv_run(global_loop, UV_RUN_ONCE); } // 安全关闭事件循环 int err = uv_loop_close(global_loop); if (err) { fprintf(stderr, "uv_loop_close error: %s\n", uv_strerror(err)); // 强制清理残留句柄(调试用) uv_walk(global_loop, close_walk_cb, NULL); uv_run(global_loop, UV_RUN_NOWAIT); } global_loop = NULL; }