#include "client2.h" #include "PQSMsg.h" #include "dealMsg.h" #include #include #include #include #include #include // 配置参数 constexpr int BASE_RECONNECT_DELAY = 5000; // 基础重连延迟(ms) constexpr int MAX_RECONNECT_DELAY = 60000; // 最大重连延迟(ms) constexpr const char* SERVER_IP = "101.132.39.45"; // 目标服务器IP constexpr int SERVER_PORT = 1056; // 目标服务器端口 static uv_loop_t* global_loop = nullptr; static std::vector> client_contexts; static uv_timer_t monitor_timer; extern SafeMessageQueue message_queue; // ClientContext 实现 ClientContext::ClientContext(uv_loop_t* loop, const DeviceInfo& device, int index) : loop(loop), state(ConnectionState::DISCONNECTED), reconnect_attempts(0), shutdown(false), device_info(device), index_(index) { // 初始化 TCP 句柄 uv_tcp_init(loop, &client); client.data = this; // 初始化定时器 uv_timer_init(loop, &timer); timer.data = this; // 初始化重连定时器 uv_timer_init(loop, &reconnect_timer); reconnect_timer.data = this; } ClientContext::~ClientContext() { stop_timers(); close_handles(); } void ClientContext::init_tcp() { if (!uv_is_active((uv_handle_t*)&client)) { uv_tcp_init(loop, &client); client.data = this; } } void ClientContext::start_timer() { if (!uv_is_active((uv_handle_t*)&timer)) { uv_timer_start(&timer, on_timer, 6000, 6000); } } void ClientContext::start_reconnect_timer(int delay) { if (!uv_is_active((uv_handle_t*)&reconnect_timer)) { uv_timer_start(&reconnect_timer, try_reconnect, delay, 0); } } void ClientContext::stop_timers() { if (uv_is_active((uv_handle_t*)&timer)) uv_timer_stop(&timer); if (uv_is_active((uv_handle_t*)&reconnect_timer)) uv_timer_stop(&reconnect_timer); } void ClientContext::close_handles() { if (!uv_is_closing((uv_handle_t*)&client)) { uv_close((uv_handle_t*)&client, nullptr); } if (!uv_is_closing((uv_handle_t*)&timer)) { uv_close((uv_handle_t*)&timer, nullptr); } if (!uv_is_closing((uv_handle_t*)&reconnect_timer)) { uv_close((uv_handle_t*)&reconnect_timer, nullptr); } } /* 缓冲区分配回调 */ void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->base = new char[suggested_size]; buf->len = suggested_size; } /* 数据读取回调 */ void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { ClientContext* ctx = static_cast(stream->data); if (nread < 0) { if (nread != UV_EOF) { std::cerr << "[Device " << ctx->device_info.device_id << "] RECV ERROR: " << uv_strerror(nread) << std::endl; } uv_close((uv_handle_t*)stream, on_close); delete[] buf->base; return; } if (nread > 0) { // 将接收到的数据放入消息队列 deal_message_t msg; msg.device_id = ctx->device_info.device_id; // 直接赋值 msg.mac = ctx->device_info.mac; // 直接赋值 // 复制测点信息 msg.points = ctx->device_info.points; msg.data = new char[nread]; msg.length = nread; memcpy(msg.data, buf->base, nread); if (!message_queue.push(msg)) { std::cerr << "[Device " << ctx->device_info.device_id << "] Message queue full, dropping message" << std::endl; delete[] msg.data; } } delete[] buf->base; } /* 数据写入回调 */ void on_write(uv_write_t* req, int status) { ClientContext* ctx = static_cast(req->handle->data); if (status < 0) { std::cerr << "[Device " << ctx->device_info.device_id << "] SEND ERROR: " << uv_strerror(status) << std::endl; } delete[] static_cast(req->data); // 释放发送数据缓冲区 delete req; // 释放写入请求 } /* 定时发送回调 */ void on_timer(uv_timer_t* handle) { ClientContext* ctx = static_cast(handle->data); if (ctx->state != ConnectionState::CONNECTED) { return; } // 使用装置自己的MAC地址生成登录报文 auto binary_data = generate_frontlogin_message(ctx->device_info.mac); // 调用发送函数 send_binary_data(ctx, binary_data.data(), binary_data.size()); // 根据装置状态发送其他数据 if (ctx->device_info.status == 1) { // 在线状态 // 可以发送装置配置信息或测点数据 } } /* 发送二进制报文函数 */ void send_binary_data(ClientContext* ctx, const unsigned char* data, size_t data_size) { if (ctx->state != ConnectionState::CONNECTED) { std::cerr << "[Device " << ctx->device_info.device_id << "] Cannot send: not connected" << std::endl; return; } uv_buf_t buf = uv_buf_init(const_cast(reinterpret_cast(data)), data_size); uv_write_t* write_req = new uv_write_t; // 复制数据以确保安全 char* data_copy = new char[data_size]; memcpy(data_copy, data, data_size); write_req->data = data_copy; int ret = uv_write(write_req, (uv_stream_t*)&ctx->client, &buf, 1, on_write); if (ret < 0) { std::cerr << "[Device " << ctx->device_info.device_id << "] uv_write failed: " << uv_strerror(ret) << std::endl; delete[] data_copy; delete write_req; } } /* 连接关闭回调 */ void on_close(uv_handle_t* handle) { ClientContext* ctx = static_cast(handle->data); ctx->state = ConnectionState::DISCONNECTED; std::cerr << "[Device " << ctx->device_info.device_id << "] Connection closed" << std::endl; ctx->stop_timers(); if (!ctx->shutdown) { int delay = BASE_RECONNECT_DELAY * pow(2, ctx->reconnect_attempts); delay = delay > MAX_RECONNECT_DELAY ? MAX_RECONNECT_DELAY : delay; std::cout << "[Device " << ctx->device_info.device_id << "] Reconnecting in " << delay << "ms (attempt " << ctx->reconnect_attempts + 1 << ")" << std::endl; ctx->reconnect_attempts++; ctx->start_reconnect_timer(delay); } } /* 尝试重连 */ void try_reconnect(uv_timer_t* timer) { ClientContext* ctx = static_cast(timer->data); if (ctx->state != ConnectionState::DISCONNECTED || ctx->shutdown) { return; } std::cerr << "[Device " << ctx->device_info.device_id << "] Attempting reconnect" << std::endl; ctx->init_tcp(); ctx->state = ConnectionState::CONNECTING; struct sockaddr_in addr; uv_ip4_addr(SERVER_IP, SERVER_PORT, &addr); uv_connect_t* req = new uv_connect_t; req->data = ctx; int ret = uv_tcp_connect(req, &ctx->client, (const struct sockaddr*)&addr, on_connect); if (ret < 0) { std::cerr << "[Device " << ctx->device_info.device_id << "] Connect error: " << uv_strerror(ret) << std::endl; delete req; uv_close((uv_handle_t*)&ctx->client, on_close); } } /* 连接建立回调 */ void on_connect(uv_connect_t* req, int status) { ClientContext* ctx = static_cast(req->data); delete req; if (status < 0) { std::cerr << "[Device " << ctx->device_info.device_id << "] Connect failed: " << uv_strerror(status) << std::endl; if (!uv_is_closing((uv_handle_t*)&ctx->client)) { uv_close((uv_handle_t*)&ctx->client, on_close); } return; } std::cerr << "[Device " << ctx->device_info.device_id << "] Connected to server" << std::endl; ctx->state = ConnectionState::CONNECTED; ctx->reconnect_attempts = 0; uv_read_start((uv_stream_t*)&ctx->client, alloc_buffer, on_read); ctx->start_timer(); } /* 初始化所有客户端连接 */ void init_clients(uv_loop_t* loop, const std::vector& devices) { client_contexts.clear(); for (size_t i = 0; i < devices.size(); i++) { // 修改为C++11兼容的unique_ptr创建方式 client_contexts.push_back( std::unique_ptr( new ClientContext(loop, devices[i], i) ) ); try_reconnect(&client_contexts.back()->reconnect_timer); } } /* 停止所有客户端 */ void stop_all_clients() { for (auto& ctx : client_contexts) { ctx->shutdown = true; ctx->close_handles(); } client_contexts.clear(); } /* 连接监控回调 */ void monitor_connections(uv_timer_t* handle) { static int recovery_counter = 0; if (++recovery_counter >= 5) { int active_count = 0; for (const auto& ctx : client_contexts) { if (ctx->state == ConnectionState::CONNECTED) { active_count++; } } std::cout << "Active connections: " << active_count << "/" << client_contexts.size() << std::endl; recovery_counter = 0; } } static void close_walk_cb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { uv_close(handle, nullptr); } } /* 启动客户端连接 */ void start_client_connect(const std::vector& devices) { // 创建全局事件循环 global_loop = uv_default_loop(); // 初始化所有客户端 init_clients(global_loop, devices); // 启动连接监控 uv_timer_init(global_loop, &monitor_timer); uv_timer_start(&monitor_timer, monitor_connections, 1000, 1000); // 运行事件循环 uv_run(global_loop, UV_RUN_DEFAULT); // 添加资源清理阶段 while (uv_loop_alive(global_loop)) { uv_run(global_loop, UV_RUN_ONCE); } // 安全关闭事件循环 int err = uv_loop_close(global_loop); if (err) { std::cerr << "uv_loop_close error: " << uv_strerror(err) << std::endl; // 强制清理残留句柄(调试用) uv_walk(global_loop, close_walk_cb, nullptr); uv_run(global_loop, UV_RUN_NOWAIT); } // 清理所有客户端 stop_all_clients(); global_loop = nullptr; }