From 3b4a4704dbdda655303655d23b231116af050b09 Mon Sep 17 00:00:00 2001 From: zw <3466561528@qq.com> Date: Wed, 25 Jun 2025 10:54:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86socket=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=A4=84=E7=90=86=EF=BC=8C=E5=A4=84=E7=90=86=E7=B2=98?= =?UTF-8?q?=E5=8C=85=E5=92=8C=E5=88=86=E5=8C=85=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- LFtid1056/client2.cpp | 133 ++++++++++++++++++++++++++++++++------ LFtid1056/client2.h | 7 ++ LFtid1056/dealMsg.cpp | 6 +- LFtid1056/dealMsg.h | 2 +- LFtid1056/main_thread.cpp | 4 +- 5 files changed, 127 insertions(+), 25 deletions(-) diff --git a/LFtid1056/client2.cpp b/LFtid1056/client2.cpp index 1a91f5e..ef43dd2 100644 --- a/LFtid1056/client2.cpp +++ b/LFtid1056/client2.cpp @@ -13,8 +13,8 @@ // 配置参数 constexpr int BASE_RECONNECT_DELAY = 5000; // 基础重连延迟(ms) constexpr int MAX_RECONNECT_DELAY = 60000; // 最大重连延迟(ms) -constexpr const char* SERVER_IP = "172.25.144.1"; // 目标服务器IP"101.132.39.45" -constexpr int SERVER_PORT = 61000; // 目标服务器端口1056 +constexpr const char* SERVER_IP = "101.132.39.45"; // 目标服务器IP"101.132.39.45" +constexpr int SERVER_PORT = 1056; // 目标服务器端口1056 static uv_loop_t* global_loop = nullptr; static uv_timer_t monitor_timer; @@ -25,6 +25,8 @@ ClientContext::ClientContext(uv_loop_t* loop, const DeviceInfo& device, int inde : loop(loop), state(ConnectionState::DISCONNECTED), reconnect_attempts(0), shutdown(false), device_info(device), index_(index) { + recv_buffer_.reserve(4096); // 预分配4KB缓冲区 + // 初始化 TCP 句柄 uv_tcp_init(loop, &client); client.data = this; @@ -79,6 +81,112 @@ void ClientContext::close_handles() { } } +// 添加接收数据到缓冲区并处理 +void ClientContext::append_and_process_data(const char* data, size_t len) { + std::lock_guard lock(buffer_mutex_); + + // 添加到缓冲区 + recv_buffer_.insert(recv_buffer_.end(), data, data + len); + + // 处理缓冲区数据 + process_buffer(); + + // 检查缓冲区大小防止内存溢出 + if (recv_buffer_.size() > 10 * 1024 * 1024) { // 10MB限制 + recv_buffer_.clear(); + std::cerr << "[Device " << device_info.device_id + << "] Buffer overflow, cleared\n"; + } +} + +// 注意:这个函数必须在 buffer_mutex_ 已被锁定的情况下调用 +void ClientContext::process_buffer() { + constexpr int MSG_HEAD_LEN = 6; // 最小包头长度 + + while (true) { + // 检查缓冲区大小是否足够解析包头 + if (recv_buffer_.size() < MSG_HEAD_LEN) + break; + + // 云服务器状态报文检查 (EB 90 EB 90) + if (recv_buffer_.size() >= 4 && + recv_buffer_[0] == 0xEB && recv_buffer_[1] == 0x90 && + recv_buffer_[2] == 0xEB && recv_buffer_[3] == 0x90) { + + const int packageLen = 150; // 固定长度 + if (recv_buffer_.size() < packageLen) + break; + + // 提取完整报文 + std::vector packet( + recv_buffer_.begin(), + recv_buffer_.begin() + packageLen + ); + + // 从缓冲区移除已处理数据 + recv_buffer_.erase( + recv_buffer_.begin(), + recv_buffer_.begin() + packageLen + ); + + // 放入消息队列 + put_packet_into_queue(packet); + continue; + } + + // 标准报文检查 (EB 90) + if (recv_buffer_[0] != 0xEB || recv_buffer_[1] != 0x90) { + // 非法包头,清空缓冲区 + recv_buffer_.clear(); + break; + } + + // 解析包长度 (小端序) + if (recv_buffer_.size() < 6) + break; + + uint16_t body_len = (recv_buffer_[4] << 8) | recv_buffer_[5]; + const int packageLen = body_len + 10; // 基础长度+扩展 + + if (recv_buffer_.size() < packageLen) + break; + + // 提取完整报文 + std::vector packet( + recv_buffer_.begin(), + recv_buffer_.begin() + packageLen + ); + + // 从缓冲区移除已处理数据 + recv_buffer_.erase( + recv_buffer_.begin(), + recv_buffer_.begin() + packageLen + ); + + // 放入消息队列 + put_packet_into_queue(packet); + } +} + +void ClientContext::put_packet_into_queue( + const std::vector& packet) +{ + deal_message_t msg; + msg.device_id = device_info.device_id; + msg.mac = device_info.mac; + msg.points = device_info.points; + + msg.length = packet.size(); + msg.data = static_cast(malloc(msg.length)); + memcpy(msg.data, packet.data(), msg.length); + + if (!message_queue.push(msg)) { + free(msg.data); + std::cerr << "[Device " << device_info.device_id + << "] Queue full, dropping packet\n"; + } +} + /* 缓冲区分配回调 */ void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->base = new char[suggested_size]; @@ -100,24 +208,9 @@ void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { } if (nread > 0) { - // 将接收到的数据放入消息队列 - deal_message_t msg; - msg.device_id = ctx->device_info.device_id; // 直接赋值 - msg.mac = ctx->device_info.mac; // 直接赋值 - - // 复制测点信息 - msg.points = ctx->device_info.points; - - msg.data = static_cast(malloc(nread)); - msg.length = nread; - memcpy(msg.data, buf->base, nread); - - if (!message_queue.push(msg)) { - std::cerr << "[Device " << ctx->device_info.device_id - << "] Message queue full, dropping message" << std::endl; - free(msg.data); - } - std::cout << "on_read: " << ctx->device_info.mac << " get!" << std::endl; + // 使用公共方法添加并处理数据 + ctx->append_and_process_data(buf->base, nread); + std::cout << "on_read: " << ctx->device_info.mac << " get " << nread << " bytes" << std::endl; } delete[] buf->base; diff --git a/LFtid1056/client2.h b/LFtid1056/client2.h index 9cc37d9..122a91d 100644 --- a/LFtid1056/client2.h +++ b/LFtid1056/client2.h @@ -51,9 +51,16 @@ public: void start_reconnect_timer(int delay); void stop_timers(); void close_handles(); + void append_and_process_data(const char* data, size_t len); + void put_packet_into_queue(const std::vector& packet); private: int index_; + +private: + std::vector recv_buffer_; // ݻ + std::mutex buffer_mutex_; // + void process_buffer(); }; class ClientManager { diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 5a83df4..955e75f 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -15,12 +15,14 @@ using namespace std; SafeMessageQueue message_queue; // ȫϢ -void process_received_message(string mac, const char* data, size_t length) { +void process_received_message(string mac, string id,const char* data, size_t length) { // ʵʵϢ߼ // ҵ - std::cout << "Active connections: " << mac << " size:" << length << std::endl; + std::cout << "Active connections: " << mac << " id:" << id << " size:" << length << std::endl; // ʾϢ // ע⣺Эʵ־Ľ߼ + + } diff --git a/LFtid1056/dealMsg.h b/LFtid1056/dealMsg.h index 56cbe82..627f728 100644 --- a/LFtid1056/dealMsg.h +++ b/LFtid1056/dealMsg.h @@ -60,4 +60,4 @@ public: } }; -void process_received_message(string mac, const char* data, size_t length); \ No newline at end of file +void process_received_message(string mac, string id, const char* data, size_t length); \ No newline at end of file diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index df7d7f8..c68e6f2 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -136,7 +136,7 @@ void* client_manager_thread(void* arg) { // װб std::vector devices = { { - "D001", "Primary Device", "Model-X", "00-B7-8D-A8-00-D1", + "D001", "Primary Device", "Model-X", "00-B7-8D-A8-00-D9", 1, points1 }, { @@ -181,7 +181,7 @@ void* message_processor_thread(void* arg) { // ɺͷڴ // ʵʵϢ - process_received_message(msg.mac, msg.data, msg.length); + process_received_message(msg.mac, msg.device_id, msg.data, msg.length); free(msg.data); }