添加了消息处理线程

This commit is contained in:
zw
2025-06-20 09:25:17 +08:00
parent c51da2e506
commit b487937ad6
6 changed files with 191 additions and 19 deletions

View File

@@ -77,11 +77,13 @@
<PropertyGroup Label="UserMacros" /> <PropertyGroup Label="UserMacros" />
<ItemGroup> <ItemGroup>
<ClCompile Include="client2.cpp" /> <ClCompile Include="client2.cpp" />
<ClCompile Include="dealMsg.cpp" />
<ClCompile Include="main_thread.cpp" /> <ClCompile Include="main_thread.cpp" />
<ClCompile Include="PQSMsg.cpp" /> <ClCompile Include="PQSMsg.cpp" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="client2.h" /> <ClInclude Include="client2.h" />
<ClInclude Include="dealMsg.h" />
<ClInclude Include="PQSMsg.h" /> <ClInclude Include="PQSMsg.h" />
</ItemGroup> </ItemGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">

View File

@@ -5,10 +5,10 @@
#include <math.h> #include <math.h>
#include "PQSMsg.h" #include "PQSMsg.h"
#include "client2.h" #include "client2.h"
#include "dealMsg.h"
// 配置参数 // 配置参数
#define INITIAL_DATA_SIZE 128 // 初始数据0.1KB #define CONNECTIONS 10 // 支持1000个并发连接
#define CONNECTIONS 1000 // 支持1000个并发连接
#define SERVER_IP "101.132.39.45" // 目标服务器IP "101.132.39.45" #define SERVER_IP "101.132.39.45" // 目标服务器IP "101.132.39.45"
#define SERVER_PORT 1056 // 目标服务器端口 #define SERVER_PORT 1056 // 目标服务器端口
#define BASE_RECONNECT_DELAY 5000 // 基础重连延迟(ms) #define BASE_RECONNECT_DELAY 5000 // 基础重连延迟(ms)
@@ -17,7 +17,7 @@
static uv_loop_t* global_loop; // 全局事件循环 static uv_loop_t* global_loop; // 全局事件循环
static client_context_t client_contexts[CONNECTIONS]; // 客户端上下文数组 static client_context_t client_contexts[CONNECTIONS]; // 客户端上下文数组
static uv_timer_t monitor_timer; // 连接监控定时器 static uv_timer_t monitor_timer; // 连接监控定时器
extern SafeMessageQueue message_queue;
/* 缓冲区分配回调 */ /* 缓冲区分配回调 */
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
void* buffer = malloc(suggested_size); void* buffer = malloc(suggested_size);
@@ -43,8 +43,17 @@ void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
} }
if (nread > 0) { if (nread > 0) {
//fprintf(stdout, "[Client %d] RECV %zd bytes data\n", ctx->index, nread); // 将接收到的数据放入消息队列
// 数据处理逻辑... deal_message_t msg;
msg.client_index = ctx->index;
msg.data = (char*)malloc(nread);
msg.length = nread;
memcpy(msg.data, buf->base, nread);
if (!message_queue.push(msg)) {
fprintf(stderr, "[Client %d] Message queue full, dropping message\n", ctx->index);
free(msg.data);
}
} }
free(buf->base); free(buf->base);
} }
@@ -69,7 +78,7 @@ void on_timer(uv_timer_t* handle) {
if (ctx->state != STATE_CONNECTED) { if (ctx->state != STATE_CONNECTED) {
return; return;
} }
//单个装置定时消息收发机制
// 生成完整报文 装置云服务登录报文 // 生成完整报文 装置云服务登录报文
auto binary_data = generate_frontlogin_message("00-B7-8D-A8-00-D6"); auto binary_data = generate_frontlogin_message("00-B7-8D-A8-00-D6");
@@ -107,7 +116,7 @@ void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t d
void on_close(uv_handle_t* handle) { void on_close(uv_handle_t* handle) {
client_context_t* ctx = (client_context_t*)handle->data; client_context_t* ctx = (client_context_t*)handle->data;
ctx->state = STATE_DISCONNECTED; ctx->state = STATE_DISCONNECTED;
fprintf(stderr, "[Client %d] closed\n", ctx->index);
// 停止定时器 // 停止定时器
uv_timer_stop(&ctx->timer); uv_timer_stop(&ctx->timer);
uv_timer_stop(&ctx->reconnect_timer); uv_timer_stop(&ctx->reconnect_timer);
@@ -132,7 +141,7 @@ void try_reconnect(uv_timer_t* timer) {
if (ctx->state != STATE_DISCONNECTED || ctx->shutdown) { if (ctx->state != STATE_DISCONNECTED || ctx->shutdown) {
return; return;
} }
fprintf(stderr, "[Client %d] try_reconnect\n", ctx->index);
// 重新初始化TCP句柄 // 重新初始化TCP句柄
uv_tcp_init(ctx->loop, &ctx->client); uv_tcp_init(ctx->loop, &ctx->client);
ctx->client.data = ctx; ctx->client.data = ctx;
@@ -165,7 +174,7 @@ void on_connect(uv_connect_t* req, int status) {
free(req); free(req);
return; return;
} }
fprintf(stderr, "[Client %d] on_connect\n", ctx->index);
ctx->state = STATE_CONNECTED; ctx->state = STATE_CONNECTED;
ctx->reconnect_attempts = 0; ctx->reconnect_attempts = 0;
@@ -225,9 +234,6 @@ void stop_all_clients() {
} }
/* 连接监控回调 */ /* 连接监控回调 */
void monitor_connections(uv_timer_t* handle) { void monitor_connections(uv_timer_t* handle) {
static int shutdown_flag = 0; // 新增关闭标志
if (shutdown_flag) return; // 已关闭则不再处理
// 自动恢复断开的连接 // 自动恢复断开的连接
static int recovery_counter = 0; static int recovery_counter = 0;
if (++recovery_counter >= 5) { // 每5次监控执行一次恢复 if (++recovery_counter >= 5) { // 每5次监控执行一次恢复
@@ -246,9 +252,6 @@ void monitor_connections(uv_timer_t* handle) {
//if (monitor_temp >= 30) { //if (monitor_temp >= 30) {
// monitor_temp = 0; // monitor_temp = 0;
// printf("30 second to stop all client\n"); // printf("30 second to stop all client\n");
// // 设置关闭标志
// shutdown_flag = 1;
//
// // 停止并关闭监控定时器 // // 停止并关闭监控定时器
// uv_timer_stop(handle); // uv_timer_stop(handle);
// uv_close((uv_handle_t*)handle, NULL); // uv_close((uv_handle_t*)handle, NULL);
@@ -261,6 +264,14 @@ void monitor_connections(uv_timer_t* handle) {
//} //}
} }
/* uv_walk callback: force-close any handle still open after the loop drained.
 * Last-resort cleanup used when uv_loop_close() reports leaked handles. */
static void close_walk_cb(uv_handle_t* handle, void* arg) {
    (void)arg; /* unused walk argument */
    if (!uv_is_closing(handle)) {
        /* %p requires a void* argument; passing uv_handle_t* directly is UB */
        fprintf(stderr, "Force closing leaked handle: %p (type=%d)\n",
            (void*)handle, (int)handle->type);
        uv_close(handle, NULL);
    }
}
void start_client_connect() { void start_client_connect() {
// 创建全局事件循环 // 创建全局事件循环
global_loop = uv_default_loop(); global_loop = uv_default_loop();
@@ -275,7 +286,18 @@ void start_client_connect() {
// 运行事件循环 // 运行事件循环
uv_run(global_loop, UV_RUN_DEFAULT); uv_run(global_loop, UV_RUN_DEFAULT);
// 清理资源(关键:确保循环完全停止) // 添加资源清理阶段
uv_loop_close(global_loop); while (uv_loop_alive(global_loop)) {
uv_run(global_loop, UV_RUN_ONCE);
}
// 安全关闭事件循环
int err = uv_loop_close(global_loop);
if (err) {
fprintf(stderr, "uv_loop_close error: %s\n", uv_strerror(err));
// 强制清理残留句柄(调试用)
uv_walk(global_loop, close_walk_cb, NULL);
uv_run(global_loop, UV_RUN_NOWAIT);
}
global_loop = NULL; global_loop = NULL;
} }

25
LFtid1056/dealMsg.cpp Normal file
View File

@@ -0,0 +1,25 @@
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <queue>
#include <vector>
#include <atomic>
#include "PQSMsg.h"
#include "client2.h"
#include "dealMsg.h"
SafeMessageQueue message_queue; // ȫ<><C8AB><EFBFBD><EFBFBD>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD>
/* Consume one message taken off the queue.
 * Placeholder implementation: logs the event only; real protocol parsing
 * and business logic for `data` belong here. */
void process_received_message(int client_index, const char* data, size_t length) {
    (void)data; /* payload not parsed yet */
    printf("Processing message from client %d, size: %zu\n", client_index, length);
}

60
LFtid1056/dealMsg.h Normal file
View File

@@ -0,0 +1,60 @@
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <queue>
#include <vector>
#include <atomic>
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
#define MESSAGE_QUEUE_SIZE 10000 // <20><>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
/* <20><>Ϣ<EFBFBD><EFBFBD><E1B9B9> */
/* One received message, handed from the libuv read callback (producer)
 * to the message-processing thread (consumer). `data` is malloc'd by the
 * producer and freed by the consumer after processing. */
typedef struct {
    int client_index;   // index of the client connection the data arrived on
    char* data;         // message payload (heap-allocated; consumer frees)
    size_t length;      // payload length in bytes
} deal_message_t;
/* <20>̰߳<DFB3>ȫ<EFBFBD><C8AB><EFBFBD><EFBFBD>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD> */
/* Thread-safe bounded FIFO of deal_message_t shared between the libuv
 * read callback (producer) and the message-processor thread (consumer).
 * Capacity is MESSAGE_QUEUE_SIZE; push() fails rather than blocks when full. */
class SafeMessageQueue {
private:
    std::queue<deal_message_t> queue;                   // protected by `mutex`
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;  // guards `queue`
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;     // signaled on each push
    std::atomic<int> count{ 0 };                        // mirror of queue.size(), readable without the lock
public:
    /* Enqueue a copy of `msg`. Returns false when the queue already holds
     * MESSAGE_QUEUE_SIZE entries; in that case ownership of msg.data stays
     * with the caller (who must free it — see on_read). */
    bool push(const deal_message_t& msg) {
        pthread_mutex_lock(&mutex);
        if (queue.size() >= MESSAGE_QUEUE_SIZE) {
            pthread_mutex_unlock(&mutex);
            return false;
        }
        queue.push(msg);
        count++;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
        return true;
    }
    /* Dequeue the oldest message into `msg`; blocks until one is available.
     * The while-loop guards against spurious condvar wakeups. Always returns
     * true. NOTE(review): there is no shutdown signal, so a blocked consumer
     * can never be woken for termination — confirm this is intended. */
    bool pop(deal_message_t& msg) {
        pthread_mutex_lock(&mutex);
        while (queue.empty()) {
            pthread_cond_wait(&cond, &mutex);
        }
        msg = queue.front();
        queue.pop();
        count--;
        pthread_mutex_unlock(&mutex);
        return true;
    }
    /* Lock-free approximate size, read from the atomic counter; may race
     * with concurrent push/pop and be momentarily stale. */
    size_t size() const {
        return count.load();
    }
};
void process_received_message(int client_index, const char* data, size_t length);

View File

@@ -6,6 +6,7 @@
#include <time.h> #include <time.h>
#include "PQSMsg.h" #include "PQSMsg.h"
#include "client2.h" #include "client2.h"
#include "dealMsg.h"
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */ /* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
#define THREAD_CONNECTIONS 10 // <20><><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD><DFB3><EFBFBD> #define THREAD_CONNECTIONS 10 // <20><><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD><DFB3><EFBFBD>
@@ -30,6 +31,7 @@ typedef struct {
/* ȫ<>ֱ<EFBFBD><D6B1><EFBFBD> */ /* ȫ<>ֱ<EFBFBD><D6B1><EFBFBD> */
thread_info_t thread_info[THREAD_CONNECTIONS]; // <20>߳<EFBFBD><DFB3><EFBFBD>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD> thread_info_t thread_info[THREAD_CONNECTIONS]; // <20>߳<EFBFBD><DFB3><EFBFBD>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD>
pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; // ȫ<>ֻ<EFBFBD><D6BB><EFBFBD><EFBFBD><EFBFBD> pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; // ȫ<>ֻ<EFBFBD><D6BB><EFBFBD><EFBFBD><EFBFBD>
extern SafeMessageQueue message_queue;
/* <20>̹߳<DFB3><CCB9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̳߳<DFB3>*/ /* <20>̹߳<DFB3><CCB9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̳߳<DFB3>*/
void* work_thread(void* arg) { void* work_thread(void* arg) {
@@ -62,7 +64,7 @@ void* work_thread(void* arg) {
pthread_mutex_unlock(&thread_info[index].lock); pthread_mutex_unlock(&thread_info[index].lock);
return NULL; return NULL;
} }
/* <20>̹߳<DFB3><CCB9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> 1<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>*/ /* <20>̹߳<DFB3><CCB9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> 0<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>*/
/* <20>ͻ<EFBFBD><CDBB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӹ<EFBFBD><D3B9><EFBFBD><EFBFBD>̺߳<DFB3><CCBA><EFBFBD>*/ /* <20>ͻ<EFBFBD><CDBB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӹ<EFBFBD><D3B9><EFBFBD><EFBFBD>̺߳<DFB3><CCBA><EFBFBD>*/
void* client_manager_thread(void* arg) { void* client_manager_thread(void* arg) {
int index = *(int*)arg; int index = *(int*)arg;
@@ -87,6 +89,42 @@ void* client_manager_thread(void* arg) {
pthread_mutex_unlock(&thread_info[index].lock); pthread_mutex_unlock(&thread_info[index].lock);
return NULL; return NULL;
} }
/* <20>̹߳<DFB3><CCB9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> 1<><31><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>*/
/* <20><>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD><EFBFBD>̺߳<DFB3><CCBA><EFBFBD> */
/* Thread entry for the message-processing worker (thread slot 1).
 * Pops messages from the global message_queue, dispatches each to
 * process_received_message(), and frees the payload afterwards.
 * `arg` is a heap-allocated int* holding this thread's slot index;
 * ownership transfers here and it is freed immediately. */
void* message_processor_thread(void* arg) {
    int index = *(int*)arg;
    free(arg); // creator malloc'd the index; this thread releases it
    // Mark this slot as running under its per-slot lock
    pthread_mutex_lock(&thread_info[index].lock);
    printf("Message Processor Thread %d started\n", index);
    thread_info[index].state = THREAD_RUNNING;
    pthread_mutex_unlock(&thread_info[index].lock);
    // Message loop. NOTE(review): runs forever — pop() blocks with no
    // shutdown path, so the THREAD_STOPPED bookkeeping below is unreachable.
    while (1) {
        deal_message_t msg;
        if (message_queue.pop(msg)) {
            // msg.client_index distinguishes which connection produced the data
            printf("Processing message from client %d, length: %zu\n",
                msg.client_index, msg.length);
            // Delegate to the actual (placeholder) processing routine
            process_received_message(msg.client_index, msg.data, msg.length);
            free(msg.data); // consumer owns the payload; release it here
        }
    }
    // Unreachable with the current while(1) loop (see NOTE above)
    pthread_mutex_lock(&thread_info[index].lock);
    thread_info[index].state = THREAD_STOPPED;
    printf("Message Processor Thread %d stopped\n", index);
    pthread_mutex_unlock(&thread_info[index].lock);
    return NULL;
}
/* <20>߳<EFBFBD><DFB3><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */ /* <20>߳<EFBFBD><DFB3><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
void restart_thread(int index) { void restart_thread(int index) {
pthread_mutex_lock(&global_lock); pthread_mutex_lock(&global_lock);
@@ -113,6 +151,16 @@ void restart_thread(int index) {
free(new_index); free(new_index);
} }
} }
else if (index == 1) {
// <20><>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
if (pthread_create(&thread_info[index].tid, NULL, message_processor_thread, new_index) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart message processor thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
free(new_index);
}
}
else { else {
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD> // <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>գ<EFBFBD>ʵ<EFBFBD><CAB5>Ӧ<EFBFBD><D3A6><EFBFBD>п<EFBFBD><D0BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD> // <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>գ<EFBFBD>ʵ<EFBFBD><CAB5>Ӧ<EFBFBD><D3A6><EFBFBD>п<EFBFBD><D0BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
@@ -147,11 +195,19 @@ int main() {
free(index); free(index);
} }
} }
else if (i == 1) {
// <20><>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
if (pthread_create(&thread_info[i].tid, NULL, message_processor_thread, index) != 0) {
printf("Failed to create message processor thread %d\n", i);
free(index);
}
}
else { else {
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD> // <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>գ<EFBFBD>ʵ<EFBFBD><CAB5>Ӧ<EFBFBD><D3A6><EFBFBD>п<EFBFBD><D0BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD> // <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>գ<EFBFBD>ʵ<EFBFBD><CAB5>Ӧ<EFBFBD><D3A6><EFBFBD>п<EFBFBD><D0BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
free(index); free(index);
} }
} }
printf("Thread monitoring system started with %d workers\n", THREAD_CONNECTIONS); printf("Thread monitoring system started with %d workers\n", THREAD_CONNECTIONS);
@@ -179,6 +235,13 @@ int main() {
pthread_mutex_unlock(&thread_info[i].lock); pthread_mutex_unlock(&thread_info[i].lock);
} }
} }
// <20><><EFBFBD><EFBFBD>socket<65><74><EFBFBD><EFBFBD>״̬
static int queue_monitor = 0;
if (++queue_monitor >= 10) { // ÿ10<31><EFBFBD><EBB1A8>һ<EFBFBD><D2BB>
printf("Message queue size: %zu\n", message_queue.size());
queue_monitor = 0;
}
} }
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Դ(<28><><EFBFBD><EFBFBD><EFBFBD>ϲ<EFBFBD><CFB2><EFBFBD>ִ<EFBFBD>е<EFBFBD><D0B5><EFBFBD><EFBFBD><EFBFBD>) // <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Դ(<28><><EFBFBD><EFBFBD><EFBFBD>ϲ<EFBFBD><CFB2><EFBFBD>ִ<EFBFBD>е<EFBFBD><D0B5><EFBFBD><EFBFBD><EFBFBD>)