#include #include #include #include #include #include #include #include "PQSMsg.h" #include "client2.h" // 配置参数 #define INITIAL_DATA_SIZE 128 // 初始数据0.1KB #define CONNECTIONS 5 // 并发连接数 设备连接数 #define SERVER_IP "101.132.39.45" // 目标服务器IP 阿里云服务器 "101.132.39.45""172.27.208.1" #define SERVER_PORT 1056 // 目标服务器端口 #define BASE_RECONNECT_DELAY 5000 // 基础重连延迟(ms) #define MAX_RECONNECT_DELAY 60000 // 最大重连延迟(ms) #define MAX_RECONNECT_ATTEMPTS 10 // 最大重连次数 /* 缓冲区分配回调 */ void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void* buffer = malloc(suggested_size); if (!buffer) { fprintf(stderr, "Memory allocation failed\n"); *buf = uv_buf_init(NULL, 0); return; } *buf = uv_buf_init((char*)buffer, suggested_size); } /* 数据读取回调 */ void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { client_context_t* ctx = (client_context_t*)stream->data; if (nread < 0) { if (nread != UV_EOF) { fprintf(stdout, "[Client %d] RECV ERROR: %s\n", ctx->index, uv_strerror(nread)); } else { fprintf(stdout, "[Client %d] Connection closed by server\n", ctx->index); } uv_close((uv_handle_t*)stream, on_close); free(buf->base); } else if (nread > 0) { fprintf(stdout, "[Client %d] RECV %zd bytes data\n", ctx->index, nread); //数据传入客户端缓冲区并尝试取出合法报文格式的数据 free(buf->base); } } /* 数据写入回调 */ void on_write(uv_write_t* req, int status) { client_context_t* ctx = (client_context_t*)req->handle->data; if (status < 0) { fprintf(stdout, "[Client %d] SEND ERROR: %s\n", ctx->index, uv_strerror(status)); } else { fprintf(stdout, "[Client %d] SEND bytes success\n", ctx->index); } free(req->data); free(req); } /* 定时组装二进制报文并发送 */ void on_timer(uv_timer_t* handle) { client_context_t* ctx = (client_context_t*)handle->data; if (ctx->state != STATE_CONNECTED) { fprintf(stdout, "[Client %d] Skip sending: Not connected\n", ctx->index); return; } fprintf(stdout, "[Client %d] Preparing periodic data...\n", ctx->index); // 生成完整报文 装置云服务登录报文 auto binary_data = generate_frontlogin_message("00-B7-8D-A8-00-D6"); // 转换为数组形式 unsigned char* binary_array = binary_data.data(); size_t data_size = binary_data.size(); // 此处可调用发送函数 send_binary_data(ctx, binary_array, data_size); } /* 发送初始数据块 - 修复版 */ void send_initial_data(client_context_t* ctx) { // 添加状态检查 if (ctx->state != STATE_CONNECTED) { fprintf(stderr, "[Client %d] Cannot send initial data: not connected\n", ctx->index); return; } // 修正为实际数据大小 (1KB) const size_t data_size = INITIAL_DATA_SIZE; // 使用实际值替换 INITIAL_DATA_SIZE char* buffer = (char*)malloc(data_size); if (!buffer) { fprintf(stderr, "[Client %d] Failed to allocate initial data buffer\n", ctx->index); return; } memset(buffer, 'A', data_size); buffer[0] = 0xeb; buffer[1] = 0x90; buffer[2] = 0xff; buffer[15] = 0x16; fprintf(stdout, "[Client %d] Sending initial %zu bytes data\n", ctx->index, data_size); uv_buf_t buf = uv_buf_init(buffer, data_size); uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t)); if (!write_req) { free(buffer); fprintf(stderr, "[Client %d] Failed to allocate write request\n", ctx->index); return; } write_req->data = buffer; // 检查uv_write返回值 int ret = uv_write(write_req, (uv_stream_t*)&ctx->client, &buf, 1, on_write); if (ret < 0) { fprintf(stderr, "[Client %d] uv_write failed: %s\n", ctx->index, uv_strerror(ret)); free(buffer); free(write_req); } } /* 发送二进制报文函数 */ void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size) { if (ctx->state != STATE_CONNECTED) { fprintf(stderr, "[Client %d] Cannot send binary data: not connected\n", ctx->index); return; } uv_buf_t buf = uv_buf_init((char*)data, data_size); uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t)); if (!write_req) { fprintf(stderr, "[Client %d] Failed to allocate write request\n", ctx->index); return; } write_req->data = NULL; // 不需要额外数据,因为data已经传入 fprintf(stdout, "[Client %d] Sending initial %zu bytes data\n", ctx->index, data_size); int ret = uv_write(write_req, (uv_stream_t*)&ctx->client, &buf, 1, on_write); if (ret < 0) { fprintf(stderr, "[Client %d] uv_write failed: %s\n", ctx->index, uv_strerror(ret)); free(write_req); } // 注意:这里不需要释放data,因为data是由调用者管理的 } /* 连接关闭回调 */ void on_close(uv_handle_t* handle) { client_context_t* ctx = (client_context_t*)handle->data; ctx->state = STATE_DISCONNECTED; fprintf(stdout, "[Client %d] Connection closed\n", ctx->index); // 停止数据读写定时器和重连定时器 uv_timer_stop(&ctx->timer); uv_timer_stop(&ctx->reconnect_timer); // 取消读操作(如果有的话) uv_read_stop((uv_stream_t*)&ctx->client); // 指数退避算法 int delay = BASE_RECONNECT_DELAY * pow(2, ctx->reconnect_attempts); delay = delay > MAX_RECONNECT_DELAY ? MAX_RECONNECT_DELAY : delay; fprintf(stdout, "[Client %d] Will reconnect after %dms (attempt %d/%d)\n", ctx->index, delay, ctx->reconnect_attempts + 1, MAX_RECONNECT_ATTEMPTS); uv_timer_start(&ctx->reconnect_timer, (uv_timer_cb)try_reconnect, delay, 0); ctx->reconnect_attempts++; } /* 尝试重连函数 */ void try_reconnect(uv_timer_t* timer) { client_context_t* ctx = (client_context_t*)timer->data; if (ctx->state != STATE_DISCONNECTED) { fprintf(stdout, "[Client %d] Reconnect skipped: Invalid state\n", ctx->index); return; } fprintf(stdout, "[Client %d] Attempting to reconnect...\n", ctx->index); if (ctx->reconnect_attempts >= MAX_RECONNECT_ATTEMPTS) { fprintf(stderr, "[Client %d] Max reconnect attempts reached\n", ctx->index); return; } // 重新初始化TCP句柄(重要!) uv_tcp_init(ctx->loop, &ctx->client); ctx->client.data = ctx; ctx->state = STATE_CONNECTING; struct sockaddr_in addr; uv_ip4_addr(SERVER_IP, SERVER_PORT, &addr); uv_connect_t* req = (uv_connect_t*)malloc(sizeof(uv_connect_t)); req->data = ctx; int ret = uv_tcp_connect(req, &ctx->client, (const struct sockaddr*)&addr, on_connect); if (ret < 0) { fprintf(stderr, "[Client %d] Connect error: %s\n", ctx->index, uv_strerror(ret)); ctx->state = STATE_DISCONNECTED; free(req); uv_close((uv_handle_t*)&ctx->client, on_close); } } /* 连接建立回调 */ void on_connect(uv_connect_t* req, int status) { client_context_t* ctx = (client_context_t*)req->data; if (ctx->state != STATE_CONNECTING) { fprintf(stdout, "[Client %d] Connection callback with invalid state\n", ctx->index); free(req); return; } if (status < 0) { fprintf(stderr, "[Client %d] Connect failed: %s\n", ctx->index, uv_strerror(status)); ctx->state = STATE_DISCONNECTED; free(req); uv_close((uv_handle_t*)&ctx->client, on_close); return; } else { ctx->state = STATE_CONNECTED; ctx->reconnect_attempts = 0; fprintf(stdout, "[Client %d] Connection established\n", ctx->index); // 启动数据收发 uv_read_start((uv_stream_t*)&ctx->client, alloc_buffer, on_read); //send_initial_data(ctx); // 启动定时器 uv_timer_start(&ctx->timer, on_timer, 6000, 6000); } free(req); } /* 客户端线程函数 */ void* run_client(void* arg) { int index = *(int*)arg; free(arg); // 初始化事件循环 uv_loop_t* loop = uv_loop_new(); client_context_t* ctx = (client_context_t*)malloc(sizeof(client_context_t)); memset(ctx, 0, sizeof(client_context_t)); // 初始化上下文 ctx->loop = loop; ctx->index = index; ctx->state = STATE_DISCONNECTED; ctx->reconnect_attempts = 0; // 初始化libuv句柄 uv_tcp_init(loop, &ctx->client); uv_timer_init(loop, &ctx->timer); uv_timer_init(loop, &ctx->reconnect_timer); // 设置关联数据 ctx->client.data = ctx; ctx->timer.data = ctx; ctx->reconnect_timer.data = ctx; // 首次连接尝试 try_reconnect(&ctx->reconnect_timer); // 运行事件循环 uv_run(loop, UV_RUN_DEFAULT); // 资源清理 uv_loop_close(loop); free(loop); free(ctx); return NULL; } /* 主函数 */ int main() { pthread_t threads[CONNECTIONS]; // 创建客户端线程 for (int i = 0; i < CONNECTIONS; i++) { int* index = (int*)malloc(sizeof(int)); *index = i; if (pthread_create(&threads[i], NULL, run_client, index) != 0) { fprintf(stderr, "Failed to create thread %d\n", i); free(index); } } printf("Started %d client connections\n", CONNECTIONS); while (1) { printf("sleep 1\n"); sleep(10); } // 等待所有线程结束 for (int i = 0; i < CONNECTIONS; i++) { pthread_join(threads[i], NULL); } return 0; }