471 lines
18 KiB
C++
471 lines
18 KiB
C++
#ifndef CLIENT_H
|
||
#define CLIENT_H
|
||
|
||
#include <uv.h>
|
||
#include <string>
|
||
#include <vector>
|
||
#include <memory>
|
||
#include <unordered_map>
|
||
#include <mutex>
|
||
#include <queue>
|
||
#include "dealMsg.h"
|
||
#include "PQSMsg.h"
|
||
// 测点信息结构
|
||
struct PointInfo {
|
||
std::string point_id; // 测点ID
|
||
std::string name; // 测点名称
|
||
std::string device_id; // 所属装置ID
|
||
ushort nCpuNo; //测点序号 1-6
|
||
double PT1; // 电压变比1
|
||
double PT2; // 电压变比2
|
||
double CT1; // 电流变比1
|
||
double CT2; // 电流变比2
|
||
std::string strScale; // 电压等级
|
||
int nPTType; // 接线方式 0-星型 1-角型
|
||
};
|
||
|
||
// 装置信息结构
|
||
struct DeviceInfo {
|
||
std::string device_id; // 装置ID
|
||
std::string name; // 装置名称
|
||
std::string model; // 装置型号
|
||
std::string mac; // 装置MAC地址
|
||
int status; // 运行状态 (0: 离线, 1: 在线)
|
||
std::vector<PointInfo> points; // 下属测点
|
||
bool righttime; //对时启动标志
|
||
};
|
||
|
||
enum class ConnectionState {
|
||
DISCONNECTED,
|
||
CONNECTING,
|
||
CONNECTED
|
||
};
|
||
|
||
// 添加的状态枚举
|
||
enum class DeviceState {
|
||
IDLE, // 空闲状态
|
||
READING_STATS, // 读取统计数据
|
||
READING_STATS_TIME, // 读取统计时间
|
||
READING_REALSTAT, // 读取实时数据
|
||
READING_EVENTFILE, // 暂态波形文件下载
|
||
READING_FILEMENU, // 读取文件目录
|
||
READING_FILEDATA, // 下载文件数据
|
||
READING_FIXEDVALUE, // 读取测点定值
|
||
READING_FIXEDVALUEDES, // 读取测点定值描述
|
||
SET_FIXEDVALUE, // 设置测点定值
|
||
READING_INTERFIXEDVALUE, // 读取内部定值
|
||
READING_INTERFIXEDVALUEDES, // 读取内部定值描述
|
||
READING_CONTROLWORD, // 读取控制字描述
|
||
SET_INTERFIXEDVALUE, // 设置内部定值
|
||
READING_RUNNINGINFORMATION_1,// 读取装置运行信息(主动触发)
|
||
READING_RUNNINGINFORMATION_2,// 读取装置运行信息(定时执行)
|
||
READING_DEVVERSION, // 读取装置版本配置信息
|
||
SET_RIGHTTIME, // 设置装置对时
|
||
READING_EVENTLOG, // 补招事件日志
|
||
// 可根据需要添加更多状态
|
||
CUSTOM_ACTION // 自定义动作
|
||
};
|
||
|
||
// 状态动作结构体
|
||
struct StateAction {
|
||
DeviceState state;
|
||
std::vector<unsigned char> packet; // 该状态需要发送的报文
|
||
};
|
||
|
||
class ClientContext {
|
||
public:
|
||
uv_loop_t* loop;
|
||
uv_tcp_t client;
|
||
uv_timer_t timer;
|
||
uv_timer_t reconnect_timer;
|
||
ConnectionState state;
|
||
int reconnect_attempts;
|
||
volatile bool shutdown;
|
||
uint64_t last_state_query_time_ = 0; // 统计数据计时时间戳
|
||
uint64_t real_state_query_time_ = 0; // 实时数据计时时间戳
|
||
std::atomic<int> real_state_count{ 0 };//实时数据收发计数 原子操作保证线程安全
|
||
std::atomic<ushort> real_point_id_{ 1 }; // 新增:实时数据读取的测点序号(原子操作)
|
||
uint64_t read_runninginformationMsg = 0; // 装置定时读取运行信息时间戳
|
||
uint64_t right_time = 0; // 装置定时对时时间戳
|
||
|
||
DeviceInfo device_info; // 装置信息
|
||
int cloudstatus = 0; // 云前置登录状态(0:未登录 1:已登录)初始化默认为0
|
||
uint64_t get_cloudmessage_time = 0;//获取最新装置通讯报文时标
|
||
uint64_t login_cloud_time = 0;//装置登录报文超时发送时标
|
||
std::string dev_CloudProtocolVer = "V1.0";//装置云协议版本号 例:V1.5(装置登录后获取一次,当前用于判断对时版本),不存在则默认为V1.0
|
||
|
||
// 新增状态管理成员
|
||
DeviceState current_state_; // 当前装置状态
|
||
uint64_t state_start_time_; // 状态开始时间(ms)
|
||
std::queue<StateAction> action_queue_; // 状态动作队列
|
||
std::mutex state_mutex_; // 状态操作互斥锁
|
||
std::vector<unsigned char> current_packet_; // 当前状态需要发送的报文
|
||
|
||
ClientContext(uv_loop_t* loop, const DeviceInfo& device, int index);
|
||
~ClientContext();
|
||
|
||
void init_tcp();//初始化客户端连接
|
||
void start_timer();//启动对应装置计时器 5秒执行一次
|
||
void start_reconnect_timer(int delay);//启动客户端重连定时
|
||
void stop_timers();//停止重连定时器
|
||
void close_handles();//关闭客户端各类连接与定时器
|
||
void append_and_process_data(const char* data, size_t len);//接收装置数据并存入缓冲
|
||
void put_packet_into_queue(const std::vector<unsigned char>& packet);//推送完整数据至处理队列
|
||
|
||
void change_state(DeviceState new_state, const std::vector<unsigned char>& packet = {});//改变装置状态和当前状态的待发送报文
|
||
void add_action(DeviceState state, const std::vector<unsigned char>& packet = {});//添加后续动作
|
||
void check_state_timeout();//装置状态超时检测
|
||
void process_next_action();//装置取后续工作并执行
|
||
void send_current_packet();//发送当前状态的报文至装置
|
||
|
||
// 新增: 多帧数据报文缓存
|
||
struct StatPacket {
|
||
int packet_index;
|
||
std::vector<unsigned char> data;
|
||
};
|
||
|
||
// 新增缓存管理方法------------------------------------------------------------------------
|
||
std::vector<StatPacket> stat_packets_cache_; // 缓存分帧报文
|
||
int expected_total_packets_ = 0; // 预期总帧数
|
||
std::mutex stat_cache_mutex_; // 缓存互斥锁
|
||
bool add_stat_packet(const std::vector<unsigned char>& packet, int current_packet, int total_packets);//插入多帧缓存数据
|
||
std::vector<StatPacket> get_and_clear_stat_packets();//取出所有缓存数据并清空缓存
|
||
void clear_stat_cache();//清空缓存
|
||
// 新增事件日志缓存管理方法-----------------------------------------------------------------
|
||
ushort event_lineNo = 1; // 事件日志待补招测点
|
||
std::vector<StatPacket> event_stat_packets_cache_; // 事件日志缓存分帧报文
|
||
int event_expected_total_packets_ = 0; // 事件日志预期总帧数
|
||
std::mutex event_stat_cache_mutex_; // 事件日志缓存互斥锁
|
||
bool event_add_stat_packet(const std::vector<unsigned char>& packet, int current_packet, int total_packets);//事件日志插入多帧缓存数据
|
||
std::vector<StatPacket> event_get_and_clear_stat_packets();//事件日志取出所有缓存数据并清空缓存
|
||
void event_clear_stat_cache();//事件日志清空缓存
|
||
|
||
// 统计数据缓存
|
||
struct PointFloatCache {
|
||
std::array<tagPqData_Float, 4> data; // 存储四种数据类型(0-3)
|
||
std::array<bool, 4> received = { false }; // 标记四种数据类型是否已接收
|
||
};
|
||
|
||
// 测点统计浮点数据缓存映射表 (测点号 -> 缓存数据)
|
||
std::unordered_map<ushort, PointFloatCache> point_float_cache_;
|
||
std::mutex float_cache_mutex_; // 浮点缓存互斥锁
|
||
|
||
// 添加浮点数据到缓存
|
||
bool add_float_data(ushort point_id, int data_type, const tagPqData_Float& float_data);
|
||
// 获取并清除指定测点的完整浮点数据
|
||
std::array<tagPqData_Float, 4> get_and_clear_float_data(ushort point_id);
|
||
// 清除所有浮点数据缓存
|
||
void clear_float_cache();
|
||
|
||
// 实时数据包缓存
|
||
struct RealtimePacket {
|
||
unsigned char packet_type;
|
||
std::vector<unsigned char> data;
|
||
};
|
||
|
||
std::vector<RealtimePacket> realtime_packets_cache_; // 缓存实时数据包
|
||
std::mutex realtime_cache_mutex_; // 缓存互斥锁
|
||
|
||
// 添加实时数据包到缓存
|
||
void add_realtime_packet(unsigned char packet_type,
|
||
const unsigned char* data,
|
||
size_t size) {
|
||
std::lock_guard<std::mutex> lock(realtime_cache_mutex_);
|
||
realtime_packets_cache_.push_back({
|
||
packet_type,
|
||
std::vector<unsigned char>(data, data + size)
|
||
});
|
||
}
|
||
|
||
// 获取并清空实时数据缓存
|
||
std::vector<RealtimePacket> get_and_clear_realtime_packets() {
|
||
std::lock_guard<std::mutex> lock(realtime_cache_mutex_);
|
||
auto packets = std::move(realtime_packets_cache_);
|
||
realtime_packets_cache_.clear();
|
||
return packets;
|
||
}
|
||
|
||
// 重置实时数据(包括缓存)
|
||
void reset_realtime_data() {
|
||
std::lock_guard<std::mutex> lock(realtime_cache_mutex_);
|
||
realtime_packets_cache_.clear();
|
||
}
|
||
|
||
//暂态文件下载相关
|
||
struct FileDownloadPacket {
|
||
int frame_index;
|
||
std::vector<unsigned char> data;
|
||
};
|
||
|
||
std::vector<FileDownloadPacket> file_download_cache_; // 文件分片缓存
|
||
std::string current_filename_; // 当前下载的文件名
|
||
std::mutex file_cache_mutex_; // 缓存互斥锁
|
||
|
||
// 添加文件分片到缓存
|
||
void add_file_packet(int frame_index, const unsigned char* data, size_t size) {
|
||
std::lock_guard<std::mutex> lock(file_cache_mutex_);
|
||
file_download_cache_.push_back({ frame_index, std::vector<unsigned char>(data, data + size) });
|
||
}
|
||
|
||
// 获取并清空缓存中的所有分片(按帧序号排序)
|
||
std::vector<std::vector<unsigned char>> get_and_clear_file_packets() {
|
||
std::lock_guard<std::mutex> lock(file_cache_mutex_);
|
||
|
||
// 按帧序号排序
|
||
std::sort(file_download_cache_.begin(), file_download_cache_.end(),
|
||
[](const FileDownloadPacket& a, const FileDownloadPacket& b) {
|
||
return a.frame_index < b.frame_index;
|
||
});
|
||
|
||
// 提取数据
|
||
std::vector<std::vector<unsigned char>> packets;
|
||
for (const auto& pkt : file_download_cache_) {
|
||
packets.push_back(pkt.data);
|
||
}
|
||
|
||
// 清空缓存
|
||
file_download_cache_.clear();
|
||
return packets;
|
||
}
|
||
|
||
// 清空文件缓存
|
||
void clear_file_cache() {
|
||
std::lock_guard<std::mutex> lock(file_cache_mutex_);
|
||
file_download_cache_.clear();
|
||
}
|
||
|
||
// 设置当前下载的文件名
|
||
void set_current_filename(const std::string& filename) {
|
||
std::lock_guard<std::mutex> lock(file_cache_mutex_);
|
||
current_filename_ = filename;
|
||
}
|
||
|
||
// 获取当前下载的文件名
|
||
std::string get_current_filename() {
|
||
std::lock_guard<std::mutex> lock(file_cache_mutex_);
|
||
return current_filename_;
|
||
}
|
||
private:
|
||
int index_;
|
||
|
||
private:
|
||
std::vector<unsigned char> recv_buffer_; // 接收数据缓冲区
|
||
std::mutex buffer_mutex_; // 缓冲区互斥锁
|
||
void process_buffer();
|
||
};
|
||
|
||
class ClientManager {
|
||
public:
|
||
static ClientManager& instance() {
|
||
static ClientManager inst;
|
||
return inst;
|
||
}
|
||
|
||
// 设置事件循环
|
||
void set_loop(uv_loop_t* loop) {
|
||
std::lock_guard<std::mutex> lock(mutex_);
|
||
loop_ = loop;
|
||
}
|
||
|
||
void add_device(const DeviceInfo& device);//添加一个装置连接
|
||
void remove_device(const std::string& device_id);//删除一个装置连接
|
||
void restart_device(const std::string& device_id);//关闭指定装置连接,等待重连唤起
|
||
void stop_all();//停止所有客户端连接
|
||
bool set_cloud_status(const std::string& identifier, int status);//修改云前置登录状态
|
||
bool post_message_processing(const std::string& identifier);// 消息处理完成后触发状态处理
|
||
// 添加状态动作到装置
|
||
bool add_action_to_device(const std::string& identifier,
|
||
DeviceState state,
|
||
const std::vector<unsigned char>& packet = {});
|
||
|
||
// 改变装置当前状态
|
||
bool change_device_state(const std::string& identifier,
|
||
DeviceState new_state,
|
||
const std::vector<unsigned char>& packet = {});
|
||
|
||
// 清除装置动作队列
|
||
bool clear_action_queue(const std::string& identifier);
|
||
|
||
// 获取装置当前状态
|
||
bool get_device_state(const std::string& identifier, DeviceState& out_state);
|
||
|
||
// 新增:通过标识符获取装置测点信息
|
||
bool get_device_points(const std::string& identifier,std::vector<PointInfo>& out_points);
|
||
|
||
//接收指定客户端的多帧报文并存入缓存区
|
||
bool add_stat_packet_to_device(const std::string& identifier,
|
||
const std::vector<unsigned char>& packet,
|
||
int current_packet,
|
||
int total_packets);
|
||
|
||
//获取指定客户端的所有缓存报文并清空缓存区
|
||
std::vector<ClientContext::StatPacket> get_and_clear_stat_packets(const std::string& identifier);
|
||
|
||
//清空多帧报文保存缓存区
|
||
bool clear_stat_cache(const std::string& identifier);
|
||
|
||
//保存多帧事件日志报文至缓存区等待收全
|
||
bool add_eventlog_packet_to_device(const std::string& identifier,
|
||
const std::vector<unsigned char>& packet,
|
||
int current_packet,
|
||
int total_packets);
|
||
|
||
//获取缓存区内所有多帧事件日志报文并清空缓存
|
||
std::vector<ClientContext::StatPacket> get_and_clear_event_packets(const std::string& identifier);
|
||
|
||
//清空所有事件日志缓存区
|
||
bool clear_event_cache(const std::string& identifier);
|
||
|
||
// 获取指定测点的PT和CT变比值
|
||
bool get_pt_ct_ratio(const std::string& identifier,
|
||
int16_t nCpuNo,
|
||
float& pt_ratio,
|
||
float& ct_ratio);
|
||
|
||
// 获取待补招测点序号
|
||
bool get_event_lineid(const std::string& identifier, int& nCpuNo);
|
||
|
||
// 获取客户端数量
|
||
size_t client_count() {
|
||
std::lock_guard<std::mutex> lock(mutex_);
|
||
return clients_.size();
|
||
}
|
||
|
||
// 添加浮点数据到指定设备的缓存
|
||
bool add_float_data_to_device(const std::string& identifier,
|
||
ushort point_id,
|
||
int data_type,
|
||
const tagPqData_Float& float_data);
|
||
|
||
// 获取并清除指定测点的完整浮点数据
|
||
std::array<tagPqData_Float, 4> get_and_clear_float_data(
|
||
const std::string& identifier, ushort point_id);
|
||
|
||
// 清除设备的所有浮点缓存
|
||
bool clear_float_cache(const std::string& identifier);
|
||
|
||
// 添加实时数据包到设备缓存
|
||
bool add_realtime_packet_to_device(const std::string& identifier,
|
||
unsigned char packet_type,
|
||
const unsigned char* data,
|
||
size_t size) {
|
||
std::lock_guard<std::mutex> lock(mutex_);
|
||
for (auto& pair : clients_) {
|
||
auto& ctx = pair.second;
|
||
if (ctx->device_info.device_id == identifier ||
|
||
ctx->device_info.mac == identifier) {
|
||
ctx->add_realtime_packet(packet_type, data, size);
|
||
return true;
|
||
}
|
||
}
|
||
return false;
|
||
}
|
||
|
||
// 获取并清空实时数据缓存
|
||
std::vector<ClientContext::RealtimePacket>
|
||
get_and_clear_realtime_packets(const std::string& identifier) {
|
||
std::lock_guard<std::mutex> lock(mutex_);
|
||
for (auto& pair : clients_) {
|
||
auto& ctx = pair.second;
|
||
if (ctx->device_info.device_id == identifier ||
|
||
ctx->device_info.mac == identifier) {
|
||
return ctx->get_and_clear_realtime_packets();
|
||
}
|
||
}
|
||
return {};
|
||
}
|
||
|
||
// 获取指定测点的电压等级和接线方式
|
||
bool get_point_scale_and_pttype(const std::string& identifier,
|
||
ushort nCpuNo,
|
||
std::string& out_scale,
|
||
int& out_pttype);
|
||
|
||
//修改暂态相关文件报文的帧序号
|
||
bool update_current_packet_frame(const std::string& identifier, int next_frame);
|
||
|
||
//获取暂态文件下载报文中的信息
|
||
struct DownloadInfo {
|
||
std::string filename;
|
||
int current_frame;
|
||
};
|
||
bool parse_download_packet(const std::string& identifier, DownloadInfo& out_info);
|
||
|
||
bool add_file_packet_to_device(const std::string& identifier,
|
||
int frame_index,
|
||
const unsigned char* data,
|
||
size_t size);
|
||
|
||
std::vector<std::vector<unsigned char>> get_and_clear_file_packets(const std::string& identifier);
|
||
|
||
bool update_current_filename(const std::string& identifier, const std::string& filename);
|
||
|
||
std::string get_current_filename(const std::string& identifier);
|
||
|
||
//刷新客户端装置最新接收通讯报文时间
|
||
bool set_cloudmessage_time(const std::string& identifier);
|
||
|
||
//读取并修改客户端的装置运维信息以备后续各类功能使用
|
||
bool set_versioninformation(const std::string& identifier, string cloud_version);
|
||
|
||
//读取文件目录调用 传入mac/id + 文件路径
|
||
bool add_file_menu_action_to_device(const std::string& identifier, const std::string& file_path);
|
||
|
||
// 新增:设置实时数据收发计数 实时数据调用 传入mac/id + 实时次数 + 测点序号
|
||
bool set_real_state_count(const std::string& identifier, int count, ushort point_id);
|
||
|
||
//文件下载调用 传入mac/id + 文件位置
|
||
bool add_file_download_action_to_device(const std::string& identifier, const std::string& file_path);
|
||
|
||
//获取指定装置指定测点下的定值数据 传入mac/id + 测点序号
|
||
bool get_fixedvalue_action_to_device(const std::string& identifier, ushort point_id);
|
||
|
||
//获取指定装置定值描述 传入mac/id
|
||
bool get_fixedvaluedes_action_to_device(const std::string& identifier);
|
||
|
||
//设置指定测点的定值配置 传入mac/id + 测点序号 + 定时队列
|
||
bool set_fixedvalue_action_to_device(const std::string& identifier, ushort point_id, const std::vector<float>& value);
|
||
|
||
//获取装置内部定值 传入mac/id
|
||
bool get_interfixedvalue_action_to_device(const std::string& identifier);
|
||
|
||
//获取装置内部定值控制字描述or内部定值描述 1-内部定值描述,2-控制字位描述
|
||
bool get_fixedvalucontrolword_action_to_device(const std::string& identifier, unsigned char nDesCW);
|
||
|
||
//设置装置内部定值 传入mac/id + 内部定值序列
|
||
bool set_interfixedvalue_action_to_device(const std::string& identifier, const std::vector<uint16_t>& values);
|
||
|
||
//读取装置运行信息
|
||
bool read_runninginformation_action_to_device(const std::string& identifier);
|
||
|
||
//读取装置版本配置信息
|
||
bool read_devversion_action_to_device(const std::string& identifier);
|
||
|
||
/**
|
||
* @brief 补招事件日志动作
|
||
* @param Time1 起始时间
|
||
* @param Time2 结束时间
|
||
* @param eventType 事件类型,默认2-暂态事件 4-告警时间
|
||
* @param monitorPoint 监测点,默认1-监测点1 1-6 对应测点
|
||
* @return 调用成功或失败的结果
|
||
*/
|
||
bool read_eventlog_action_to_device(const std::string& identifier, const std::tm& Time1, const std::tm& Time2,uint8_t eventType = 2, uint8_t monitorPoint = 1);
|
||
private:
|
||
ClientManager() : loop_(nullptr) {}
|
||
std::unordered_map<std::string, std::unique_ptr<ClientContext>> clients_;
|
||
std::mutex mutex_;
|
||
uv_loop_t* loop_; // 事件循环指针
|
||
};
|
||
|
||
// 函数声明
|
||
void start_client_connect(const std::vector<DeviceInfo>& devices);
|
||
void send_binary_data(ClientContext* ctx, const unsigned char* data, size_t data_size);
|
||
void safe_send_binary_data(ClientContext* ctx, std::vector<unsigned char> data);
|
||
void on_timer(uv_timer_t* handle);
|
||
void try_reconnect(uv_timer_t* timer);
|
||
void on_connect(uv_connect_t* req, int status);
|
||
void on_close(uv_handle_t* handle);
|
||
void init_clients(uv_loop_t* loop, const std::vector<DeviceInfo>& devices);
|
||
void stop_all_clients();
|
||
|
||
#endif |