添加了socket数据处理,处理粘包和分包的问题

This commit is contained in:
zw
2025-06-25 10:54:09 +08:00
parent ccd7a3bb59
commit 3b4a4704db
5 changed files with 127 additions and 25 deletions

View File

@@ -13,8 +13,8 @@
// 配置参数
constexpr int BASE_RECONNECT_DELAY = 5000; // 基础重连延迟(ms)
constexpr int MAX_RECONNECT_DELAY = 60000; // 最大重连延迟(ms)
constexpr const char* SERVER_IP = "172.25.144.1"; // 目标服务器IP"101.132.39.45"
constexpr int SERVER_PORT = 61000; // 目标服务器端口1056
constexpr const char* SERVER_IP = "101.132.39.45"; // 目标服务器IP"101.132.39.45"
constexpr int SERVER_PORT = 1056; // 目标服务器端口1056
static uv_loop_t* global_loop = nullptr;
static uv_timer_t monitor_timer;
@@ -25,6 +25,8 @@ ClientContext::ClientContext(uv_loop_t* loop, const DeviceInfo& device, int inde
: loop(loop), state(ConnectionState::DISCONNECTED),
reconnect_attempts(0), shutdown(false), device_info(device), index_(index) {
recv_buffer_.reserve(4096); // 预分配4KB缓冲区
// 初始化 TCP 句柄
uv_tcp_init(loop, &client);
client.data = this;
@@ -79,6 +81,112 @@ void ClientContext::close_handles() {
}
}
// 添加接收数据到缓冲区并处理
void ClientContext::append_and_process_data(const char* data, size_t len) {
std::lock_guard<std::mutex> lock(buffer_mutex_);
// 添加到缓冲区
recv_buffer_.insert(recv_buffer_.end(), data, data + len);
// 处理缓冲区数据
process_buffer();
// 检查缓冲区大小防止内存溢出
if (recv_buffer_.size() > 10 * 1024 * 1024) { // 10MB限制
recv_buffer_.clear();
std::cerr << "[Device " << device_info.device_id
<< "] Buffer overflow, cleared\n";
}
}
// 注意:这个函数必须在 buffer_mutex_ 已被锁定的情况下调用
void ClientContext::process_buffer() {
constexpr int MSG_HEAD_LEN = 6; // 最小包头长度
while (true) {
// 检查缓冲区大小是否足够解析包头
if (recv_buffer_.size() < MSG_HEAD_LEN)
break;
// 云服务器状态报文检查 (EB 90 EB 90)
if (recv_buffer_.size() >= 4 &&
recv_buffer_[0] == 0xEB && recv_buffer_[1] == 0x90 &&
recv_buffer_[2] == 0xEB && recv_buffer_[3] == 0x90) {
const int packageLen = 150; // 固定长度
if (recv_buffer_.size() < packageLen)
break;
// 提取完整报文
std::vector<unsigned char> packet(
recv_buffer_.begin(),
recv_buffer_.begin() + packageLen
);
// 从缓冲区移除已处理数据
recv_buffer_.erase(
recv_buffer_.begin(),
recv_buffer_.begin() + packageLen
);
// 放入消息队列
put_packet_into_queue(packet);
continue;
}
// 标准报文检查 (EB 90)
if (recv_buffer_[0] != 0xEB || recv_buffer_[1] != 0x90) {
// 非法包头,清空缓冲区
recv_buffer_.clear();
break;
}
// 解析包长度 (小端序)
if (recv_buffer_.size() < 6)
break;
uint16_t body_len = (recv_buffer_[4] << 8) | recv_buffer_[5];
const int packageLen = body_len + 10; // 基础长度+扩展
if (recv_buffer_.size() < packageLen)
break;
// 提取完整报文
std::vector<unsigned char> packet(
recv_buffer_.begin(),
recv_buffer_.begin() + packageLen
);
// 从缓冲区移除已处理数据
recv_buffer_.erase(
recv_buffer_.begin(),
recv_buffer_.begin() + packageLen
);
// 放入消息队列
put_packet_into_queue(packet);
}
}
void ClientContext::put_packet_into_queue(
const std::vector<unsigned char>& packet)
{
deal_message_t msg;
msg.device_id = device_info.device_id;
msg.mac = device_info.mac;
msg.points = device_info.points;
msg.length = packet.size();
msg.data = static_cast<char*>(malloc(msg.length));
memcpy(msg.data, packet.data(), msg.length);
if (!message_queue.push(msg)) {
free(msg.data);
std::cerr << "[Device " << device_info.device_id
<< "] Queue full, dropping packet\n";
}
}
/* 缓冲区分配回调 */
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = new char[suggested_size];
@@ -100,24 +208,9 @@ void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
}
if (nread > 0) {
// 将接收到的数据放入消息队列
deal_message_t msg;
msg.device_id = ctx->device_info.device_id; // 直接赋值
msg.mac = ctx->device_info.mac; // 直接赋值
// 复制测点信息
msg.points = ctx->device_info.points;
msg.data = static_cast<char*>(malloc(nread));
msg.length = nread;
memcpy(msg.data, buf->base, nread);
if (!message_queue.push(msg)) {
std::cerr << "[Device " << ctx->device_info.device_id
<< "] Message queue full, dropping message" << std::endl;
free(msg.data);
}
std::cout << "on_read: " << ctx->device_info.mac << " get!" << std::endl;
// 使用公共方法添加并处理数据
ctx->append_and_process_data(buf->base, nread);
std::cout << "on_read: " << ctx->device_info.mac << " get " << nread << " bytes" << std::endl;
}
delete[] buf->base;

View File

@@ -51,9 +51,16 @@ public:
void start_reconnect_timer(int delay);
void stop_timers();
void close_handles();
void append_and_process_data(const char* data, size_t len);
void put_packet_into_queue(const std::vector<unsigned char>& packet);
private:
int index_;
private:
std::vector<unsigned char> recv_buffer_; // <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ݻ<EFBFBD><DDBB><EFBFBD><EFBFBD><EFBFBD>
std::mutex buffer_mutex_; // <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
void process_buffer();
};
class ClientManager {

View File

@@ -15,12 +15,14 @@ using namespace std;
SafeMessageQueue message_queue; // ȫ<><C8AB><EFBFBD><EFBFBD>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD>
void process_received_message(string mac, const char* data, size_t length) {
void process_received_message(string mac, string id,const char* data, size_t length) {
// ʵ<>ʵ<EFBFBD><CAB5><EFBFBD>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD><EFBFBD>߼<EFBFBD>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ҵ<EFBFBD><D2B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
std::cout << "Active connections: " << mac << " size:" << length << std::endl;
std::cout << "Active connections: " << mac << " id:" << id << " size:" << length << std::endl;
// ʾ<><CABE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
// ע<><EFBFBD><E2A3BA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Э<EFBFBD><D0AD>ʵ<EFBFBD>־<EFBFBD><D6BE><EFBFBD><EFBFBD>Ľ<EFBFBD><C4BD><EFBFBD><EFBFBD>߼<EFBFBD>
}

View File

@@ -60,4 +60,4 @@ public:
}
};
void process_received_message(string mac, const char* data, size_t length);
void process_received_message(string mac, string id, const char* data, size_t length);

View File

@@ -136,7 +136,7 @@ void* client_manager_thread(void* arg) {
// <20><><EFBFBD><EFBFBD>װ<EFBFBD><D7B0><EFBFBD>б<EFBFBD>
std::vector<DeviceInfo> devices = {
{
"D001", "Primary Device", "Model-X", "00-B7-8D-A8-00-D1",
"D001", "Primary Device", "Model-X", "00-B7-8D-A8-00-D9",
1, points1
},
{
@@ -181,7 +181,7 @@ void* message_processor_thread(void* arg) {
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ɺ<EFBFBD><C9BA>ͷ<EFBFBD><CDB7>ڴ<EFBFBD>
// <20><><EFBFBD><EFBFBD>ʵ<EFBFBD>ʵ<EFBFBD><CAB5><EFBFBD>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
process_received_message(msg.mac, msg.data, msg.length);
process_received_message(msg.mac, msg.device_id, msg.data, msg.length);
free(msg.data);
}