From b487937ad647b536b030e50b6cd847943bb1b9de Mon Sep 17 00:00:00 2001
From: zw <3466561528@qq.com>
Date: Fri, 20 Jun 2025 09:25:17 +0800
Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86=E6=B6=88=E6=81=AF?=
=?UTF-8?q?=E5=A4=84=E7=90=86=E7=BA=BF=E7=A8=8B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
LFtid1056/LFtid1056.vcxproj | 2 ++
LFtid1056/client2.cpp | 56 ++++++++++++++++++++++----------
LFtid1056/client2.h | 2 +-
LFtid1056/dealMsg.cpp | 25 ++++++++++++++
LFtid1056/dealMsg.h | 60 ++++++++++++++++++++++++++++++++++
LFtid1056/main_thread.cpp | 65 ++++++++++++++++++++++++++++++++++++-
6 files changed, 191 insertions(+), 19 deletions(-)
create mode 100644 LFtid1056/dealMsg.cpp
create mode 100644 LFtid1056/dealMsg.h
diff --git a/LFtid1056/LFtid1056.vcxproj b/LFtid1056/LFtid1056.vcxproj
index df45ba4..9735f7e 100644
--- a/LFtid1056/LFtid1056.vcxproj
+++ b/LFtid1056/LFtid1056.vcxproj
@@ -77,11 +77,13 @@
+
+
diff --git a/LFtid1056/client2.cpp b/LFtid1056/client2.cpp
index d2a1ef6..b8cd6c5 100644
--- a/LFtid1056/client2.cpp
+++ b/LFtid1056/client2.cpp
@@ -5,10 +5,10 @@
#include
#include "PQSMsg.h"
#include "client2.h"
+#include "dealMsg.h"
// 配置参数
-#define INITIAL_DATA_SIZE 128 // 初始数据0.1KB
-#define CONNECTIONS 1000 // 支持1000个并发连接
+#define CONNECTIONS 10 // 支持1000个并发连接
#define SERVER_IP "101.132.39.45" // 目标服务器IP "101.132.39.45"
#define SERVER_PORT 1056 // 目标服务器端口
#define BASE_RECONNECT_DELAY 5000 // 基础重连延迟(ms)
@@ -17,7 +17,7 @@
static uv_loop_t* global_loop; // 全局事件循环
static client_context_t client_contexts[CONNECTIONS]; // 客户端上下文数组
static uv_timer_t monitor_timer; // 连接监控定时器
-
+extern SafeMessageQueue message_queue;
/* 缓冲区分配回调 */
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
void* buffer = malloc(suggested_size);
@@ -43,8 +43,17 @@ void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
}
if (nread > 0) {
- //fprintf(stdout, "[Client %d] RECV %zd bytes data\n", ctx->index, nread);
- // 数据处理逻辑...
+ // 将接收到的数据放入消息队列
+ deal_message_t msg;
+ msg.client_index = ctx->index;
+ msg.data = (char*)malloc(nread);
+ msg.length = nread;
+ memcpy(msg.data, buf->base, nread);
+
+ if (!message_queue.push(msg)) {
+ fprintf(stderr, "[Client %d] Message queue full, dropping message\n", ctx->index);
+ free(msg.data);
+ }
}
free(buf->base);
}
@@ -69,7 +78,7 @@ void on_timer(uv_timer_t* handle) {
if (ctx->state != STATE_CONNECTED) {
return;
}
-
+ //单个装置定时消息收发机制
// 生成完整报文 装置云服务登录报文
auto binary_data = generate_frontlogin_message("00-B7-8D-A8-00-D6");
@@ -107,7 +116,7 @@ void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t d
void on_close(uv_handle_t* handle) {
client_context_t* ctx = (client_context_t*)handle->data;
ctx->state = STATE_DISCONNECTED;
-
+ fprintf(stderr, "[Client %d] closed\n", ctx->index);
// 停止定时器
uv_timer_stop(&ctx->timer);
uv_timer_stop(&ctx->reconnect_timer);
@@ -132,7 +141,7 @@ void try_reconnect(uv_timer_t* timer) {
if (ctx->state != STATE_DISCONNECTED || ctx->shutdown) {
return;
}
-
+ fprintf(stderr, "[Client %d] try_reconnect\n", ctx->index);
// 重新初始化TCP句柄
uv_tcp_init(ctx->loop, &ctx->client);
ctx->client.data = ctx;
@@ -165,7 +174,7 @@ void on_connect(uv_connect_t* req, int status) {
free(req);
return;
}
-
+ fprintf(stderr, "[Client %d] on_connect\n", ctx->index);
ctx->state = STATE_CONNECTED;
ctx->reconnect_attempts = 0;
@@ -225,9 +234,6 @@ void stop_all_clients() {
}
/* 连接监控回调 */
void monitor_connections(uv_timer_t* handle) {
- static int shutdown_flag = 0; // 新增关闭标志
- if (shutdown_flag) return; // 已关闭则不再处理
-
// 自动恢复断开的连接
static int recovery_counter = 0;
if (++recovery_counter >= 5) { // 每5次监控执行一次恢复
@@ -246,9 +252,6 @@ void monitor_connections(uv_timer_t* handle) {
//if (monitor_temp >= 30) {
// monitor_temp = 0;
// printf("30 second to stop all client\n");
- // // 设置关闭标志
- // shutdown_flag = 1;
- //
// // 停止并关闭监控定时器
// uv_timer_stop(handle);
// uv_close((uv_handle_t*)handle, NULL);
@@ -261,6 +264,14 @@ void monitor_connections(uv_timer_t* handle) {
//}
}
+static void close_walk_cb(uv_handle_t* handle, void* arg) {
+ if (!uv_is_closing(handle)) {
+ fprintf(stderr, "Force closing leaked handle: %p (type=%d)\n",
+ handle, handle->type);
+ uv_close(handle, NULL);
+ }
+}
+
void start_client_connect() {
// 创建全局事件循环
global_loop = uv_default_loop();
@@ -275,7 +286,18 @@ void start_client_connect() {
// 运行事件循环
uv_run(global_loop, UV_RUN_DEFAULT);
- // 清理资源(关键:确保循环完全停止)
- uv_loop_close(global_loop);
+ // 添加资源清理阶段
+ while (uv_loop_alive(global_loop)) {
+ uv_run(global_loop, UV_RUN_ONCE);
+ }
+
+ // 安全关闭事件循环
+ int err = uv_loop_close(global_loop);
+ if (err) {
+ fprintf(stderr, "uv_loop_close error: %s\n", uv_strerror(err));
+ // 强制清理残留句柄(调试用)
+ uv_walk(global_loop, close_walk_cb, NULL);
+ uv_run(global_loop, UV_RUN_NOWAIT);
+ }
global_loop = NULL;
}
diff --git a/LFtid1056/client2.h b/LFtid1056/client2.h
index f200018..20d7fb2 100644
--- a/LFtid1056/client2.h
+++ b/LFtid1056/client2.h
@@ -30,4 +30,4 @@ void on_connect(uv_connect_t* req, int status);//
void on_close(uv_handle_t* handle);//ͻ˶Ͽص
void stop_all_clients(); // ֹͣпͻ
void start_client_connect();//ͻ
-void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size);
+void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size);
\ No newline at end of file
diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp
new file mode 100644
index 0000000..7b47c24
--- /dev/null
+++ b/LFtid1056/dealMsg.cpp
@@ -0,0 +1,25 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include "PQSMsg.h"
+#include "client2.h"
+#include "dealMsg.h"
+
+SafeMessageQueue message_queue; // ȫϢ
+
+void process_received_message(int client_index, const char* data, size_t length) {
+ // ʵʵϢ
+ // ҵ
+ printf("Processing message from client %d, size: %zu\n", client_index, length);
+
+ // ʾϢ
+ // ע⣺Эʵ־Ľ
+}
+
+
diff --git a/LFtid1056/dealMsg.h b/LFtid1056/dealMsg.h
new file mode 100644
index 0000000..8550547
--- /dev/null
+++ b/LFtid1056/dealMsg.h
@@ -0,0 +1,60 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+/* */
+#define MESSAGE_QUEUE_SIZE 10000 // Ϣ
+
+/* Ϣṹ */
+typedef struct {
+ int client_index; // ͻ
+ char* data; // Ϣ
+ size_t length; // Ϣ
+} deal_message_t;
+
+/* ̰߳ȫϢ */
+class SafeMessageQueue {
+private:
+ std::queue queue;
+ pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+ pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+ std::atomic count{ 0 };
+
+public:
+ bool push(const deal_message_t& msg) {
+ pthread_mutex_lock(&mutex);
+ if (queue.size() >= MESSAGE_QUEUE_SIZE) {
+ pthread_mutex_unlock(&mutex);
+ return false;
+ }
+ queue.push(msg);
+ count++;
+ pthread_cond_signal(&cond);
+ pthread_mutex_unlock(&mutex);
+ return true;
+ }
+
+ bool pop(deal_message_t& msg) {
+ pthread_mutex_lock(&mutex);
+ while (queue.empty()) {
+ pthread_cond_wait(&cond, &mutex);
+ }
+ msg = queue.front();
+ queue.pop();
+ count--;
+ pthread_mutex_unlock(&mutex);
+ return true;
+ }
+
+ size_t size() const {
+ return count.load();
+ }
+};
+
+void process_received_message(int client_index, const char* data, size_t length);
\ No newline at end of file
diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp
index 37dd3af..af393b0 100644
--- a/LFtid1056/main_thread.cpp
+++ b/LFtid1056/main_thread.cpp
@@ -6,6 +6,7 @@
#include
#include "PQSMsg.h"
#include "client2.h"
+#include "dealMsg.h"
/* */
#define THREAD_CONNECTIONS 10 // ߳
@@ -30,6 +31,7 @@ typedef struct {
/* ȫֱ */
thread_info_t thread_info[THREAD_CONNECTIONS]; // ߳Ϣ
pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; // ȫֻ
+extern SafeMessageQueue message_queue;
/* ̹߳ ̳߳*/
void* work_thread(void* arg) {
@@ -62,7 +64,7 @@ void* work_thread(void* arg) {
pthread_mutex_unlock(&thread_info[index].lock);
return NULL;
}
-/* ̹߳ 1߳*/
+/* ̹߳ 0߳*/
/* ͻӹ̺߳*/
void* client_manager_thread(void* arg) {
int index = *(int*)arg;
@@ -87,6 +89,42 @@ void* client_manager_thread(void* arg) {
pthread_mutex_unlock(&thread_info[index].lock);
return NULL;
}
+/* ̹߳ 1߳*/
+/* Ϣ̺߳ */
+void* message_processor_thread(void* arg) {
+ int index = *(int*)arg;
+ free(arg);
+
+ // ߳״̬Ϊ
+ pthread_mutex_lock(&thread_info[index].lock);
+ printf("Message Processor Thread %d started\n", index);
+ thread_info[index].state = THREAD_RUNNING;
+ pthread_mutex_unlock(&thread_info[index].lock);
+
+ // Ϣѭ
+ while (1) {
+ deal_message_t msg;
+ if (message_queue.pop(msg)) {
+ // ʵϢ
+ // ע⣺Ҫmsg.client_indexֿͻ
+ // ɺͷڴ
+ printf("Processing message from client %d, length: %zu\n",
+ msg.client_index, msg.length);
+
+ // ʵʵϢ
+ process_received_message(msg.client_index, msg.data, msg.length);
+
+ free(msg.data);
+ }
+ }
+
+ // ֹ߳
+ pthread_mutex_lock(&thread_info[index].lock);
+ thread_info[index].state = THREAD_STOPPED;
+ printf("Message Processor Thread %d stopped\n", index);
+ pthread_mutex_unlock(&thread_info[index].lock);
+ return NULL;
+}
/* ߳ */
void restart_thread(int index) {
pthread_mutex_lock(&global_lock);
@@ -113,6 +151,16 @@ void restart_thread(int index) {
free(new_index);
}
}
+ else if (index == 1) {
+ // Ϣ߳
+ if (pthread_create(&thread_info[index].tid, NULL, message_processor_thread, new_index) != 0) {
+ pthread_mutex_lock(&global_lock);
+ printf("Failed to restart message processor thread %d\n", index);
+ thread_info[index].state = THREAD_CRASHED;
+ pthread_mutex_unlock(&global_lock);
+ free(new_index);
+ }
+ }
else {
// ߳
// ΪգʵӦп߳
@@ -147,11 +195,19 @@ int main() {
free(index);
}
}
+ else if (i == 1) {
+ // Ϣ߳
+ if (pthread_create(&thread_info[i].tid, NULL, message_processor_thread, index) != 0) {
+ printf("Failed to create message processor thread %d\n", i);
+ free(index);
+ }
+ }
else {
// ߳
// ΪգʵӦп߳
free(index);
}
+
}
printf("Thread monitoring system started with %d workers\n", THREAD_CONNECTIONS);
@@ -179,6 +235,13 @@ int main() {
pthread_mutex_unlock(&thread_info[i].lock);
}
}
+
+ // socket״̬
+ static int queue_monitor = 0;
+ if (++queue_monitor >= 10) { // ÿ10뱨һ
+ printf("Message queue size: %zu\n", message_queue.size());
+ queue_monitor = 0;
+ }
}
// Դ(ϲִе)