diff --git a/LFtid1056/LFtid1056.vcxproj b/LFtid1056/LFtid1056.vcxproj
index df45ba4..9735f7e 100644
--- a/LFtid1056/LFtid1056.vcxproj
+++ b/LFtid1056/LFtid1056.vcxproj
@@ -77,11 +77,13 @@
+    <ClCompile Include="dealMsg.cpp" />
+    <ClInclude Include="dealMsg.h" />
diff --git a/LFtid1056/client2.cpp b/LFtid1056/client2.cpp
index d2a1ef6..b8cd6c5 100644
--- a/LFtid1056/client2.cpp
+++ b/LFtid1056/client2.cpp
@@ -5,10 +5,10 @@
 #include <uv.h>
 #include "PQSMsg.h"
 #include "client2.h"
+#include "dealMsg.h"
 
 // Configuration parameters
-#define INITIAL_DATA_SIZE 128       // Initial data size: 0.1 KB
-#define CONNECTIONS 1000            // Support 1000 concurrent connections
+#define CONNECTIONS 10              // Support 10 concurrent connections
 #define SERVER_IP "101.132.39.45"   // Target server IP
 #define SERVER_PORT 1056            // Target server port
 #define BASE_RECONNECT_DELAY 5000   // Base reconnect delay (ms)
@@ -17,7 +17,7 @@
 static uv_loop_t* global_loop;                          // Global event loop
 static client_context_t client_contexts[CONNECTIONS];   // Client context array
 static uv_timer_t monitor_timer;                        // Connection monitor timer
-
+extern SafeMessageQueue message_queue;
 /* Buffer allocation callback */
 void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
     void* buffer = malloc(suggested_size);
@@ -43,8 +43,17 @@ void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
     }
     if (nread > 0) {
-        //fprintf(stdout, "[Client %d] RECV %zd bytes data\n", ctx->index, nread);
-        // Data-processing logic...
+        // Hand the received data to the message queue
+        deal_message_t msg;
+        msg.client_index = ctx->index;
+        msg.data = (char*)malloc(nread);
+        msg.length = nread;
+        memcpy(msg.data, buf->base, nread);
+
+        if (!message_queue.push(msg)) {
+            fprintf(stderr, "[Client %d] Message queue full, dropping message\n", ctx->index);
+            free(msg.data);
+        }
     }
     free(buf->base);
 }
@@ -69,7 +78,7 @@ void on_timer(uv_timer_t* handle) {
     if (ctx->state != STATE_CONNECTED) {
         return;
     }
-
+    // Periodic send/receive mechanism for a single device
     // Build the complete frame: device cloud-service login message
     auto binary_data = generate_frontlogin_message("00-B7-8D-A8-00-D6");
@@ -107,7 +116,7 @@ void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size)
 void on_close(uv_handle_t* handle) {
     client_context_t* ctx = (client_context_t*)handle->data;
     ctx->state = STATE_DISCONNECTED;
-
+    fprintf(stderr, "[Client %d] closed\n", ctx->index);
     // Stop the timers
     uv_timer_stop(&ctx->timer);
     uv_timer_stop(&ctx->reconnect_timer);
@@ -132,7 +141,7 @@ void try_reconnect(uv_timer_t* timer) {
     if (ctx->state != STATE_DISCONNECTED || ctx->shutdown) {
         return;
     }
-
+    fprintf(stderr, "[Client %d] try_reconnect\n", ctx->index);
     // Re-initialize the TCP handle
     uv_tcp_init(ctx->loop, &ctx->client);
     ctx->client.data = ctx;
@@ -165,7 +174,7 @@ void on_connect(uv_connect_t* req, int status) {
         free(req);
         return;
     }
-
+    fprintf(stderr, "[Client %d] on_connect\n", ctx->index);
     ctx->state = STATE_CONNECTED;
     ctx->reconnect_attempts = 0;
@@ -225,9 +234,6 @@ void stop_all_clients() {
 }
 /* Connection monitor callback */
 void monitor_connections(uv_timer_t* handle) {
-    static int shutdown_flag = 0;   // Shutdown flag
-    if (shutdown_flag) return;      // Stop processing once shut down
-
     // Automatically recover dropped connections
     static int recovery_counter = 0;
     if (++recovery_counter >= 5) {  // Run recovery on every 5th monitor tick
@@ -246,9 +252,6 @@ void monitor_connections(uv_timer_t* handle) {
     //if (monitor_temp >= 30) {
     //    monitor_temp = 0;
     //    printf("30 second to stop all client\n");
-    //    // Set the shutdown flag
-    //    shutdown_flag = 1;
-    //
     //    // Stop and close the monitor timer
     //    uv_timer_stop(handle);
     //    uv_close((uv_handle_t*)handle, NULL);
@@ -261,6 +264,14 @@ void monitor_connections(uv_timer_t* handle) {
     //}
 }
 
+static void close_walk_cb(uv_handle_t* handle, void* arg) {
+    if (!uv_is_closing(handle)) {
+        fprintf(stderr, "Force closing leaked handle: %p (type=%d)\n",
+            handle, handle->type);
+        uv_close(handle, NULL);
+    }
+}
+
 void start_client_connect() {
     // Create the global event loop
     global_loop = uv_default_loop();
@@ -275,7 +286,18 @@ void start_client_connect() {
     // Run the event loop
     uv_run(global_loop, UV_RUN_DEFAULT);
 
-    // Clean up resources (critical: make sure the loop has fully stopped)
-    uv_loop_close(global_loop);
+    // Added resource-cleanup phase
+    while (uv_loop_alive(global_loop)) {
+        uv_run(global_loop, UV_RUN_ONCE);
+    }
+
+    // Close the event loop safely
+    int err = uv_loop_close(global_loop);
+    if (err) {
+        fprintf(stderr, "uv_loop_close error: %s\n", uv_strerror(err));
+        // Force-close any leftover handles (debugging aid)
+        uv_walk(global_loop, close_walk_cb, NULL);
+        uv_run(global_loop, UV_RUN_NOWAIT);
+    }
     global_loop = NULL;
 }
diff --git a/LFtid1056/client2.h b/LFtid1056/client2.h
index f200018..20d7fb2 100644
--- a/LFtid1056/client2.h
+++ b/LFtid1056/client2.h
@@ -30,4 +30,4 @@ void on_connect(uv_connect_t* req, int status);
 void on_close(uv_handle_t* handle);  // Client disconnect callback
 void stop_all_clients();             // Stop all clients
 void start_client_connect();         // Start the client connections
-void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size);
+void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size);
\ No newline at end of file
diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp
new file mode 100644
index 0000000..7b47c24
--- /dev/null
+++ b/LFtid1056/dealMsg.cpp
@@ -0,0 +1,25 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <stddef.h>
+#include <pthread.h>
+#include <uv.h>
+#include <queue>
+#include <atomic>
+#include "PQSMsg.h"
+#include "client2.h"
+#include "dealMsg.h"
+
+SafeMessageQueue message_queue; // Global message queue
+
+void process_received_message(int client_index, const char* data, size_t length) {
+    // Actual message-processing logic goes here,
+    // filled in according to the business requirements.
+    printf("Processing message from client %d, size: %zu\n", client_index, length);
+
+    // For now this only logs the message;
+    // note: the real protocol-parsing logic still has to be implemented.
+}
+
+
diff --git a/LFtid1056/dealMsg.h b/LFtid1056/dealMsg.h
new file mode 100644
index 0000000..8550547
--- /dev/null
+++ b/LFtid1056/dealMsg.h
@@ -0,0 +1,60 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <stddef.h>
+#include <pthread.h>
+#include <uv.h>
+#include <queue>
+#include <atomic>
+
+/* Configuration */
+#define MESSAGE_QUEUE_SIZE 10000 // Message queue capacity
+
+/* Message structure */
+typedef struct {
+    int client_index; // Client index
+    char* data;       // Message payload
+    size_t length;    // Message length
+} deal_message_t;
+
+/* Thread-safe message queue */
+class SafeMessageQueue {
+private:
+    std::queue<deal_message_t> queue;
+    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+    std::atomic<size_t> count{ 0 };
+
+public:
+    bool push(const deal_message_t& msg) {
+        pthread_mutex_lock(&mutex);
+        if (queue.size() >= MESSAGE_QUEUE_SIZE) {
+            pthread_mutex_unlock(&mutex);
+            return false;
+        }
+        queue.push(msg);
+        count++;
+        pthread_cond_signal(&cond);
+        pthread_mutex_unlock(&mutex);
+        return true;
+    }
+
+    bool pop(deal_message_t& msg) {
+        pthread_mutex_lock(&mutex);
+        while (queue.empty()) {
+            pthread_cond_wait(&cond, &mutex);
+        }
+        msg = queue.front();
+        queue.pop();
+        count--;
+        pthread_mutex_unlock(&mutex);
+        return true;
+    }
+
+    size_t size() const {
+        return count.load();
+    }
+};
+
+void process_received_message(int client_index, const char* data, size_t length);
\ No newline at end of file
diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp
index 37dd3af..af393b0 100644
--- a/LFtid1056/main_thread.cpp
+++ b/LFtid1056/main_thread.cpp
@@ -6,6 +6,7 @@
 #include <pthread.h>
 #include "PQSMsg.h"
 #include "client2.h"
+#include "dealMsg.h"
 
 /* Configuration */
 #define THREAD_CONNECTIONS 10 // Number of worker threads
@@ -30,6 +31,7 @@ typedef struct {
 /* Global variables */
 thread_info_t thread_info[THREAD_CONNECTIONS];           // Thread info array
 pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; // Global mutex
+extern SafeMessageQueue message_queue;
 
 /* Thread management: worker thread pool */
 void* work_thread(void* arg) {
@@ -62,7 +64,7 @@ void* work_thread(void* arg) {
     pthread_mutex_unlock(&thread_info[index].lock);
     return NULL;
 }
-/* Thread management: thread 1 */
+/* Thread management: thread 0 */
 /* Client connection manager thread function */
 void* client_manager_thread(void* arg) {
     int index = *(int*)arg;
@@ -87,6 +89,42 @@ void* client_manager_thread(void* arg) {
     pthread_mutex_unlock(&thread_info[index].lock);
     return NULL;
 }
+/* Thread management: thread 1 */
+/* Message processor thread function */
+void* message_processor_thread(void* arg) {
+    int index = *(int*)arg;
+    free(arg);
+
+    // Mark the thread state as running
+    pthread_mutex_lock(&thread_info[index].lock);
+    printf("Message Processor Thread %d started\n", index);
+    thread_info[index].state = THREAD_RUNNING;
+    pthread_mutex_unlock(&thread_info[index].lock);
+
+    // Message-processing loop
+    while (1) {
+        deal_message_t msg;
+        if (message_queue.pop(msg)) {
+            // Actual message handling goes here.
+            // Note: use msg.client_index to tell the clients apart,
+            // and free the buffer once processing is done.
+            printf("Processing message from client %d, length: %zu\n",
+                msg.client_index, msg.length);
+
+            // Run the actual message processing
+            process_received_message(msg.client_index, msg.data, msg.length);
+
+            free(msg.data);
+        }
+    }
+
+    // Thread termination (unreachable: the loop above never exits)
+    pthread_mutex_lock(&thread_info[index].lock);
+    thread_info[index].state = THREAD_STOPPED;
+    printf("Message Processor Thread %d stopped\n", index);
+    pthread_mutex_unlock(&thread_info[index].lock);
+    return NULL;
+}
 /* Restart a thread */
 void restart_thread(int index) {
     pthread_mutex_lock(&global_lock);
@@ -113,6 +151,16 @@ void restart_thread(int index) {
             free(new_index);
         }
     }
+    else if (index == 1) {
+        // Message processor thread
+        if (pthread_create(&thread_info[index].tid, NULL, message_processor_thread, new_index) != 0) {
+            pthread_mutex_lock(&global_lock);
+            printf("Failed to restart message processor thread %d\n", index);
+            thread_info[index].state = THREAD_CRASHED;
+            pthread_mutex_unlock(&global_lock);
+            free(new_index);
+        }
+    }
     else {
         // Other threads
         // Empty for now; a real application could add more thread types here
@@ -147,11 +195,19 @@ int main() {
                 free(index);
             }
         }
+        else if (i == 1) {
+            // Message processor thread
+            if (pthread_create(&thread_info[i].tid, NULL, message_processor_thread, index) != 0) {
+                printf("Failed to create message processor thread %d\n", i);
+                free(index);
+            }
+        }
         else {
             // Other threads
             // Empty for now; a real application could add more thread types here
             free(index);
         }
+
     }
 
     printf("Thread monitoring system started with %d workers\n", THREAD_CONNECTIONS);
@@ -179,6 +235,13 @@ int main() {
                 pthread_mutex_unlock(&thread_info[i].lock);
             }
         }
+
+        // Report the message-queue status
+        static int queue_monitor = 0;
+        if (++queue_monitor >= 10) { // Report once every 10 seconds
+            printf("Message queue size: %zu\n", message_queue.size());
+            queue_monitor = 0;
+        }
     }
 
     // Clean up resources (in theory never reached)
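
Note on the consumer added in main_thread.cpp: message_processor_thread can never reach its shutdown block, because SafeMessageQueue::pop() blocks indefinitely on an empty queue and the while (1) loop has no exit condition. Below is a minimal sketch of one common fix; the sentinel convention (client_index == -1) and the helper names post_shutdown_sentinel / message_processor_loop are assumptions for illustration, not part of this patch.

// Sketch only: sentinel-based shutdown for the queue consumer.
// SafeMessageQueue, deal_message_t, and process_received_message come from
// dealMsg.h above; the sentinel value (client_index == -1) is an assumption.
#include <stdio.h>
#include <stdlib.h>
#include "dealMsg.h"

extern SafeMessageQueue message_queue;

// Producer side (e.g. called from stop_all_clients): wake the consumer with a sentinel.
static void post_shutdown_sentinel() {
    deal_message_t stop_msg;
    stop_msg.client_index = -1;   // sentinel: no real client uses index -1
    stop_msg.data = NULL;
    stop_msg.length = 0;
    message_queue.push(stop_msg); // signals the condition variable, unblocking pop()
}

// Consumer loop that can exit cleanly when the sentinel arrives.
static void* message_processor_loop(void* arg) {
    (void)arg;
    for (;;) {
        deal_message_t msg;
        if (!message_queue.pop(msg)) continue;
        if (msg.client_index == -1) break;  // sentinel received: stop processing
        process_received_message(msg.client_index, msg.data, msg.length);
        free(msg.data);                     // buffer ownership was passed via the queue
    }
    printf("Message processor exiting\n");
    return NULL;
}

With this pattern the THREAD_STOPPED bookkeeping after the loop becomes reachable, and stop_all_clients() can shut the processor down without pthread_cancel.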