#include #include #include #include #include #include #include #include "PQSMsg.h" #include "client2.h" // 配置参数 #define INITIAL_DATA_SIZE 128 // 初始数据0.1KB #define CONNECTIONS 2 // 并发连接数 设备连接数 #define SERVER_IP "101.132.39.45" // 目标服务器IP 阿里云服务器 "101.132.39.45""172.27.208.1" #define SERVER_PORT 1056 // 目标服务器端口 #define BASE_RECONNECT_DELAY 5000 // 基础重连延迟(ms) #define MAX_RECONNECT_DELAY 60000 // 最大重连延迟(ms) #define MAX_RECONNECT_ATTEMPTS 10 // 最大重连次数 // 全局变量用于管理客户端线程 static pthread_t client_threads[CONNECTIONS]; static client_context_t* client_contexts[CONNECTIONS] = { NULL }; /* 缓冲区分配回调 */ void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void* buffer = malloc(suggested_size); if (!buffer) { fprintf(stderr, "Memory allocation failed\n"); *buf = uv_buf_init(NULL, 0); return; } *buf = uv_buf_init((char*)buffer, suggested_size); } /* 数据读取回调 */ void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { client_context_t* ctx = (client_context_t*)stream->data; if (nread < 0) { if (nread != UV_EOF) { fprintf(stdout, "[Client %d] RECV ERROR: %s\n", ctx->index, uv_strerror(nread)); } else { fprintf(stdout, "[Client %d] Connection closed by server\n", ctx->index); } uv_close((uv_handle_t*)stream, on_close); free(buf->base); } else if (nread > 0) { fprintf(stdout, "[Client %d] RECV %zd bytes data\n", ctx->index, nread); //数据传入客户端缓冲区并尝试取出合法报文格式的数据 free(buf->base); } } /* 数据写入回调 */ void on_write(uv_write_t* req, int status) { client_context_t* ctx = (client_context_t*)req->handle->data; if (status < 0) { fprintf(stdout, "[Client %d] SEND ERROR: %s\n", ctx->index, uv_strerror(status)); } else { fprintf(stdout, "[Client %d] SEND bytes success\n", ctx->index); } free(req->data); free(req); } /* 定时组装二进制报文并发送 */ void on_timer(uv_timer_t* handle) { client_context_t* ctx = (client_context_t*)handle->data; if (ctx->state != STATE_CONNECTED) { fprintf(stdout, "[Client %d] Skip sending: Not connected\n", ctx->index); return; } fprintf(stdout, "[Client %d] Preparing periodic data...\n", ctx->index); // 生成完整报文 装置云服务登录报文 auto binary_data = generate_frontlogin_message("00-B7-8D-A8-00-D6"); // 转换为数组形式 unsigned char* binary_array = binary_data.data(); size_t data_size = binary_data.size(); // 此处可调用发送函数 send_binary_data(ctx, binary_array, data_size); } /* 发送初始数据块 - 修复版 */ void send_initial_data(client_context_t* ctx) { // 添加状态检查 if (ctx->state != STATE_CONNECTED) { fprintf(stderr, "[Client %d] Cannot send initial data: not connected\n", ctx->index); return; } // 修正为实际数据大小 (1KB) const size_t data_size = INITIAL_DATA_SIZE; // 使用实际值替换 INITIAL_DATA_SIZE char* buffer = (char*)malloc(data_size); if (!buffer) { fprintf(stderr, "[Client %d] Failed to allocate initial data buffer\n", ctx->index); return; } memset(buffer, 'A', data_size); buffer[0] = 0xeb; buffer[1] = 0x90; buffer[2] = 0xff; buffer[15] = 0x16; fprintf(stdout, "[Client %d] Sending initial %zu bytes data\n", ctx->index, data_size); uv_buf_t buf = uv_buf_init(buffer, data_size); uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t)); if (!write_req) { free(buffer); fprintf(stderr, "[Client %d] Failed to allocate write request\n", ctx->index); return; } write_req->data = buffer; // 检查uv_write返回值 int ret = uv_write(write_req, (uv_stream_t*)&ctx->client, &buf, 1, on_write); if (ret < 0) { fprintf(stderr, "[Client %d] uv_write failed: %s\n", ctx->index, uv_strerror(ret)); free(buffer); free(write_req); } } /* 发送二进制报文函数 */ void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size) { if (ctx->state != STATE_CONNECTED) { fprintf(stderr, "[Client %d] Cannot send binary data: not connected\n", ctx->index); return; } uv_buf_t buf = uv_buf_init((char*)data, data_size); uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t)); if (!write_req) { fprintf(stderr, "[Client %d] Failed to allocate write request\n", ctx->index); return; } write_req->data = NULL; // 不需要额外数据,因为data已经传入 fprintf(stdout, "[Client %d] Sending initial %zu bytes data\n", ctx->index, data_size); int ret = uv_write(write_req, (uv_stream_t*)&ctx->client, &buf, 1, on_write); if (ret < 0) { fprintf(stderr, "[Client %d] uv_write failed: %s\n", ctx->index, uv_strerror(ret)); free(write_req); } // 注意:这里不需要释放data,因为data是由调用者管理的 } /* 连接关闭回调 */ void on_close(uv_handle_t* handle) { client_context_t* ctx = (client_context_t*)handle->data; ctx->state = STATE_DISCONNECTED; fprintf(stdout, "[Client %d] Connection closed\n", ctx->index); // 停止数据读写定时器和重连定时器 uv_timer_stop(&ctx->timer); uv_timer_stop(&ctx->reconnect_timer); // 取消读操作 uv_read_stop((uv_stream_t*)&ctx->client); // 如果不是主动关闭,尝试重连 if (!ctx->shutdown) { int delay = BASE_RECONNECT_DELAY * pow(2, ctx->reconnect_attempts); delay = delay > MAX_RECONNECT_DELAY ? MAX_RECONNECT_DELAY : delay; fprintf(stdout, "[Client %d] Will reconnect after %dms (attempt %d/%d)\n", ctx->index, delay, ctx->reconnect_attempts + 1, MAX_RECONNECT_ATTEMPTS); uv_timer_start(&ctx->reconnect_timer, (uv_timer_cb)try_reconnect, delay, 0); ctx->reconnect_attempts++; } } /* 尝试重连函数 */ void try_reconnect(uv_timer_t* timer) { client_context_t* ctx = (client_context_t*)timer->data; if (ctx->state != STATE_DISCONNECTED || ctx->shutdown) { fprintf(stdout, "[Client %d] Reconnect skipped: Invalid state or shutdown\n", ctx->index); return; } fprintf(stdout, "[Client %d] Attempting to reconnect...\n", ctx->index); if (ctx->reconnect_attempts >= MAX_RECONNECT_ATTEMPTS) { fprintf(stderr, "[Client %d] Max reconnect attempts reached\n", ctx->index); return; } // 重新初始化TCP句柄 uv_tcp_init(ctx->loop, &ctx->client); ctx->client.data = ctx; ctx->state = STATE_CONNECTING; struct sockaddr_in addr; uv_ip4_addr(SERVER_IP, SERVER_PORT, &addr); uv_connect_t* req = (uv_connect_t*)malloc(sizeof(uv_connect_t)); req->data = ctx; int ret = uv_tcp_connect(req, &ctx->client, (const struct sockaddr*)&addr, on_connect); if (ret < 0) { fprintf(stderr, "[Client %d] Connect error: %s\n", ctx->index, uv_strerror(ret)); ctx->state = STATE_DISCONNECTED; free(req); uv_close((uv_handle_t*)&ctx->client, on_close); } } /* 连接建立回调 */ void on_connect(uv_connect_t* req, int status) { client_context_t* ctx = (client_context_t*)req->data; if (ctx->state != STATE_CONNECTING) { fprintf(stdout, "[Client %d] Connection callback with invalid state\n", ctx->index); free(req); return; } if (status < 0) { fprintf(stderr, "[Client %d] Connect failed: %s\n", ctx->index, uv_strerror(status)); ctx->state = STATE_DISCONNECTED; free(req); uv_close((uv_handle_t*)&ctx->client, on_close); return; } else { ctx->state = STATE_CONNECTED; ctx->reconnect_attempts = 0; fprintf(stdout, "[Client %d] Connection established\n", ctx->index); // 启动数据收发 uv_read_start((uv_stream_t*)&ctx->client, alloc_buffer, on_read); //send_initial_data(ctx); // 启动定时器 //uv_timer_start(&ctx->timer, on_timer, 6000, 6000); } free(req); } /* 客户端线程函数 */ void* run_client(void* arg) { int index = *(int*)arg; free(arg); // 初始化事件循环 uv_loop_t* loop = uv_loop_new(); client_context_t* ctx = (client_context_t*)malloc(sizeof(client_context_t)); memset(ctx, 0, sizeof(client_context_t)); // 初始化上下文 ctx->loop = loop; ctx->index = index; ctx->state = STATE_DISCONNECTED; ctx->reconnect_attempts = 0; ctx->shutdown = 0; // 初始化关闭标志 // 初始化libuv句柄 uv_tcp_init(loop, &ctx->client); uv_timer_init(loop, &ctx->timer); uv_timer_init(loop, &ctx->reconnect_timer); // 设置关联数据 ctx->client.data = ctx; ctx->timer.data = ctx; ctx->reconnect_timer.data = ctx; // 首次连接尝试 try_reconnect(&ctx->reconnect_timer); // 运行事件循环,定期检查关闭标志 while (uv_loop_alive(loop) && !ctx->shutdown) { uv_run(loop, UV_RUN_NOWAIT); usleep(10000); // 10ms } fprintf(stdout, "[Client %d] thread closed\n", ctx->index); // 主动关闭所有资源 if (ctx->shutdown) { uv_close((uv_handle_t*)&ctx->client, on_close); uv_close((uv_handle_t*)&ctx->timer, NULL); uv_close((uv_handle_t*)&ctx->reconnect_timer, NULL); // 确保所有关闭回调执行 while (uv_loop_alive(loop)) { uv_run(loop, UV_RUN_ONCE); } } // 资源清理 uv_loop_close(loop); uv_loop_delete(loop); free(ctx); return NULL; } /* 停止所有客户端线程 (新增函数) */ void stop_all_clients() { for (int i = 0; i < CONNECTIONS; i++) { if (client_contexts[i]) { client_contexts[i]->shutdown = 1; // 设置关闭标志 } } // 等待所有客户端线程退出 for (int i = 0; i < CONNECTIONS; i++) { if (client_threads[i]) { pthread_join(client_threads[i], NULL); client_threads[i] = 0; } } } /* 启动所有客户端线程 (新增函数) */ void start_all_clients() { for (int i = 0; i < CONNECTIONS; i++) { int* index = (int*)malloc(sizeof(int)); *index = i; if (pthread_create(&client_threads[i], NULL, run_client, index) != 0) { fprintf(stderr, "Failed to create client thread %d\n", i); free(index); } else { // 上下文会在run_client中创建,这里初始化为NULL client_contexts[i] = NULL; } } } /* 获取活动客户端数量 (新增函数) */ int get_active_client_count() { int active_count = 0; for (int i = 0; i < CONNECTIONS; i++) { if (client_threads[i] && pthread_tryjoin_np(client_threads[i], NULL) == EBUSY) { active_count++; } } return active_count; } /* 客户端连接监控函数 (新增函数) */ void monitor_client_connections() { static int monitor_counter = 0; monitor_counter++; // 每5次调用(约5秒)检查一次状态 if (monitor_counter >= 5) { monitor_counter = 0; int active_clients = get_active_client_count(); printf("Active client connections: %d/%d\n", active_clients, CONNECTIONS); // 如果所有客户端线程都退出了,自动重启 if (active_clients == 0) { printf("All clients stopped, restarting...\n"); start_all_clients(); } } /*static int monitor_temp = 0; monitor_temp++; if (monitor_temp >= 30) { monitor_temp = 0; printf("30 second to stop all client\n"); stop_all_clients(); }*/ } ///* 主函数 */ //int main() { // pthread_t threads[CONNECTIONS]; // // // 创建客户端线程 // for (int i = 0; i < CONNECTIONS; i++) { // int* index = (int*)malloc(sizeof(int)); // *index = i; // // if (pthread_create(&threads[i], NULL, run_client, index) != 0) { // fprintf(stderr, "Failed to create thread %d\n", i); // free(index); // } // } // // printf("Started %d client connections\n", CONNECTIONS); // // // 等待所有线程结束 // for (int i = 0; i < CONNECTIONS; i++) { // pthread_join(threads[i], NULL); // } // // return 0; //}