Files
front_linux/Linux_Hello/client2.cpp
2025-06-13 11:29:59 +08:00

314 lines
9.7 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
#include <pthread.h>
#include <unistd.h>
#include <math.h>
#include "PQSMsg.h"
#include "client2.h"
// Configuration parameters
#define INITIAL_DATA_SIZE 128 // Initial payload size in bytes (~0.1 KB)
#define CONNECTIONS 2 // Number of concurrent connections (one per simulated device)
#define SERVER_IP "101.132.39.45" // Target server IP (Alibaba Cloud server); addresses noted by the author: "101.132.39.45", "172.27.208.1"
#define SERVER_PORT 1056 // Target server port
#define BASE_RECONNECT_DELAY 5000 // Base reconnect delay (ms)
#define MAX_RECONNECT_DELAY 60000 // Maximum reconnect delay (ms)
#define MAX_RECONNECT_ATTEMPTS 10 // Maximum number of reconnect attempts
/*
 * Buffer allocation callback handed to uv_read_start.
 * Allocates suggested_size bytes with malloc and reports them through buf.
 * If the allocation fails, an error is logged and an empty buffer
 * (NULL base, length 0) is reported instead. The handle argument is unused.
 */
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
    char* storage = (char*)malloc(suggested_size);
    if (storage != NULL) {
        *buf = uv_buf_init(storage, suggested_size);
        return;
    }
    fprintf(stderr, "Memory allocation failed\n");
    *buf = uv_buf_init(NULL, 0);
}
/*
 * Read callback for the client TCP stream.
 *
 * stream: the stream being read; stream->data holds the client_context_t.
 * nread:  > 0 number of bytes received, < 0 a libuv error code (UV_EOF when
 *         the peer closed the connection), 0 when there is nothing to read.
 * buf:    buffer previously produced by alloc_buffer; this callback owns it
 *         and releases it on every path.
 *
 * On error or EOF the stream is closed, which triggers on_close.
 */
void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
    client_context_t* ctx = (client_context_t*)stream->data;
    if (nread < 0) {
        if (nread != UV_EOF) {
            fprintf(stdout, "[Client %d] RECV ERROR: %s\n",
                    ctx->index, uv_strerror((int)nread));
        }
        else {
            fprintf(stdout, "[Client %d] Connection closed by server\n", ctx->index);
        }
        // uv_close must not be called twice on the same handle.
        if (!uv_is_closing((uv_handle_t*)stream)) {
            uv_close((uv_handle_t*)stream, on_close);
        }
    }
    else if (nread > 0) {
        fprintf(stdout, "[Client %d] RECV %zd bytes data\n", ctx->index, nread);
        // TODO: feed the data into the client buffer and try to extract
        // well-formed protocol frames from it.
    }
    // nread == 0 is not an error (comparable to EAGAIN), but the buffer was
    // still allocated, so it is released here for all three cases.
    // buf->base may be NULL when allocation failed; free(NULL) is a no-op.
    free(buf->base);
}
/*
 * Write completion callback.
 * Logs whether the write identified by req succeeded (status >= 0) or failed
 * (status < 0, a libuv error code), then releases both the payload stored in
 * req->data (may be NULL) and the request object itself.
 */
void on_write(uv_write_t* req, int status) {
    client_context_t* ctx = (client_context_t*)req->handle->data;

    if (status >= 0) {
        fprintf(stdout, "[Client %d] SEND bytes success\n", ctx->index);
    }
    else {
        fprintf(stdout, "[Client %d] SEND ERROR: %s\n", ctx->index, uv_strerror(status));
    }

    // Grab the payload pointer before the request that carries it is freed.
    void* payload = req->data;
    free(req);
    free(payload);
}
/*
 * Timer callback: builds the device cloud-service login message and sends it.
 * handle->data holds the client_context_t. Nothing is sent unless the client
 * is in STATE_CONNECTED.
 */
void on_timer(uv_timer_t* handle) {
    client_context_t* ctx = (client_context_t*)handle->data;

    if (ctx->state == STATE_CONNECTED) {
        fprintf(stdout, "[Client %d] Preparing periodic data...\n", ctx->index);

        // Build the complete login frame for this device identifier.
        auto login_frame = generate_frontlogin_message("00-B7-8D-A8-00-D6");

        // NOTE(review): login_frame is destroyed when this function returns,
        // so send_binary_data must not keep using this storage afterwards --
        // confirm against its implementation.
        unsigned char* frame_bytes = login_frame.data();
        send_binary_data(ctx, frame_bytes, login_frame.size());
        return;
    }

    fprintf(stdout, "[Client %d] Skip sending: Not connected\n", ctx->index);
}
/*
 * Sends the initial data block.
 * Allocates INITIAL_DATA_SIZE bytes filled with 'A', overwrites bytes 0, 1, 2
 * and 15 with fixed marker values, and queues the block for writing on the
 * client stream. The payload is attached to the write request (req->data) so
 * the completion callback can free it. Does nothing unless the client is in
 * STATE_CONNECTED.
 */
void send_initial_data(client_context_t* ctx) {
    if (ctx->state != STATE_CONNECTED) {
        fprintf(stderr, "[Client %d] Cannot send initial data: not connected\n", ctx->index);
        return;
    }

    // Payload length comes from the configuration macro (INITIAL_DATA_SIZE bytes).
    const size_t payload_len = INITIAL_DATA_SIZE;
    char* payload = (char*)malloc(payload_len);
    if (payload == NULL) {
        fprintf(stderr, "[Client %d] Failed to allocate initial data buffer\n", ctx->index);
        return;
    }

    memset(payload, 'A', payload_len);
    payload[0] = 0xeb;
    payload[1] = 0x90;
    payload[2] = 0xff;
    payload[15] = 0x16;
    fprintf(stdout, "[Client %d] Sending initial %zu bytes data\n", ctx->index, payload_len);

    uv_write_t* request = (uv_write_t*)malloc(sizeof(uv_write_t));
    if (request == NULL) {
        free(payload);
        fprintf(stderr, "[Client %d] Failed to allocate write request\n", ctx->index);
        return;
    }
    request->data = payload;

    uv_buf_t chunk = uv_buf_init(payload, payload_len);
    const int rc = uv_write(request, (uv_stream_t*)&ctx->client, &chunk, 1, on_write);
    if (rc >= 0) {
        return;
    }

    // The write was never queued, so the completion callback will not run:
    // release everything here.
    fprintf(stderr, "[Client %d] uv_write failed: %s\n",
            ctx->index, uv_strerror(rc));
    free(payload);
    free(request);
}
/*
 * Queues a binary message for writing on the client stream.
 *
 * ctx:       client context; must be in STATE_CONNECTED, otherwise nothing is sent.
 * data:      bytes to send. The caller keeps ownership and may release or
 *            reuse the memory as soon as this function returns.
 * data_size: number of bytes in data; empty payloads are rejected.
 *
 * uv_write is asynchronous and requires the buffer to stay valid until the
 * write callback runs, so the bytes are copied into a heap buffer that is
 * attached to the request (req->data) and freed by on_write.
 */
void send_binary_data(client_context_t* ctx, const unsigned char* data, size_t data_size) {
    if (ctx->state != STATE_CONNECTED) {
        fprintf(stderr, "[Client %d] Cannot send binary data: not connected\n", ctx->index);
        return;
    }
    if (data == NULL || data_size == 0) {
        fprintf(stderr, "[Client %d] Cannot send binary data: empty payload\n", ctx->index);
        return;
    }

    // Private copy whose lifetime matches the write request.
    char* payload = (char*)malloc(data_size);
    if (!payload) {
        fprintf(stderr, "[Client %d] Failed to allocate send buffer\n", ctx->index);
        return;
    }
    memcpy(payload, data, data_size);

    uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
    if (!write_req) {
        free(payload);
        fprintf(stderr, "[Client %d] Failed to allocate write request\n", ctx->index);
        return;
    }
    write_req->data = payload; // released by on_write once the write completes

    uv_buf_t buf = uv_buf_init(payload, (unsigned int)data_size);
    fprintf(stdout, "[Client %d] Sending initial %zu bytes data\n", ctx->index, data_size);
    int ret = uv_write(write_req, (uv_stream_t*)&ctx->client, &buf, 1, on_write);
    if (ret < 0) {
        // The request was never queued, so on_write will not run: clean up here.
        fprintf(stderr, "[Client %d] uv_write failed: %s\n", ctx->index, uv_strerror(ret));
        free(payload);
        free(write_req);
    }
}
/* 连接关闭回调 */
void on_close(uv_handle_t* handle) {
client_context_t* ctx = (client_context_t*)handle->data;
ctx->state = STATE_DISCONNECTED;
fprintf(stdout, "[Client %d] Connection closed\n", ctx->index);
// 停止数据读写定时器和重连定时器
uv_timer_stop(&ctx->timer);
uv_timer_stop(&ctx->reconnect_timer);
// 取消读操作(如果有的话)
uv_read_stop((uv_stream_t*)&ctx->client);
// 指数退避算法
int delay = BASE_RECONNECT_DELAY * pow(2, ctx->reconnect_attempts);
delay = delay > MAX_RECONNECT_DELAY ? MAX_RECONNECT_DELAY : delay;
fprintf(stdout, "[Client %d] Will reconnect after %dms (attempt %d/%d)\n",
ctx->index, delay, ctx->reconnect_attempts + 1, MAX_RECONNECT_ATTEMPTS);
uv_timer_start(&ctx->reconnect_timer, (uv_timer_cb)try_reconnect, delay, 0);
ctx->reconnect_attempts++;
}
/*
 * Starts one connection attempt to SERVER_IP:SERVER_PORT.
 *
 * timer: the reconnect timer; timer->data holds the client_context_t.
 *
 * Does nothing unless the client is in STATE_DISCONNECTED and fewer than
 * MAX_RECONNECT_ATTEMPTS attempts have been recorded. The TCP handle is
 * (re)initialized for every attempt. The outcome is reported through
 * on_connect; if the attempt cannot even be started, the handle is closed so
 * that on_close schedules the next retry.
 */
void try_reconnect(uv_timer_t* timer) {
    client_context_t* ctx = (client_context_t*)timer->data;
    if (ctx->state != STATE_DISCONNECTED) {
        fprintf(stdout, "[Client %d] Reconnect skipped: Invalid state\n", ctx->index);
        return;
    }
    fprintf(stdout, "[Client %d] Attempting to reconnect...\n", ctx->index);
    if (ctx->reconnect_attempts >= MAX_RECONNECT_ATTEMPTS) {
        fprintf(stderr, "[Client %d] Max reconnect attempts reached\n", ctx->index);
        return;
    }

    // A closed handle cannot be reused: re-initialize it for this attempt.
    int ret = uv_tcp_init(ctx->loop, &ctx->client);
    if (ret < 0) {
        // No usable handle exists, so there is nothing to close and no retry
        // is scheduled from here.
        fprintf(stderr, "[Client %d] TCP init error: %s\n", ctx->index, uv_strerror(ret));
        return;
    }
    ctx->client.data = ctx;
    ctx->state = STATE_CONNECTING;

    uv_connect_t* req = (uv_connect_t*)malloc(sizeof(uv_connect_t));
    if (!req) {
        fprintf(stderr, "[Client %d] Failed to allocate connect request\n", ctx->index);
        ctx->state = STATE_DISCONNECTED;
        uv_close((uv_handle_t*)&ctx->client, on_close);
        return;
    }
    req->data = ctx;

    struct sockaddr_in addr;
    ret = uv_ip4_addr(SERVER_IP, SERVER_PORT, &addr);
    if (ret == 0) {
        ret = uv_tcp_connect(req, &ctx->client, (const struct sockaddr*)&addr, on_connect);
    }
    if (ret < 0) {
        // on_connect will not be called for this request: release it and let
        // on_close handle the retry.
        fprintf(stderr, "[Client %d] Connect error: %s\n", ctx->index, uv_strerror(ret));
        ctx->state = STATE_DISCONNECTED;
        free(req);
        uv_close((uv_handle_t*)&ctx->client, on_close);
    }
}
/*
 * Connect completion callback.
 * req->data holds the client_context_t; the request object is freed here on
 * every path. A callback arriving while the client is not in
 * STATE_CONNECTING is ignored. On failure (status < 0) the TCP handle is
 * closed with on_close. On success the client becomes STATE_CONNECTED, the
 * reconnect counter is reset, reading starts, and the send timer is armed
 * (first fire after 6000 ms, then every 6000 ms).
 */
void on_connect(uv_connect_t* req, int status) {
    client_context_t* ctx = (client_context_t*)req->data;
    // The request carries nothing else that is needed below.
    free(req);

    if (ctx->state != STATE_CONNECTING) {
        fprintf(stdout, "[Client %d] Connection callback with invalid state\n", ctx->index);
        return;
    }

    if (status < 0) {
        fprintf(stderr, "[Client %d] Connect failed: %s\n", ctx->index, uv_strerror(status));
        ctx->state = STATE_DISCONNECTED;
        uv_close((uv_handle_t*)&ctx->client, on_close);
        return;
    }

    ctx->state = STATE_CONNECTED;
    ctx->reconnect_attempts = 0;
    fprintf(stdout, "[Client %d] Connection established\n", ctx->index);

    // Start receiving; the initial data block (send_initial_data) is
    // currently not sent on connect.
    uv_read_start((uv_stream_t*)&ctx->client, alloc_buffer, on_read);

    // Arm the periodic send timer.
    uv_timer_start(&ctx->timer, on_timer, 6000, 6000);
}
/* 客户端线程函数 */
void* run_client(void* arg) {
int index = *(int*)arg;
free(arg);
// 初始化事件循环
uv_loop_t* loop = uv_loop_new();
client_context_t* ctx = (client_context_t*)malloc(sizeof(client_context_t));
memset(ctx, 0, sizeof(client_context_t));
// 初始化上下文
ctx->loop = loop;
ctx->index = index;
ctx->state = STATE_DISCONNECTED;
ctx->reconnect_attempts = 0;
// 初始化libuv句柄
uv_tcp_init(loop, &ctx->client);
uv_timer_init(loop, &ctx->timer);
uv_timer_init(loop, &ctx->reconnect_timer);
// 设置关联数据
ctx->client.data = ctx;
ctx->timer.data = ctx;
ctx->reconnect_timer.data = ctx;
// 首次连接尝试
try_reconnect(&ctx->reconnect_timer);
// 运行事件循环
uv_run(loop, UV_RUN_DEFAULT);
// 资源清理
uv_loop_close(loop);
free(loop);
free(ctx);
return NULL;
}
/* 主函数 */
int main() {
pthread_t threads[CONNECTIONS];
// 创建客户端线程
for (int i = 0; i < CONNECTIONS; i++) {
int* index = (int*)malloc(sizeof(int));
*index = i;
if (pthread_create(&threads[i], NULL, run_client, index) != 0) {
fprintf(stderr, "Failed to create thread %d\n", i);
free(index);
}
}
printf("Started %d client connections\n", CONNECTIONS);
while (1) {
printf("sleep 1\n");
sleep(10);
}
// 等待所有线程结束
for (int i = 0; i < CONNECTIONS; i++) {
pthread_join(threads[i], NULL);
}
return 0;
}