新增了装置通讯管理,现在可以在外部动态的添加和删除连接设备了,同时添加了设备的消息发送功能。

This commit is contained in:
zw
2025-06-24 18:40:10 +08:00
parent 1fd504add4
commit ccd7a3bb59
3 changed files with 154 additions and 33 deletions

View File

@@ -7,15 +7,16 @@
#include <cstdlib>
#include <vector>
#include <memory>
#include <mutex>
#include <unordered_map>
// 配置参数
constexpr int BASE_RECONNECT_DELAY = 5000; // 基础重连延迟(ms)
constexpr int MAX_RECONNECT_DELAY = 60000; // 最大重连延迟(ms)
constexpr const char* SERVER_IP = "101.132.39.45"; // 目标服务器IP
constexpr int SERVER_PORT = 1056; // 目标服务器端口
constexpr const char* SERVER_IP = "172.25.144.1"; // 目标服务器IP"101.132.39.45"
constexpr int SERVER_PORT = 61000; // 目标服务器端口1056
static uv_loop_t* global_loop = nullptr;
static std::vector<std::unique_ptr<ClientContext>> client_contexts;
static uv_timer_t monitor_timer;
extern SafeMessageQueue message_queue;
@@ -107,15 +108,16 @@ void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
// 复制测点信息
msg.points = ctx->device_info.points;
msg.data = new char[nread];
msg.data = static_cast<char*>(malloc(nread));
msg.length = nread;
memcpy(msg.data, buf->base, nread);
if (!message_queue.push(msg)) {
std::cerr << "[Device " << ctx->device_info.device_id
<< "] Message queue full, dropping message" << std::endl;
delete[] msg.data;
free(msg.data);
}
std::cout << "on_read: " << ctx->device_info.mac << " get!" << std::endl;
}
delete[] buf->base;
@@ -129,7 +131,7 @@ void on_write(uv_write_t* req, int status) {
std::cerr << "[Device " << ctx->device_info.device_id
<< "] SEND ERROR: " << uv_strerror(status) << std::endl;
}
std::cout << "on_write: " << ctx->device_info.mac << " down!" << std::endl;
delete[] static_cast<char*>(req->data); // 释放发送数据缓冲区
delete req; // 释放写入请求
}
@@ -141,13 +143,15 @@ void on_timer(uv_timer_t* handle) {
if (ctx->state != ConnectionState::CONNECTED) {
return;
}
std::cout << "on_timer: " << ctx->device_info.mac << " send!"<< std::endl;
// 使用装置自己的MAC地址生成登录报文
auto binary_data = generate_frontlogin_message(ctx->device_info.mac);
// 调用发送函数
send_binary_data(ctx, binary_data.data(), binary_data.size());
//ClientManager::instance().send_to_device(ctx->device_info.mac, binary_data.data(), binary_data.size());
// 根据装置状态发送其他数据
if (ctx->device_info.status == 1) { // 在线状态
// 可以发送装置配置信息或测点数据
@@ -253,25 +257,18 @@ void on_connect(uv_connect_t* req, int status) {
/* 初始化所有客户端连接 */
void init_clients(uv_loop_t* loop, const std::vector<DeviceInfo>& devices) {
client_contexts.clear();
for (size_t i = 0; i < devices.size(); i++) {
// 修改为C++11兼容的unique_ptr创建方式
client_contexts.push_back(
std::unique_ptr<ClientContext>(
new ClientContext(loop, devices[i], i)
)
);
try_reconnect(&client_contexts.back()->reconnect_timer);
auto& manager = ClientManager::instance();
manager.set_loop(loop); // 使用公共方法设置事件循环
for (const auto& device : devices) {
manager.add_device(device);
}
}
/* 停止所有客户端 */
void stop_all_clients() {
for (auto& ctx : client_contexts) {
ctx->shutdown = true;
ctx->close_handles();
}
client_contexts.clear();
auto& manager = ClientManager::instance();
manager.stop_all();
}
/* 连接监控回调 */
@@ -279,12 +276,12 @@ void monitor_connections(uv_timer_t* handle) {
static int recovery_counter = 0;
if (++recovery_counter >= 5) {
int active_count = 0;
for (const auto& ctx : client_contexts) {
if (ctx->state == ConnectionState::CONNECTED) {
active_count++;
}
}
std::cout << "Active connections: " << active_count << "/" << client_contexts.size() << std::endl;
auto& manager = ClientManager::instance();
size_t total_clients = manager.client_count();
// 实际应用中,这里需要实现获取活动连接数的方法
// 简化处理,只显示总连接数
std::cout << "Total connections: " << total_clients << std::endl;
recovery_counter = 0;
}
}
@@ -328,3 +325,79 @@ void start_client_connect(const std::vector<DeviceInfo>& devices) {
stop_all_clients();
global_loop = nullptr;
}
// ClientManager 成员函数实现
void ClientManager::add_device(const DeviceInfo& device) {
std::lock_guard<std::mutex> lock(mutex_);
if (!loop_) {
std::cerr << "[Device " << device.device_id << "] Cannot add: event loop not set\n";
return;
}
// 检查是否已存在相同ID的装置
if (clients_.find(device.device_id) != clients_.end()) {
std::cerr << "[Device " << device.device_id << "] Already exists, skip adding\n";
return;
}
// 创建新的客户端上下文
int index = clients_.size();
auto ctx = std::unique_ptr<ClientContext>(new ClientContext(loop_, device, index));
// 添加到管理映射
clients_[device.device_id] = std::move(ctx);
// 启动连接
try_reconnect(&clients_[device.device_id]->reconnect_timer);
std::cout << "[Device " << device.device_id << "] Added successfully\n";
}
void ClientManager::remove_device(const std::string& device_id) {
std::lock_guard<std::mutex> lock(mutex_);
auto it = clients_.find(device_id);
if (it == clients_.end()) {
std::cerr << "[Device " << device_id << "] Not found, cannot remove\n";
return;
}
// 关闭连接并移除
it->second->shutdown = true;
it->second->close_handles();
clients_.erase(it);
std::cout << "[Device " << device_id << "] Removed successfully\n";
}
bool ClientManager::send_to_device(const std::string& identifier,
const unsigned char* data,
size_t size) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
// 匹配装置ID或MAC地址
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
if (ctx->state == ConnectionState::CONNECTED) {
send_binary_data(ctx.get(), data, size);
return true;
}
std::cerr << "[Device " << identifier << "] Not connected\n";
return false;
}
}
std::cerr << "[Device " << identifier << "] Not found\n";
return false;
}
void ClientManager::stop_all() {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
pair.second->shutdown = true;
pair.second->close_handles();
}
clients_.clear();
}

View File

@@ -2,6 +2,8 @@
#include <string>
#include <vector>
#include <memory>
#include <unordered_map>
#include <mutex>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ<EFBFBD>
struct PointInfo {
@@ -54,10 +56,42 @@ private:
int index_;
};
class ClientManager {
public:
static ClientManager& instance() {
static ClientManager inst;
return inst;
}
// <20><><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>ѭ<EFBFBD><D1AD>
void set_loop(uv_loop_t* loop) {
std::lock_guard<std::mutex> lock(mutex_);
loop_ = loop;
}
void add_device(const DeviceInfo& device);
void remove_device(const std::string& device_id);
bool send_to_device(const std::string& identifier, const unsigned char* data, size_t size);
void stop_all();
// <20><>ȡ<EFBFBD>ͻ<EFBFBD><CDBB><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
size_t client_count() {
std::lock_guard<std::mutex> lock(mutex_);
return clients_.size();
}
private:
ClientManager() : loop_(nullptr) {}
std::unordered_map<std::string, std::unique_ptr<ClientContext>> clients_;
std::mutex mutex_;
uv_loop_t* loop_; // <20>¼<EFBFBD>ѭ<EFBFBD><D1AD>ָ<EFBFBD><D6B8>
};
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
void start_client_connect(const std::vector<DeviceInfo>& devices);
void send_binary_data(ClientContext* ctx, const unsigned char* data, size_t data_size);
void on_timer(uv_timer_t* handle);//װ<>ö<EFBFBD>ʱ<EFBFBD>ص<EFBFBD>
void try_reconnect(uv_timer_t* timer);//װ<><D7B0><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ص<EFBFBD>
void on_connect(uv_connect_t* req, int status);//װ<><D7B0><EFBFBD><EFBFBD><EFBFBD>ӻص<D3BB>
void on_close(uv_handle_t* handle);//װ<>ùرջص<D5BB>
void on_timer(uv_timer_t* handle);
void try_reconnect(uv_timer_t* timer);
void on_connect(uv_connect_t* req, int status);
void on_close(uv_handle_t* handle);
void init_clients(uv_loop_t* loop, const std::vector<DeviceInfo>& devices);
void stop_all_clients();

View File

@@ -69,7 +69,7 @@ std::vector<DeviceInfo> generate_test_devices(int count) {
dev_id,
dev_name,
(i % 2 == 0) ? "Model-X" : "Model-Y", // <20><><EFBFBD><EFBFBD>ʹ<EFBFBD><CAB9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͺ<EFBFBD>
"00-B7-8D-A8-00-D1", // <20><><EFBFBD><EFBFBD>MAC<41><43>ַ
"00-B7-8D-A8-00-D6", // <20><><EFBFBD><EFBFBD>MAC<41><43>ַ
1, // ״̬ (1=<3D><><EFBFBD><EFBFBD>)
points
});
@@ -185,6 +185,10 @@ void* message_processor_thread(void* arg) {
free(msg.data);
}
else {
// <20><><EFBFBD><EFBFBD>Ϊ<EFBFBD><CEAA>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ߣ<EFBFBD>100΢<30><CEA2> = 0.1<EFBFBD><EFBFBD><EFBFBD>
usleep(100);
}
}
// <20>߳<EFBFBD><DFB3><EFBFBD>ֹ<EFBFBD><D6B9><EFBFBD><EFBFBD>
@@ -307,9 +311,19 @@ int main() {
// <20><><EFBFBD><EFBFBD>socket<65><74><EFBFBD><EFBFBD>״̬
static int queue_monitor = 0;
//static int count = 3;
if (++queue_monitor >= 10) { // ÿ10<31><EFBFBD><EBB1A8>һ<EFBFBD><D2BB>
printf("Message queue size: %zu\n", message_queue.size());
queue_monitor = 0;
/*std::vector<DeviceInfo> test_devices = generate_test_devices(count);
count++;
for (const auto& device : test_devices) {
ClientManager::instance().add_device(device);
}
for (const auto& device : test_devices) {
ClientManager::instance().remove_device("D001");
}*/
}
}