Files
front_linux/LFtid1056/client2.cpp
2025-06-19 13:24:45 +08:00

282 lines
8.6 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
#include <math.h>
#include "PQSMsg.h"
#include "client2.h"
// Configuration parameters
#define INITIAL_DATA_SIZE 128 // initial data size, about 0.1 KB (not referenced in the code visible here)
#define CONNECTIONS 1000 // number of concurrent client connections (size of client_contexts)
#define SERVER_IP "101.132.39.45" // target server IPv4 address
#define SERVER_PORT 1056 // target server TCP port
#define BASE_RECONNECT_DELAY 5000 // base reconnect delay (ms); doubled per failed attempt in on_close
#define MAX_RECONNECT_DELAY 60000 // upper bound for the reconnect delay (ms)
static uv_loop_t* global_loop; // event loop shared by all clients; set in start_client_connect
static client_context_t client_contexts[CONNECTIONS]; // one context per client connection
static uv_timer_t monitor_timer; // periodic timer that drives monitor_connections
/*
 * Buffer allocation callback passed to uv_read_start().
 *
 * handle          handle the read is for (unused)
 * suggested_size  number of bytes to allocate
 * buf             out: receives the allocated buffer, or an empty
 *                 (NULL, 0) buffer when malloc fails
 *
 * The memory is released by on_read via free(buf->base).
 */
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
    char* storage = (char*)malloc(suggested_size);

    // Hand back an empty buffer when the allocation failed.
    *buf = storage ? uv_buf_init(storage, suggested_size)
                   : uv_buf_init(NULL, 0);
}
/*
 * Read callback for a client's TCP stream.
 *
 * stream  stream that produced the event; stream->data is the client context
 * nread   > 0: bytes received, 0: nothing to read, < 0: error code or UV_EOF
 * buf     buffer from alloc_buffer; always freed here
 *
 * On any negative nread the handle is closed with on_close as the close
 * callback. Errors other than UV_EOF are logged first.
 */
void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
    client_context_t* ctx = (client_context_t*)stream->data;

    if (nread >= 0) {
        // nread > 0 is where received data would be processed; the payload
        // is currently discarded.
        free(buf->base);
        return;
    }

    // Error or end of stream: report real errors, then tear the handle down.
    if (nread != UV_EOF) {
        fprintf(stdout, "[Client %d] RECV ERROR: %s\n",
                ctx->index, uv_strerror(nread));
    }
    uv_close((uv_handle_t*)stream, on_close);
    free(buf->base);
}
/*
 * Write completion callback.
 *
 * req     finished write request; req->handle->data is the client context
 * status  0 on success, negative libuv error code on failure
 *
 * Logs a failed write, then frees whatever pointer the sender stored in
 * req->data (free(NULL) is a no-op) and the request itself.
 */
void on_write(uv_write_t* req, int status) {
    if (status < 0) {
        client_context_t* ctx = (client_context_t*)req->handle->data;
        fprintf(stdout, "[Client %d] SEND ERROR: %s\n",
                ctx->index, uv_strerror(status));
    }

    free(req->data);  // payload buffer attached to the request, if any
    free(req);        // the write request itself
}
/*
 * Periodic send timer callback.
 *
 * handle  the client's send timer; handle->data is the client context
 *
 * While the client is connected, builds a front-end login message for the
 * fixed identifier "00-B7-8D-A8-00-D6" and passes it to send_binary_data.
 * Does nothing in any other state.
 */
void on_timer(uv_timer_t* handle) {
    client_context_t* ctx = (client_context_t*)handle->data;

    if (ctx->state == STATE_CONNECTED) {
        // The generated message is a local object that lives until this
        // function returns.
        auto login_frame = generate_frontlogin_message("00-B7-8D-A8-00-D6");
        send_binary_data(ctx, login_frame.data(), login_frame.size());
    }
}
/*
 * Queue a binary message for transmission on a client's TCP stream.
 *
 * ctx        client to send on; nothing is sent unless it is STATE_CONNECTED
 * data       bytes to send; stays owned by the caller and is only read here
 * data_size  number of bytes at data
 *
 * uv_write() is asynchronous and needs the buffer memory to stay valid until
 * the write callback runs. The caller's buffer may be gone by then (on_timer
 * passes a local object), so the payload is copied to a heap block stored in
 * write_req->data. on_write releases that block with free(req->data).
 */
void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size) {
    if (ctx->state != STATE_CONNECTED) {
        fprintf(stderr, "[Client %d] Cannot send binary data: not connected\n", ctx->index);
        return;
    }
    if (data == NULL || data_size == 0) {
        fprintf(stderr, "[Client %d] Cannot send binary data: empty payload\n", ctx->index);
        return;
    }

    char* payload = (char*)malloc(data_size);
    uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
    if (!payload || !write_req) {
        fprintf(stderr, "[Client %d] Failed to allocate write request\n", ctx->index);
        free(payload);
        free(write_req);
        return;
    }

    // Private copy that outlives the caller's buffer; freed in on_write.
    memcpy(payload, data, data_size);
    write_req->data = payload;

    uv_buf_t buf = uv_buf_init(payload, (unsigned int)data_size);
    int ret = uv_write(write_req, (uv_stream_t*)&ctx->client, &buf, 1, on_write);
    if (ret < 0) {
        // The callback will not run for a request that was never queued,
        // so release both allocations here.
        fprintf(stderr, "[Client %d] uv_write failed: %s\n", ctx->index, uv_strerror(ret));
        free(payload);
        free(write_req);
    }
}
/*
 * Close callback for a client's TCP handle.
 *
 * handle  the closed handle; handle->data is the client context
 *
 * Marks the client disconnected and stops both of its timers. Unless the
 * client is shutting down, it then arms the one-shot reconnect timer with an
 * exponential backoff: BASE_RECONNECT_DELAY * 2^attempts, capped at
 * MAX_RECONNECT_DELAY.
 */
void on_close(uv_handle_t* handle) {
    client_context_t* ctx = (client_context_t*)handle->data;
    ctx->state = STATE_DISCONNECTED;

    // Stop the send timer and any reconnect that is already pending.
    uv_timer_stop(&ctx->timer);
    uv_timer_stop(&ctx->reconnect_timer);

    if (ctx->shutdown) {
        return;
    }

    // Clamp while still in floating point: after enough failed attempts the
    // product exceeds INT_MAX, and converting an out-of-range double to int
    // is undefined behaviour.
    double backoff = (double)BASE_RECONNECT_DELAY * pow(2, ctx->reconnect_attempts);
    int delay = (backoff < MAX_RECONNECT_DELAY) ? (int)backoff : MAX_RECONNECT_DELAY;

    fprintf(stdout, "[Client %d] Reconnecting in %dms (attempt %d)\n",
            ctx->index, delay, ctx->reconnect_attempts + 1);
    ctx->reconnect_attempts++;
    uv_timer_start(&ctx->reconnect_timer, try_reconnect, delay, 0);
}
/*
 * Start a connection attempt for one client.
 *
 * timer  the client's reconnect timer; timer->data is the client context.
 *        init_clients also calls this directly for the first connection.
 *
 * Does nothing unless the client is STATE_DISCONNECTED and not shutting
 * down. Otherwise it (re)initializes the TCP handle, moves the client to
 * STATE_CONNECTING and issues an asynchronous connect to
 * SERVER_IP:SERVER_PORT. If the request cannot be allocated or started, the
 * handle is closed with on_close as the close callback.
 */
void try_reconnect(uv_timer_t* timer) {
    client_context_t* ctx = (client_context_t*)timer->data;
    if (ctx->state != STATE_DISCONNECTED || ctx->shutdown) {
        return;
    }

    // Fresh TCP handle for this attempt.
    uv_tcp_init(ctx->loop, &ctx->client);
    ctx->client.data = ctx;
    ctx->state = STATE_CONNECTING;

    struct sockaddr_in addr;
    uv_ip4_addr(SERVER_IP, SERVER_PORT, &addr);

    uv_connect_t* req = (uv_connect_t*)malloc(sizeof(uv_connect_t));
    if (!req) {
        // Treat an out-of-memory condition like a failed connect instead of
        // dereferencing a NULL request.
        fprintf(stderr, "[Client %d] Failed to allocate connect request\n", ctx->index);
        uv_close((uv_handle_t*)&ctx->client, on_close);
        return;
    }
    req->data = ctx;

    int ret = uv_tcp_connect(req, &ctx->client, (const struct sockaddr*)&addr, on_connect);
    if (ret < 0) {
        // on_connect will not run for a request that failed to start.
        fprintf(stderr, "[Client %d] Connect error: %s\n", ctx->index, uv_strerror(ret));
        free(req);
        uv_close((uv_handle_t*)&ctx->client, on_close);
    }
}
/*
 * Connect completion callback.
 *
 * req     connect request allocated in try_reconnect; req->data is the client
 *         context. The request is always freed here.
 * status  0 on success, negative libuv error code on failure
 *
 * On failure the TCP handle is closed with on_close as the close callback, so
 * the client returns to STATE_DISCONNECTED and on_close can schedule a retry.
 * Closing with a NULL callback would leave the client in STATE_CONNECTING
 * with no retry. On success the client becomes STATE_CONNECTED, the retry
 * counter is reset, reading starts and the send timer is armed.
 */
void on_connect(uv_connect_t* req, int status) {
    client_context_t* ctx = (client_context_t*)req->data;

    if (status < 0) {
        fprintf(stderr, "[Client %d] Connect failed: %s\n", ctx->index, uv_strerror(status));
        // Skip the close when the handle is already closing, to avoid a
        // double uv_close.
        if (!uv_is_closing((uv_handle_t*)&ctx->client)) {
            uv_close((uv_handle_t*)&ctx->client, on_close);
        }
        free(req);
        return;
    }

    ctx->state = STATE_CONNECTED;
    ctx->reconnect_attempts = 0;

    // Start receiving data.
    uv_read_start((uv_stream_t*)&ctx->client, alloc_buffer, on_read);
    // First send after 6000 ms, then every 6000 ms.
    uv_timer_start(&ctx->timer, on_timer, 6000, 6000);
    free(req);
}
/*
 * Initialize every client context and start its first connection attempt.
 *
 * loop  event loop that all client handles are attached to
 *
 * Each context is zeroed, given its index and loop, and has its send timer
 * and reconnect timer initialized. The TCP handle is deliberately not
 * initialized here: try_reconnect calls uv_tcp_init itself, and initializing
 * the same handle twice without closing it in between registers it with the
 * loop twice.
 */
void init_clients(uv_loop_t* loop) {
    for (int i = 0; i < CONNECTIONS; i++) {
        client_context_t* ctx = &client_contexts[i];
        memset(ctx, 0, sizeof(client_context_t));
        ctx->loop = loop;
        ctx->index = i;
        ctx->state = STATE_DISCONNECTED;

        // Periodic send timer.
        uv_timer_init(loop, &ctx->timer);
        ctx->timer.data = ctx;

        // One-shot reconnect timer.
        uv_timer_init(loop, &ctx->reconnect_timer);
        ctx->reconnect_timer.data = ctx;

        // First connection; this also initializes ctx->client.
        try_reconnect(&ctx->reconnect_timer);
    }
}
/* 停止所有客户端 */
void stop_all_clients() {
for (int i = 0; i < CONNECTIONS; i++) {
client_context_t* ctx = &client_contexts[i];
ctx->shutdown = 1;
// 关闭所有句柄
if (!uv_is_closing((uv_handle_t*)&ctx->client)) {
uv_close((uv_handle_t*)&ctx->client, NULL);
}
if (!uv_is_closing((uv_handle_t*)&ctx->timer)) {
uv_close((uv_handle_t*)&ctx->timer, NULL);
}
if (!uv_is_closing((uv_handle_t*)&ctx->reconnect_timer)) {
uv_close((uv_handle_t*)&ctx->reconnect_timer, NULL);
}
}
}
/*
 * Connection monitor timer callback.
 *
 * handle  the monitor timer (only used by the disabled code below)
 *
 * On every fifth invocation, counts the clients in STATE_CONNECTED and prints
 * "Active connections: <count>/<CONNECTIONS>".
 */
void monitor_connections(uv_timer_t* handle) {
    static int shutdown_flag = 0;     // only ever set by the disabled code below
    static int recovery_counter = 0;  // invocations since the last report

    if (shutdown_flag) {
        return;
    }

    recovery_counter += 1;
    if (recovery_counter >= 5) {
        int active_count = 0;
        for (const client_context_t* c = client_contexts;
             c != client_contexts + CONNECTIONS; ++c) {
            if (c->state == STATE_CONNECTED) {
                active_count += 1;
            }
        }
        printf("Active connections: %d/%d\n", active_count, CONNECTIONS);
        recovery_counter = 0;
    }

    // Disabled: automatic shutdown after 30 invocations.
    //static int monitor_temp = 0;
    //monitor_temp++;
    //if (monitor_temp >= 30) {
    //    monitor_temp = 0;
    //    printf("30 second to stop all client\n");
    //    // set the shutdown flag
    //    shutdown_flag = 1;
    //    // stop and close the monitor timer
    //    uv_timer_stop(handle);
    //    uv_close((uv_handle_t*)handle, NULL);
    //    // stop all clients
    //    stop_all_clients();
    //    // stop the event loop
    //    uv_stop(global_loop);
    //}
}
void start_client_connect() {
// 创建全局事件循环
global_loop = uv_default_loop();
// 初始化所有客户端
init_clients(global_loop);
// 启动连接监控
uv_timer_init(global_loop, &monitor_timer);
uv_timer_start(&monitor_timer, monitor_connections, 1000, 1000);
// 运行事件循环
uv_run(global_loop, UV_RUN_DEFAULT);
// 清理资源(关键:确保循环完全停止)
uv_loop_close(global_loop);
global_loop = NULL;
}