Files
front_linux/LFtid1056/client2.cpp

1235 lines
40 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "client2.h"
#include <iostream>
#include <cmath>
#include <cstring>
#include <cstdlib>
#include <vector>
#include <memory>
#include <mutex>
#include <unordered_map>
// 配置参数
constexpr int BASE_RECONNECT_DELAY = 20000; // 基础重连延迟(ms)
constexpr int MAX_RECONNECT_DELAY = 60000; // 最大重连延迟(ms)
constexpr const char* SERVER_IP = "101.132.39.45"; // 目标服务器IP"101.132.39.45"
constexpr int SERVER_PORT = 1056; // 目标服务器端口1056
static uv_loop_t* global_loop = nullptr;
static uv_timer_t monitor_timer;
extern SafeMessageQueue message_queue;
// ClientContext 实现
ClientContext::ClientContext(uv_loop_t* loop, const DeviceInfo& device, int index)
: loop(loop), state(ConnectionState::DISCONNECTED),
reconnect_attempts(0), shutdown(false), device_info(device), index_(index),cloudstatus(0), current_state_(DeviceState::IDLE),
state_start_time_(0) {
recv_buffer_.reserve(4096); // 预分配4KB缓冲区
// 初始化 TCP 句柄
uv_tcp_init(loop, &client);
client.data = this;
// 初始化定时器
uv_timer_init(loop, &timer);
timer.data = this;
// 初始化重连定时器
uv_timer_init(loop, &reconnect_timer);
reconnect_timer.data = this;
}
ClientContext::~ClientContext() {
stop_timers();
close_handles();
}
void ClientContext::init_tcp() {
if (!uv_is_active((uv_handle_t*)&client)) {
uv_tcp_init(loop, &client);
client.data = this;
}
}
void ClientContext::start_timer() {
if (!uv_is_active((uv_handle_t*)&timer)) {
uv_timer_start(&timer, on_timer, 1000,1000);
}
}
void ClientContext::start_reconnect_timer(int delay) {
if (!uv_is_active((uv_handle_t*)&reconnect_timer)) {
uv_timer_start(&reconnect_timer, try_reconnect, delay, 0);
}
}
void ClientContext::stop_timers() {
if (uv_is_active((uv_handle_t*)&timer)) uv_timer_stop(&timer);
if (uv_is_active((uv_handle_t*)&reconnect_timer)) uv_timer_stop(&reconnect_timer);
}
void ClientContext::close_handles() {
if (!uv_is_closing((uv_handle_t*)&client)) {
uv_close((uv_handle_t*)&client, nullptr);
}
if (!uv_is_closing((uv_handle_t*)&timer)) {
uv_close((uv_handle_t*)&timer, nullptr);
}
if (!uv_is_closing((uv_handle_t*)&reconnect_timer)) {
uv_close((uv_handle_t*)&reconnect_timer, nullptr);
}
}
// 添加接收数据到缓冲区并处理
void ClientContext::append_and_process_data(const char* data, size_t len) {
std::lock_guard<std::mutex> lock(buffer_mutex_);
// 添加到缓冲区
recv_buffer_.insert(recv_buffer_.end(), data, data + len);
// 处理缓冲区数据
process_buffer();
// 检查缓冲区大小防止内存溢出
if (recv_buffer_.size() > 10 * 1024 * 1024) { // 10MB限制
recv_buffer_.clear();
std::cerr << "[Device " << device_info.device_id
<< "] Buffer overflow, cleared\n";
}
}
// 注意:这个函数必须在 buffer_mutex_ 已被锁定的情况下调用
void ClientContext::process_buffer() {
constexpr int MSG_HEAD_LEN = 6; // 最小包头长度
while (true) {
// 检查缓冲区大小是否足够解析包头
if (recv_buffer_.size() < MSG_HEAD_LEN)
break;
// 云服务器状态报文检查 (EB 90 EB 90)
if (recv_buffer_.size() >= 4 &&
recv_buffer_[0] == 0xEB && recv_buffer_[1] == 0x90 &&
recv_buffer_[2] == 0xEB && recv_buffer_[3] == 0x90) {
const int packageLen = 150; // 固定长度
if (recv_buffer_.size() < packageLen)
break;
// 提取完整报文
std::vector<unsigned char> packet(
recv_buffer_.begin(),
recv_buffer_.begin() + packageLen
);
// 从缓冲区移除已处理数据
recv_buffer_.erase(
recv_buffer_.begin(),
recv_buffer_.begin() + packageLen
);
// 放入消息队列
put_packet_into_queue(packet);
continue;
}
// 标准报文检查 (EB 90)
if (recv_buffer_[0] != 0xEB || recv_buffer_[1] != 0x90) {
// 非法包头,清空缓冲区
recv_buffer_.clear();
break;
}
// 解析包长度 (小端序)
if (recv_buffer_.size() < 6)
break;
uint16_t body_len = (recv_buffer_[4] << 8) | recv_buffer_[5];
const int packageLen = body_len + 10; // 基础长度+扩展
if (recv_buffer_.size() < packageLen)
break;
// 提取完整报文
std::vector<unsigned char> packet(
recv_buffer_.begin(),
recv_buffer_.begin() + packageLen
);
// 从缓冲区移除已处理数据
recv_buffer_.erase(
recv_buffer_.begin(),
recv_buffer_.begin() + packageLen
);
// 放入消息队列
put_packet_into_queue(packet);
}
}
void ClientContext::put_packet_into_queue(
const std::vector<unsigned char>& packet)
{
deal_message_t msg;
msg.device_id = device_info.device_id;
msg.mac = device_info.mac;
msg.points = device_info.points;
msg.length = packet.size();
msg.data = static_cast<char*>(malloc(msg.length));
memcpy(msg.data, packet.data(), msg.length);
if (!message_queue.push(msg)) {
free(msg.data);
std::cerr << "[Device " << device_info.device_id
<< "] Queue full, dropping packet\n";
}
}
// 新增方法:改变装置状态
void ClientContext::change_state(DeviceState new_state, const std::vector<unsigned char>& packet) {
std::lock_guard<std::mutex> lock(state_mutex_);
// 直接更新状态,不调用其他锁方法
current_state_ = new_state;
current_packet_ = packet;
state_start_time_ = uv_now(loop);
std::cout << "[Device " << device_info.device_id
<< "] State changed to: " << static_cast<int>(new_state) << std::endl;
}
// 新增方法:添加后续动作
void ClientContext::add_action(DeviceState state, const std::vector<unsigned char>& packet) {
std::lock_guard<std::mutex> lock(state_mutex_);
action_queue_.push({ state, packet });
std::cout << "[Device " << device_info.device_id
<< "] Action added to queue: " << static_cast<int>(state) << std::endl;
}
// 新增方法:处理状态超时
void ClientContext::check_state_timeout() {
constexpr uint64_t STATE_TIMEOUT = 30000;//30秒超时
uint64_t now = uv_now(loop);
bool timed_out = false;
{
std::lock_guard<std::mutex> lock(state_mutex_);
if (current_state_ != DeviceState::IDLE &&
(now - state_start_time_) > STATE_TIMEOUT)
{
timed_out = true;
current_state_ = DeviceState::IDLE;
}
}
if (timed_out) {
process_next_action(); // 在锁外调用
}
}
// 新增方法:处理下一个动作
void ClientContext::process_next_action() {
StateAction next;
{
std::lock_guard<std::mutex> lock(state_mutex_);
if (current_state_ != DeviceState::IDLE || action_queue_.empty())
return;
next = action_queue_.front();
action_queue_.pop();
} // 提前释放锁
// 在锁外调用可能阻塞的函数
change_state(next.state, next.packet);
send_current_packet();
}
// 新增方法:发送当前状态对应的报文
void ClientContext::send_current_packet() {
if (!current_packet_.empty()) {
safe_send_binary_data(this, current_packet_);
}
}
bool ClientContext::add_stat_packet(const std::vector<unsigned char>& packet, int current_packet, int total_packets) {
std::lock_guard<std::mutex> lock(stat_cache_mutex_);
// 如果是第一帧,初始化缓存
if (current_packet == 1) {
stat_packets_cache_.clear();
expected_total_packets_ = total_packets;
}
// 添加到缓存
stat_packets_cache_.push_back({ current_packet, packet });
// 检查是否收齐所有帧
return (stat_packets_cache_.size() >= expected_total_packets_);
}
std::vector<ClientContext::StatPacket> ClientContext::get_and_clear_stat_packets() {
std::lock_guard<std::mutex> lock(stat_cache_mutex_);
auto packets = std::move(stat_packets_cache_);
stat_packets_cache_.clear();
expected_total_packets_ = 0;
return packets;
}
void ClientContext::clear_stat_cache() {
std::lock_guard<std::mutex> lock(stat_cache_mutex_);
stat_packets_cache_.clear();
expected_total_packets_ = 0;
}
// 添加浮点数据到缓存
bool ClientContext::add_float_data(ushort point_id, int data_type, const tagPqData_Float& float_data) {
if (data_type < 0 || data_type > 3) return false;
std::lock_guard<std::mutex> lock(float_cache_mutex_);
auto& cache = point_float_cache_[point_id];
cache.data[data_type] = float_data;
cache.received[data_type] = true;
// 检查是否四种数据类型都已接收
return cache.received[0] && cache.received[1] &&
cache.received[2] && cache.received[3];
}
// 获取并清除指定测点的完整浮点数据
std::array<tagPqData_Float, 4> ClientContext::get_and_clear_float_data(ushort point_id) {
std::lock_guard<std::mutex> lock(float_cache_mutex_);
auto it = point_float_cache_.find(point_id);
if (it == point_float_cache_.end()) {
return {};
}
auto data = it->second.data;
point_float_cache_.erase(it);
return data;
}
// 清除所有浮点数据缓存
void ClientContext::clear_float_cache() {
std::lock_guard<std::mutex> lock(float_cache_mutex_);
point_float_cache_.clear();
}
/* 缓冲区分配回调 */
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = new char[suggested_size];
buf->len = suggested_size;
}
/* 数据读取回调 */
void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
ClientContext* ctx = static_cast<ClientContext*>(stream->data);
if (nread < 0) {
if (nread != UV_EOF) {
std::cerr << "[Device " << ctx->device_info.device_id
<< "] RECV ERROR: " << uv_strerror(nread) << std::endl;
}
uv_close((uv_handle_t*)stream, on_close);
delete[] buf->base;
return;
}
if (nread > 0) {
// 使用公共方法添加并处理数据
ctx->append_and_process_data(buf->base, nread);
std::cout << "on_read: " << ctx->device_info.mac << " get " << nread << " bytes" << std::endl;
}
delete[] buf->base;
}
/* 数据写入回调 */
void on_write(uv_write_t* req, int status) {
ClientContext* ctx = static_cast<ClientContext*>(req->handle->data);
if (status < 0) {
std::cerr << "[Device " << ctx->device_info.device_id
<< "] SEND ERROR: " << uv_strerror(status) << std::endl;
}
std::cout << "on_write: " << ctx->device_info.mac << " down!" << std::endl;
delete[] static_cast<char*>(req->data); // 释放发送数据缓冲区
delete req; // 释放写入请求
}
/* 定时发送回调 */
//1秒执行一次定时器
void on_timer(uv_timer_t* handle) {
ClientContext* ctx = static_cast<ClientContext*>(handle->data);
if (ctx->state != ConnectionState::CONNECTED) {
return;
}
// 检查状态超时 30秒状态未更新则重置为空闲状态
ctx->check_state_timeout();
// 装置登录成功后,只在空闲状态处理后续动作
if (ctx->cloudstatus == 1) {
uint64_t now = uv_now(ctx->loop);//当前时间戳
//20秒一次 执行统计数据时间询问
if (ctx->current_state_ == DeviceState::IDLE && now - ctx->last_state_query_time_ >= 20000) {
// 更新统计数据最后查询时间
ctx->last_state_query_time_ = now;
auto sendbuff = generate_statequerytime_message();//组装询问统计数据时间报文
ctx->add_action(DeviceState::READING_STATS_TIME, sendbuff);//将该状态以及待发送报文存入队列
}
//一秒一次 执行实时数据询问 仅执行指定次数
if (ctx->current_state_ == DeviceState::IDLE && now - ctx->real_state_query_time_ >= 1000 && ctx->real_state_count > 0) {
// 更新实时数据执行时间和实时收发计数
ctx->real_state_query_time_ = now;
ctx->real_state_count--;
auto sendbuff = generate_realstat_message(static_cast<unsigned char>(ctx->real_point_id_), static_cast<unsigned char>(0x01), static_cast<unsigned char>(0x01));//组装询问实时数据报文
ctx->add_action(DeviceState::READING_REALSTAT, sendbuff);//将该状态以及待发送报文存入队列
}
//处理后续工作队列的工作 取出一个并执行
if (ctx->current_state_ == DeviceState::IDLE) {
ctx->process_next_action();
}
}
}
/* 发送二进制报文函数 */
void send_binary_data(ClientContext* ctx, const unsigned char* data, size_t data_size) {
if (ctx->state != ConnectionState::CONNECTED) {
std::cerr << "[Device " << ctx->device_info.device_id
<< "] Cannot send: not connected" << std::endl;
return;
}
uv_buf_t buf = uv_buf_init(const_cast<char*>(reinterpret_cast<const char*>(data)), data_size);
uv_write_t* write_req = new uv_write_t;
// 复制数据以确保安全
char* data_copy = new char[data_size];
memcpy(data_copy, data, data_size);
write_req->data = data_copy;
int ret = uv_write(write_req, (uv_stream_t*)&ctx->client, &buf, 1, on_write);
if (ret < 0) {
std::cerr << "[Device " << ctx->device_info.device_id
<< "] uv_write failed: " << uv_strerror(ret) << std::endl;
delete[] data_copy;
delete write_req;
}
}
// 新增函数:在事件循环线程中安全发送数据
void safe_send_binary_data(ClientContext* ctx, std::vector<unsigned char> data) {
uv_work_t* req = new uv_work_t;
req->data = new std::pair<ClientContext*, std::vector<unsigned char>>(ctx, std::move(data));
uv_queue_work(ctx->loop, req, [](uv_work_t* req) {
// 在工作线程中不执行实际工作
}, [](uv_work_t* req, int status) {
// 在事件循环线程中执行实际发送
auto* pair = static_cast<std::pair<ClientContext*, std::vector<unsigned char>>*>(req->data);
ClientContext* ctx = pair->first;
const auto& data = pair->second;
if (ctx->state == ConnectionState::CONNECTED) {
// 实际发送逻辑(原 send_binary_data 的核心部分)
uv_buf_t buf = uv_buf_init(const_cast<char*>(reinterpret_cast<const char*>(data.data())), data.size());
uv_write_t* write_req = new uv_write_t;
write_req->data = new std::vector<unsigned char>(data); // 复制数据
int ret = uv_write(write_req, (uv_stream_t*)&ctx->client, &buf, 1, on_write);
if (ret < 0) {
// 错误处理
delete static_cast<std::vector<unsigned char>*>(write_req->data);
delete write_req;
}
}
delete pair;
delete req;
});
}
/* 连接关闭回调 */
void on_close(uv_handle_t* handle) {
ClientContext* ctx = static_cast<ClientContext*>(handle->data);
ctx->state = ConnectionState::DISCONNECTED;
std::cerr << "[Device " << ctx->device_info.device_id << "] Connection closed" << std::endl;
ctx->stop_timers();
// 清空缓存
ctx->clear_stat_cache();
// 清除浮点数据缓存
ctx->clear_float_cache();
ctx->cloudstatus = 0;
{
std::lock_guard<std::mutex> state_lock(ctx->state_mutex_);
ctx->current_state_ = DeviceState::IDLE; // 直接修改状态
std::queue<StateAction> empty;
std::swap(ctx->action_queue_, empty);
}
if (!ctx->shutdown) {
int delay = BASE_RECONNECT_DELAY * pow(2, ctx->reconnect_attempts);
delay = delay > MAX_RECONNECT_DELAY ? MAX_RECONNECT_DELAY : delay;
std::cout << "[Device " << ctx->device_info.device_id
<< "] Reconnecting in " << delay << "ms (attempt "
<< ctx->reconnect_attempts + 1 << ")" << std::endl;
ctx->reconnect_attempts++;
ctx->start_reconnect_timer(delay);
}
}
/* 尝试重连 */
void try_reconnect(uv_timer_t* timer) {
ClientContext* ctx = static_cast<ClientContext*>(timer->data);
if (ctx->state != ConnectionState::DISCONNECTED || ctx->shutdown) {
return;
}
std::cerr << "[Device " << ctx->device_info.device_id << "] Attempting reconnect" << std::endl;
ctx->init_tcp();
ctx->state = ConnectionState::CONNECTING;
struct sockaddr_in addr;
uv_ip4_addr(SERVER_IP, SERVER_PORT, &addr);
uv_connect_t* req = new uv_connect_t;
req->data = ctx;
int ret = uv_tcp_connect(req, &ctx->client, (const struct sockaddr*)&addr, on_connect);
if (ret < 0) {
std::cerr << "[Device " << ctx->device_info.device_id
<< "] Connect error: " << uv_strerror(ret) << std::endl;
delete req;
uv_close((uv_handle_t*)&ctx->client, on_close);
}
}
/* 连接建立回调 */
void on_connect(uv_connect_t* req, int status) {
ClientContext* ctx = static_cast<ClientContext*>(req->data);
delete req;
if (status < 0) {
std::cerr << "[Device " << ctx->device_info.device_id
<< "] Connect failed: " << uv_strerror(status) << std::endl;
if (!uv_is_closing((uv_handle_t*)&ctx->client)) {
uv_close((uv_handle_t*)&ctx->client, on_close);
}
return;
}
std::cerr << "[Device " << ctx->device_info.device_id << "] Connected to server" << std::endl;
ctx->state = ConnectionState::CONNECTED;
ctx->reconnect_attempts = 0;
// 新增:初始化各个计时时间戳
ctx->last_state_query_time_ = uv_now(ctx->loop);//初始化统计数据时间戳
ctx->real_state_query_time_ = uv_now(ctx->loop);//初始化实时数据时间戳
ctx->real_state_count = 0;//实时数据收发计数
//客户端连接完毕后,发送装置登陆消息
std::cout << "connected: " << ctx->device_info.mac << " send login msg!" << std::endl;
auto binary_data = generate_frontlogin_message(ctx->device_info.mac);
safe_send_binary_data(ctx, binary_data);
uv_read_start((uv_stream_t*)&ctx->client, alloc_buffer, on_read);
ctx->start_timer();
}
/* 初始化所有客户端连接 */
void init_clients(uv_loop_t* loop, const std::vector<DeviceInfo>& devices) {
auto& manager = ClientManager::instance();
manager.set_loop(loop); // 使用公共方法设置事件循环
for (const auto& device : devices) {
manager.add_device(device);
}
}
/* 停止所有客户端 */
void stop_all_clients() {
auto& manager = ClientManager::instance();
manager.stop_all();
}
/* 连接监控回调 */
void monitor_connections(uv_timer_t* handle) {
static int recovery_counter = 0;
if (++recovery_counter >= 5) {
int active_count = 0;
auto& manager = ClientManager::instance();
size_t total_clients = manager.client_count();
// 实际应用中,这里需要实现获取活动连接数的方法
// 简化处理,只显示总连接数
std::cout << "Total connections: " << total_clients << std::endl;
recovery_counter = 0;
}
}
static void close_walk_cb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
uv_close(handle, nullptr);
}
}
/* 启动客户端连接 */
void start_client_connect(const std::vector<DeviceInfo>& devices) {
// 创建全局事件循环
global_loop = uv_default_loop();
// 初始化所有客户端
init_clients(global_loop, devices);
// 启动连接监控
uv_timer_init(global_loop, &monitor_timer);
uv_timer_start(&monitor_timer, monitor_connections, 1000, 1000);
// 运行事件循环
uv_run(global_loop, UV_RUN_DEFAULT);
// 添加资源清理阶段
while (uv_loop_alive(global_loop)) {
uv_run(global_loop, UV_RUN_ONCE);
}
// 安全关闭事件循环
int err = uv_loop_close(global_loop);
if (err) {
std::cerr << "uv_loop_close error: " << uv_strerror(err) << std::endl;
// 强制清理残留句柄(调试用)
uv_walk(global_loop, close_walk_cb, nullptr);
uv_run(global_loop, UV_RUN_NOWAIT);
}
// 清理所有客户端
stop_all_clients();
global_loop = nullptr;
}
// ClientManager 成员函数实现
void ClientManager::add_device(const DeviceInfo& device) {
std::lock_guard<std::mutex> lock(mutex_);
if (!loop_) {
std::cerr << "[Device " << device.device_id << "] Cannot add: event loop not set\n";
return;
}
// 检查是否已存在相同ID的装置
if (clients_.find(device.device_id) != clients_.end()) {
std::cerr << "[Device " << device.device_id << "] Already exists, skip adding\n";
return;
}
// 创建新的客户端上下文
int index = clients_.size();
auto ctx = std::unique_ptr<ClientContext>(new ClientContext(loop_, device, index));
// 添加到管理映射
clients_[device.device_id] = std::move(ctx);
// 启动连接
try_reconnect(&clients_[device.device_id]->reconnect_timer);
std::cout << "[Device " << device.device_id << "] Added successfully\n";
}
void ClientManager::remove_device(const std::string& device_id) {
std::lock_guard<std::mutex> lock(mutex_);
auto it = clients_.find(device_id);
if (it == clients_.end()) {
std::cerr << "[Device " << device_id << "] Not found, cannot remove\n";
return;
}
// 关闭连接并移除
it->second->shutdown = true;
it->second->close_handles();
clients_.erase(it);
std::cout << "[Device " << device_id << "] Removed successfully\n";
}
void ClientManager::stop_all() {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
pair.second->shutdown = true;
pair.second->close_handles();
}
clients_.clear();
}
// 在ClientManager成员函数实现中添加方法实现
void ClientManager::restart_device(const std::string& device_id) {
std::lock_guard<std::mutex> lock(mutex_);
ClientContext* target_ctx = nullptr;
// 查找匹配的设备支持device_id或mac地址
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == device_id ||
ctx->device_info.mac == device_id) {
target_ctx = ctx.get();
break;
}
}
if (!target_ctx) {
std::cerr << "[restart_device] Device not found: " << device_id << std::endl;
return;
}
std::cout << "[restart_device] Restarting device: " << device_id << std::endl;
// 确保不处于关闭状态
target_ctx->shutdown = false;
// 停止所有定时器
target_ctx->stop_timers();
// 重置重连计数器
target_ctx->reconnect_attempts = 0;
// 关闭TCP连接会触发on_close回调
if (!uv_is_closing((uv_handle_t*)&target_ctx->client)) {
uv_close((uv_handle_t*)&target_ctx->client, on_close);
}
else {
// 如果已经在关闭过程中,直接触发重连
target_ctx->state = ConnectionState::DISCONNECTED;
target_ctx->start_reconnect_timer(0); // 立即重连
}
}
//修改客户端云前置登录状态
bool ClientManager::set_cloud_status(const std::string& identifier, int status) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
// 匹配装置ID或MAC地址
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
// 修改云前置登录状态
ctx->cloudstatus = status;
std::cout << "[Device " << identifier
<< "] Cloud status updated to: " << status << std::endl;
return true;
}
}
std::cerr << "[set_cloud_status] Device not found: " << identifier << std::endl;
return false;
}
bool ClientManager::add_action_to_device(const std::string& identifier,
DeviceState state,
const std::vector<unsigned char>& packet) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
ctx->add_action(state, packet);
return true;
}
}
std::cerr << "[add_action_to_device] Device not found: " << identifier << std::endl;
return false;
}
bool ClientManager::change_device_state(const std::string& identifier,
DeviceState new_state,
const std::vector<unsigned char>& packet) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
ctx->change_state(new_state, packet);
return true;
}
}
std::cerr << "[change_device_state] Device not found: " << identifier << std::endl;
return false;
}
bool ClientManager::clear_action_queue(const std::string& identifier) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
std::lock_guard<std::mutex> state_lock(ctx->state_mutex_);
std::queue<StateAction> empty;
std::swap(ctx->action_queue_, empty);
return true;
}
}
std::cerr << "[clear_action_queue] Device not found: " << identifier << std::endl;
return false;
}
bool ClientManager::get_device_state(const std::string& identifier, DeviceState& out_state) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
std::lock_guard<std::mutex> state_lock(ctx->state_mutex_);
out_state = ctx->current_state_;
return true;
}
}
std::cerr << "[get_device_state] Device not found: " << identifier << std::endl;
return false;
}
bool ClientManager::post_message_processing(const std::string& identifier) {
ClientContext* target = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
target = pair.second.get();
break;
}
}
} // 提前释放manager锁
if (!target) {
std::cerr << "Device not found: " << identifier << std::endl;
return false;
}
// 直接操作client避免嵌套锁
if (target->current_state_ == DeviceState::IDLE) {
//空闲状态执行下一项工作
target->process_next_action();
return true;
}
else {
//非空闲状态执行当前工作
target->send_current_packet();
return true;
}
}
//通过id或者mac读取装置下属测点信息
bool ClientManager::get_device_points(const std::string& identifier,
std::vector<PointInfo>& out_points) {
std::lock_guard<std::mutex> lock(mutex_);
for (const auto& pair : clients_) {
const auto& ctx = pair.second;
// 匹配装置ID或MAC地址
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
// 复制测点信息到输出参数
out_points = ctx->device_info.points;
return true;
}
}
std::cerr << "[get_device_points] Device not found: " << identifier << std::endl;
return false;
}
//保存多帧报文至缓存区等待收全
bool ClientManager::add_stat_packet_to_device(const std::string& identifier,
const std::vector<unsigned char>& packet,
int current_packet,
int total_packets) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
return ctx->add_stat_packet(packet, current_packet, total_packets);
}
}
std::cerr << "[add_stat_packet_to_device] Device not found: " << identifier << std::endl;
return false;
}
//获取缓存区内所有多帧报文并清空缓存
std::vector<ClientContext::StatPacket> ClientManager::get_and_clear_stat_packets(const std::string& identifier) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
return ctx->get_and_clear_stat_packets();
}
}
std::cerr << "[get_and_clear_stat_packets] Device not found: " << identifier << std::endl;
return {};
}
//清空所有缓存区
bool ClientManager::clear_stat_cache(const std::string& identifier) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
ctx->clear_stat_cache();
return true;
}
}
std::cerr << "[clear_stat_cache] Device not found: " << identifier << std::endl;
return false;
}
// 获取pt和CT变比
bool ClientManager::get_pt_ct_ratio(const std::string& identifier,
int16_t nCpuNo,
float& pt_ratio,
float& ct_ratio) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
// 匹配装置ID或MAC地址
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
// 遍历装置的所有测点
for (const auto& point : ctx->device_info.points) {
// 匹配测点序号
if (point.nCpuNo == nCpuNo) {
// 计算PT变比 (PT1/PT2)
pt_ratio = (point.PT2 != 0.0) ?
static_cast<float>(point.PT1 / point.PT2) : 1.0f;
// 计算CT变比 (CT1/CT2)
ct_ratio = (point.CT2 != 0.0) ?
static_cast<float>(point.CT1 / point.CT2) : 1.0f;
return true;
}
}
std::cerr << "[get_pt_ct_ratio] Point not found for CPU: "
<< nCpuNo << " in device: " << identifier << std::endl;
return false;
}
}
std::cerr << "[get_pt_ct_ratio] Device not found: " << identifier << std::endl;
return false;
}
// 添加浮点数据到指定设备的缓存
bool ClientManager::add_float_data_to_device(const std::string& identifier,
ushort point_id,
int data_type,
const tagPqData_Float& float_data) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
return ctx->add_float_data(point_id, data_type, float_data);
}
}
return false;
}
// 获取并清除指定测点的完整浮点数据
std::array<tagPqData_Float, 4> ClientManager::get_and_clear_float_data(
const std::string& identifier, ushort point_id) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
return ctx->get_and_clear_float_data(point_id);
}
}
return {};
}
// 清除设备的所有浮点缓存
bool ClientManager::clear_float_cache(const std::string& identifier) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
ctx->clear_float_cache();
return true;
}
}
return false;
}
//实时数据调用
bool ClientManager::set_real_state_count(const std::string& identifier, int count, ushort point_id) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
// 设置实时计数
ctx->real_state_count = count;
// 设置测点序号(如果提供了有效值)
if (point_id != 0) {
ctx->real_point_id_ = point_id;
}
std::cout << "[Device " << identifier
<< "] Real state params set - count: " << count
<< ", point_id: " << ctx->real_point_id_.load()
<< std::endl;
return true;
}
}
std::cerr << "[set_real_state_count] Device not found: " << identifier << std::endl;
return false;
}
//读取文件目录调用
bool ClientManager::add_file_menu_action_to_device(
const std::string& identifier,
const std::string& file_path)
{
std::lock_guard<std::mutex> lock(mutex_);
// 查找匹配的设备
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier)
{
// 生成文件目录读取报文
auto packet = generate_getfilemenu_message(file_path);
// 添加动作到队列 (状态: 读取文件目录)
ctx->add_action(DeviceState::READING_FILEMENU, packet);
// 如果当前空闲则立即执行
if (ctx->current_state_ == DeviceState::IDLE) {
ctx->process_next_action();
}
return true; // 成功添加
}
}
return false; // 设备未找到
}
//文件下载调用
bool ClientManager::add_file_download_action_to_device(
const std::string& identifier,
const std::string& file_path)
{
std::lock_guard<std::mutex> lock(mutex_);
// 查找匹配的设备
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier)
{
// 生成下载请求报文 (帧序号固定为1代表开始新文件的下载)
auto downloadMsg = generate_downloadfile_message(1, file_path);
// 添加动作到队列 (状态: 读取文件目录)
ctx->add_action(DeviceState::READING_FILEDATA, downloadMsg);
// 如果当前空闲则立即执行
if (ctx->current_state_ == DeviceState::IDLE) {
ctx->process_next_action();
}
return true; // 成功添加
}
}
return false; // 设备未找到
}
bool ClientManager::get_point_scale_and_pttype(const std::string& identifier,
ushort nCpuNo,
std::string& out_scale,
int& out_pttype) {
std::lock_guard<std::mutex> lock(mutex_);
// 遍历所有客户端上下文
for (auto& pair : clients_) {
auto& ctx = pair.second;
// 检查标识符是否匹配设备ID或MAC地址
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
// 在装置的测点列表中查找匹配的测点
for (const auto& point : ctx->device_info.points) {
if (point.nCpuNo == nCpuNo) {
out_scale = point.strScale;
out_pttype = point.nPTType;
return true;
}
}
// 测点未找到
std::cerr << "Point with nCpuNo " << nCpuNo
<< " not found for device: " << identifier << std::endl;
return false;
}
}
// 设备未找到
std::cerr << "Device not found: " << identifier << std::endl;
return false;
}
//修改待发送报文的帧序号(仅在暂态文件中使用)
bool ClientManager::update_current_packet_frame(const std::string& identifier, int next_frame) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier || ctx->device_info.mac == identifier) {
std::lock_guard<std::mutex> state_lock(ctx->state_mutex_);
// 检查报文长度是否足够
if (ctx->current_packet_.size() < 16) {
std::cerr << "Packet too short to update frame number" << std::endl;
return false;
}
// 小端序写入新帧序号 (位置12-15字节)
ctx->current_packet_[12] = (next_frame >> 0) & 0xFF;
ctx->current_packet_[13] = (next_frame >> 8) & 0xFF;
ctx->current_packet_[14] = (next_frame >> 16) & 0xFF;
ctx->current_packet_[15] = (next_frame >> 24) & 0xFF;
return true;
}
}
return false;
}
bool ClientManager::parse_download_packet(const std::string& identifier, DownloadInfo& out_info) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
// 获取当前状态报文
std::vector<unsigned char> current_packet;
{
std::lock_guard<std::mutex> state_lock(ctx->state_mutex_);
current_packet = ctx->current_packet_;
}
// 解析帧序号 (12-15字节小端序)
if (current_packet.size() >= 16) {
out_info.current_frame =
(static_cast<int>(current_packet[12])) |
(static_cast<int>(current_packet[13]) << 8) |
(static_cast<int>(current_packet[14]) << 16) |
(static_cast<int>(current_packet[15]) << 24);
// 解析文件名 (从偏移量16开始)
if (current_packet.size() > 16) {
const char* filename_start = reinterpret_cast<const char*>(current_packet.data()) + 16;
size_t filename_len = 128;
out_info.filename.assign(filename_start, filename_len);
return true;
}
}
}
}
return false;
}
bool ClientManager::add_file_packet_to_device(const std::string& identifier,
int frame_index,
const unsigned char* data,
size_t size) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
ctx->add_file_packet(frame_index, data, size);
return true;
}
}
return false;
}
std::vector<std::vector<unsigned char>>
ClientManager::get_and_clear_file_packets(const std::string& identifier) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
return ctx->get_and_clear_file_packets();
}
}
return {};
}
bool ClientManager::update_current_filename(const std::string& identifier,
const std::string& filename) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
ctx->set_current_filename(filename);
return true;
}
}
return false;
}
std::string ClientManager::get_current_filename(const std::string& identifier) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& pair : clients_) {
auto& ctx = pair.second;
if (ctx->device_info.device_id == identifier ||
ctx->device_info.mac == identifier) {
return ctx->get_current_filename();
}
}
return "";
}