diff --git a/LFtid1056/PQSMsg.h b/LFtid1056/PQSMsg.h index 7bb96bb..6419cfc 100644 --- a/LFtid1056/PQSMsg.h +++ b/LFtid1056/PQSMsg.h @@ -792,10 +792,419 @@ private: } }; -//计算报文帧长度 1帧1024为1K +//计算报文帧长度 1帧1024为1K 统计数据相关 constexpr int PqDataLen = tagPqData::GetSize(); constexpr int Stat_PacketNum = (PqDataLen / 1024 > 0) ? (PqDataLen / 1024 + 1) : (PqDataLen / 1024); +//实时数据结构(1字节对齐) +#pragma pack(push, 1) +class RealtagPqDate_float { +public: + tagTime time; // 时间 + + //实时数据各浮点数组 + std::array Rms; //相电压、线电压、电流有效值 + std::array UU_Deviation; //相电压、线电压上偏差 + std::array UL_Deviation; //相电压、线电压下偏差 + std::array THD; //电压电流畸变率 + std::array FREQ; //频率及频率偏差 + std::array, 2> UI_Seq;//电压电流序分量及不平衡度 + std::array, 4> TOTAL_POWER;//分相及总功率P、Q、S + std::array COS_PF; //视在功率因数 + std::array COS_DF; //位移功率因数 + //----------- 报文一包含时间和以上数据 + + std::array, 3> HARMV;//谐波电压含有率 + //----------- 报文二包含时间和以上数据 + + std::array, 3> HARMI;//谐波电流幅值 + //----------- 报文三包含时间和以上数据 + + std::array, 3> HARMVP;//谐波电压相位 + //----------- 报文四包含时间和以上数据 + + std::array, 3> HARMIP;//谐波电流相位 + //----------- 报文五包含时间和以上数据 + + std::array, 3> INHARMV;//间谐波电压幅值 + //----------- 报文六包含时间和以上数据 + + // 构造函数 - 初始化所有数据 + RealtagPqDate_float() { + // 初始化时间 + time = tagTime(); // 调用tagTime的默认构造函数 + + // 初始化基本数组 + Rms.fill(0.0f); + UU_Deviation.fill(0.0f); + UL_Deviation.fill(0.0f); + THD.fill(0.0f); + FREQ.fill(0.0f); + COS_PF.fill(0.0f); + COS_DF.fill(0.0f); + + // 初始化二维数组 + for (auto& arr : UI_Seq) { + arr.fill(0.0f); + } + for (auto& arr : TOTAL_POWER) { + arr.fill(0.0f); + } + + // 初始化谐波相关数组 + for (auto& arr : HARMV) { + arr.fill(0.0f); + } + for (auto& arr : HARMI) { + arr.fill(0.0f); + } + for (auto& arr : HARMVP) { + arr.fill(0.0f); + } + for (auto& arr : HARMIP) { + arr.fill(0.0f); + } + for (auto& arr : INHARMV) { + arr.fill(0.0f); + } + } + + // 辅助函数:从网络字节序读取float + float read_net_float(const uint8_t* ptr) { + uint32_t temp; + memcpy(&temp, ptr, sizeof(uint32_t)); + temp = ntohl(temp); + float result; + memcpy(&result, &temp, sizeof(float)); + return result; + } + + // 实时数据结构的分包解析方法 + bool ParsePacket1(const uint8_t* data, size_t size) { + // 最小长度 = 时间(12字节) + 后续数据(59个float * 4 = 236字节) = 248字节 + const size_t min_size = tagTime::GetSize() + 59 * sizeof(float); + if (size < min_size) { + return false; + } + + // 解析时间 + if (!time.SetStructBuf(data, size)) { + return false; + } + const uint8_t* ptr = data + tagTime::GetSize(); + + // 解析Rms (9个float) + for (int i = 0; i < 9; ++i) { + Rms[i] = read_net_float(ptr); + ptr += sizeof(float); + } + + // 解析UU_Deviation (6个float) + for (int i = 0; i < 6; ++i) { + UU_Deviation[i] = read_net_float(ptr); + ptr += sizeof(float); + } + + // 解析UL_Deviation (6个float) + for (int i = 0; i < 6; ++i) { + UL_Deviation[i] = read_net_float(ptr); + ptr += sizeof(float); + } + + // 解析THD (6个float) + for (int i = 0; i < 6; ++i) { + THD[i] = read_net_float(ptr); + ptr += sizeof(float); + } + + // 解析FREQ (2个float) + for (int i = 0; i < 2; ++i) { + FREQ[i] = read_net_float(ptr); + ptr += sizeof(float); + } + + // 解析UI_Seq (2x5个float) + for (int i = 0; i < 2; ++i) { + for (int j = 0; j < 5; ++j) { + UI_Seq[i][j] = read_net_float(ptr); + ptr += sizeof(float); + } + } + + // 解析TOTAL_POWER (4x3个float) + for (int i = 0; i < 4; ++i) { + for (int j = 0; j < 3; ++j) { + TOTAL_POWER[i][j] = read_net_float(ptr); + ptr += sizeof(float); + } + } + + // 解析COS_PF (4个float) + for (int i = 0; i < 4; ++i) { + COS_PF[i] = read_net_float(ptr); + ptr += sizeof(float); + } + + // 解析COS_DF (4个float) + for (int i = 0; i < 4; ++i) { + COS_DF[i] = read_net_float(ptr); + ptr += sizeof(float); + } + + return true; + } + + bool ParsePacket2(const uint8_t* data, size_t size) { + // 最小长度 = 时间(12字节) + 谐波电压(150个float * 4 = 600字节) = 612字节 + const size_t min_size = tagTime::GetSize() + 3 * HARMNUM * sizeof(float); + if (size < min_size) { + return false; + } + + // 解析时间(覆盖之前的时间) + if (!time.SetStructBuf(data, size)) { + return false; + } + const uint8_t* ptr = data + tagTime::GetSize(); + + // 解析HARMV (3xHARMNUM个float) + for (int i = 0; i < 3; ++i) { + for (int j = 0; j < HARMNUM; ++j) { + HARMV[i][j] = read_net_float(ptr); + ptr += sizeof(float); + } + } + + return true; + } + + bool ParsePacket3(const uint8_t* data, size_t size) { + // 最小长度 = 时间(12字节) + 谐波电流(150个float * 4 = 600字节) = 612字节 + const size_t min_size = tagTime::GetSize() + 3 * HARMNUM * sizeof(float); + if (size < min_size) { + return false; + } + + if (!time.SetStructBuf(data, size)) { + return false; + } + const uint8_t* ptr = data + tagTime::GetSize(); + + // 解析HARMI (3xHARMNUM个float) + for (int i = 0; i < 3; ++i) { + for (int j = 0; j < HARMNUM; ++j) { + HARMI[i][j] = read_net_float(ptr); + ptr += sizeof(float); + } + } + + return true; + } + + bool ParsePacket4(const uint8_t* data, size_t size) { + // 最小长度 = 时间(12字节) + 谐波电压相位(150个float * 4 = 600字节) = 612字节 + const size_t min_size = tagTime::GetSize() + 3 * HARMNUM * sizeof(float); + if (size < min_size) { + return false; + } + + if (!time.SetStructBuf(data, size)) { + return false; + } + const uint8_t* ptr = data + tagTime::GetSize(); + + // 解析HARMVP (3xHARMNUM个float) + for (int i = 0; i < 3; ++i) { + for (int j = 0; j < HARMNUM; ++j) { + HARMVP[i][j] = read_net_float(ptr); + ptr += sizeof(float); + } + } + + return true; + } + + bool ParsePacket5(const uint8_t* data, size_t size) { + // 最小长度 = 时间(12字节) + 谐波电流相位(150个float * 4 = 600字节) = 612字节 + const size_t min_size = tagTime::GetSize() + 3 * HARMNUM * sizeof(float); + if (size < min_size) { + return false; + } + + if (!time.SetStructBuf(data, size)) { + return false; + } + const uint8_t* ptr = data + tagTime::GetSize(); + + // 解析HARMIP (3xHARMNUM个float) + for (int i = 0; i < 3; ++i) { + for (int j = 0; j < HARMNUM; ++j) { + HARMIP[i][j] = read_net_float(ptr); + ptr += sizeof(float); + } + } + + return true; + } + + bool ParsePacket6(const uint8_t* data, size_t size) { + // 最小长度 = 时间(12字节) + 间谐波电压(150个float * 4 = 600字节) = 612字节 + const size_t min_size = tagTime::GetSize() + 3 * HARMNUM * sizeof(float); + if (size < min_size) { + return false; + } + + if (!time.SetStructBuf(data, size)) { + return false; + } + const uint8_t* ptr = data + tagTime::GetSize(); + + // 解析INHARMV (3xHARMNUM个float) + for (int i = 0; i < 3; ++i) { + for (int j = 0; j < HARMNUM; ++j) { + INHARMV[i][j] = read_net_float(ptr); + ptr += sizeof(float); + } + } + + return true; + } + + // 计算浮点字段总数 + size_t CalculateFloatCount() const { + size_t count = 0; + + // 基本数组 + count += Rms.size(); + count += UU_Deviation.size(); + count += UL_Deviation.size(); + count += THD.size(); + count += FREQ.size(); + count += COS_PF.size(); + count += COS_DF.size(); + + // 二维数组 + for (const auto& arr : UI_Seq) count += arr.size(); + for (const auto& arr : TOTAL_POWER) count += arr.size(); + for (const auto& arr : HARMV) count += arr.size(); + for (const auto& arr : HARMI) count += arr.size(); + for (const auto& arr : HARMVP) count += arr.size(); + for (const auto& arr : HARMIP) count += arr.size(); + for (const auto& arr : INHARMV) count += arr.size(); + + return count; + } + + // 序列化浮点数据到缓冲区 + void SerializeFloats(std::vector& buffer) const { + // 基本数组 + for (float val : Rms) buffer.push_back(val); + for (float val : UU_Deviation) buffer.push_back(val); + for (float val : UL_Deviation) buffer.push_back(val); + for (float val : THD) buffer.push_back(val); + for (float val : FREQ) buffer.push_back(val); + + // 二维数组(电压电流序) + for (const auto& arr : UI_Seq) { + for (float val : arr) buffer.push_back(val); + } + + // 功率数组 + for (const auto& arr : TOTAL_POWER) { + for (float val : arr) buffer.push_back(val); + } + + // 功率因数 + for (float val : COS_PF) buffer.push_back(val); + for (float val : COS_DF) buffer.push_back(val); + + // 谐波相关数组 + for (const auto& arr : HARMV) { + for (float val : arr) buffer.push_back(val); + } + for (const auto& arr : HARMI) { + for (float val : arr) buffer.push_back(val); + } + for (const auto& arr : HARMVP) { + for (float val : arr) buffer.push_back(val); + } + for (const auto& arr : HARMIP) { + for (float val : arr) buffer.push_back(val); + } + for (const auto& arr : INHARMV) { + for (float val : arr) buffer.push_back(val); + } + } + + // Base64编码函数(与tagPqData_Float相同) + static std::string base64_encode(const unsigned char* bytes_to_encode, size_t in_len) { + static const char base64_chars[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + + std::string ret; + int i = 0; + int j = 0; + unsigned char char_array_3[3]; + unsigned char char_array_4[4]; + + while (in_len--) { + char_array_3[i++] = *(bytes_to_encode++); + if (i == 3) { + char_array_4[0] = (char_array_3[0] & 0xfc) >> 2; + char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + + ((char_array_3[1] & 0xf0) >> 4); + char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + + ((char_array_3[2] & 0xc0) >> 6); + char_array_4[3] = char_array_3[2] & 0x3f; + + for (i = 0; i < 4; i++) + ret += base64_chars[char_array_4[i]]; + i = 0; + } + } + + if (i) { + for (j = i; j < 3; j++) + char_array_3[j] = '\0'; + + char_array_4[0] = (char_array_3[0] & 0xfc) >> 2; + char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + + ((char_array_3[1] & 0xf0) >> 4); + char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + + ((char_array_3[2] & 0xc0) >> 6); + char_array_4[3] = char_array_3[2] & 0x3f; + + for (j = 0; j < i + 1; j++) + ret += base64_chars[char_array_4[j]]; + + while (i++ < 3) + ret += '='; + } + + return ret; + } + + // 新增Base64转换方法 + std::string ConvertToBase64() const { + // 1. 计算总浮点数 + const size_t total_floats = CalculateFloatCount(); + + // 2. 创建缓冲区并填充数据 + std::vector float_buffer; + float_buffer.reserve(total_floats); + SerializeFloats(float_buffer); + + // 3. 转换为字节数据并编码 + const size_t byte_size = float_buffer.size() * sizeof(float); + const unsigned char* byte_data = + reinterpret_cast(float_buffer.data()); + + return base64_encode(byte_data, byte_size); + } +}; +#pragma pack(pop) // 生成带协议头的二进制报文 std::vector generate_binary_message( uint16_t msg_type, diff --git a/LFtid1056/client2.cpp b/LFtid1056/client2.cpp index b91ddc2..d4211ff 100644 --- a/LFtid1056/client2.cpp +++ b/LFtid1056/client2.cpp @@ -382,8 +382,8 @@ void on_timer(uv_timer_t* handle) { ctx->real_state_query_time_ = now; ctx->real_state_count--; - //auto sendbuff = generate_realstat_message(static_cast(ctx->real_point_id_), static_cast(0x01), static_cast(0x01));//缁勮璇㈤棶瀹炴椂鏁版嵁鎶ユ枃 - //ctx->add_action(DeviceState::READING_REALSTAT, sendbuff);//灏嗚鐘舵佷互鍙婂緟鍙戦佹姤鏂囧瓨鍏ラ槦鍒 + auto sendbuff = generate_realstat_message(static_cast(ctx->real_point_id_), static_cast(0x01), static_cast(0x01));//缁勮璇㈤棶瀹炴椂鏁版嵁鎶ユ枃 + ctx->add_action(DeviceState::READING_REALSTAT, sendbuff);//灏嗚鐘舵佷互鍙婂緟鍙戦佹姤鏂囧瓨鍏ラ槦鍒 } //澶勭悊鍚庣画宸ヤ綔闃熷垪鐨勫伐浣 鍙栧嚭涓涓苟鎵ц if (ctx->current_state_ == DeviceState::IDLE) { @@ -985,7 +985,7 @@ bool ClientManager::clear_float_cache(const std::string& identifier) { return false; } -bool ClientManager::set_real_state_count(const std::string& identifier, int count, ushort point_id = 1) { +bool ClientManager::set_real_state_count(const std::string& identifier, int count, ushort point_id) { std::lock_guard lock(mutex_); for (auto& pair : clients_) { diff --git a/LFtid1056/client2.h b/LFtid1056/client2.h index 415fa54..acc9bb2 100644 --- a/LFtid1056/client2.h +++ b/LFtid1056/client2.h @@ -67,7 +67,7 @@ public: uint64_t real_state_query_time_ = 0; // 实时数据计时时间戳 std::atomic real_state_count{ 0 };//实时数据收发计数 原子操作保证线程安全 std::atomic real_point_id_{ 1 }; // 新增:实时数据读取的测点序号(原子操作) - + DeviceInfo device_info; // 装置信息 int cloudstatus; // 云前置登录状态(0:未登录 1:已登录) @@ -127,6 +127,40 @@ public: // 清除所有浮点数据缓存 void clear_float_cache(); + // 实时数据包缓存 + struct RealtimePacket { + unsigned char packet_type; + std::vector data; + }; + + std::vector realtime_packets_cache_; // 缓存实时数据包 + std::mutex realtime_cache_mutex_; // 缓存互斥锁 + + // 添加实时数据包到缓存 + void add_realtime_packet(unsigned char packet_type, + const unsigned char* data, + size_t size) { + std::lock_guard lock(realtime_cache_mutex_); + realtime_packets_cache_.push_back({ + packet_type, + std::vector(data, data + size) + }); + } + + // 获取并清空实时数据缓存 + std::vector get_and_clear_realtime_packets() { + std::lock_guard lock(realtime_cache_mutex_); + auto packets = std::move(realtime_packets_cache_); + realtime_packets_cache_.clear(); + return packets; + } + + // 重置实时数据(包括缓存) + void reset_realtime_data() { + std::lock_guard lock(realtime_cache_mutex_); + realtime_packets_cache_.clear(); + } + private: int index_; @@ -213,7 +247,38 @@ public: bool clear_float_cache(const std::string& identifier); // 新增:设置实时数据收发计数 - bool set_real_state_count(const std::string& identifier, int count, ushort point_id = 1); + bool set_real_state_count(const std::string& identifier, int count, ushort point_id); + + // 添加实时数据包到设备缓存 + bool add_realtime_packet_to_device(const std::string& identifier, + unsigned char packet_type, + const unsigned char* data, + size_t size) { + std::lock_guard lock(mutex_); + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + ctx->add_realtime_packet(packet_type, data, size); + return true; + } + } + return false; + } + + // 获取并清空实时数据缓存 + std::vector + get_and_clear_realtime_packets(const std::string& identifier) { + std::lock_guard lock(mutex_); + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + return ctx->get_and_clear_realtime_packets(); + } + } + return {}; + } private: ClientManager() : loop_(nullptr) {} std::unordered_map> clients_; diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 079197f..3fdb929 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -56,7 +56,7 @@ void process_received_message(string mac, string id,const char* data, size_t len std::cout << "cloud login: " << mac << " state: success!" << std::endl; //装置登录成功 ClientManager::instance().set_cloud_status(id, 1); //设置了云前置登录状态为已登录 - //ClientManager::instance().set_real_state_count("D002", 10);//登录后测试实时 + ClientManager::instance().set_real_state_count("D002", 1,1);//登录后测试实时 } if (udata[19] == 0x00) { std::cout << "cloud login: " << mac << " state: fail!" << std::endl; @@ -80,6 +80,7 @@ void process_received_message(string mac, string id,const char* data, size_t len std::cerr << "Failed to get device state for: " << id << std::endl; return; } + // 根据装置状态处理报文 switch (currentState) { case DeviceState::IDLE: @@ -199,7 +200,7 @@ void process_received_message(string mac, string id,const char* data, size_t len 1, //数据集序号(以数据集方式上送),无填-1 arr //数据数组 ); - std::cout << js << std::endl; + //std::cout << js << std::endl; queue_data_t data; data.monitor_no = 1; @@ -278,6 +279,79 @@ void process_received_message(string mac, string id,const char* data, size_t len } break; + case DeviceState::READING_REALSTAT: + //读取实时数据状态 + std::cout << "READING_REALSTAT state: Processing stats data from " << mac << std::endl; + if (udata[8] == static_cast(MsgResponseType::Response_New_3S)) { + unsigned char packet_type = udata[13]; + // 将数据添加到缓存 + const uint8_t* data_ptr = parser.RecvData.data() + 4; + size_t data_size = parser.RecvData.size() - 4; + ClientManager::instance().add_realtime_packet_to_device( + id, packet_type, data_ptr, data_size + ); + + // 如果不是最后一个包,请求下一个包 + if (packet_type != 0x06) { + unsigned char next_packet_type = packet_type + 1; + auto sendbuff = generate_realstat_message( + static_cast(udata[12]), + next_packet_type, + static_cast(0x01) + ); + ClientManager::instance().change_device_state( + id, DeviceState::READING_REALSTAT, sendbuff + ); + } + else { + // 获取并清空缓存 + auto packets = ClientManager::instance().get_and_clear_realtime_packets(id); + + // 按包类型排序(01-06) + std::sort(packets.begin(), packets.end(), + [](const ClientContext::RealtimePacket& a, + const ClientContext::RealtimePacket& b) { + return a.packet_type < b.packet_type; + }); + + RealtagPqDate_float realdata; + // 按顺序解析每个包 + for (const auto& packet : packets) { + switch (packet.packet_type) { + case 0x01: + realdata.ParsePacket1(packet.data.data(), packet.data.size()); + break; + case 0x02: + realdata.ParsePacket2(packet.data.data(), packet.data.size()); + break; + case 0x03: + realdata.ParsePacket3(packet.data.data(), packet.data.size()); + break; + case 0x04: + realdata.ParsePacket4(packet.data.data(), packet.data.size()); + break; + case 0x05: + realdata.ParsePacket5(packet.data.data(), packet.data.size()); + break; + case 0x06: + realdata.ParsePacket6(packet.data.data(), packet.data.size()); + break; + } + } + + std::string base64 = realdata.ConvertToBase64(); + std::cout << base64 << std::endl; + // 处理完成后重置状态 + ClientManager::instance().change_device_state(id, DeviceState::IDLE); + } + } + else { + // 装置答非所问异常 + // 接收实时数据错误,调整为空闲状态,处理下一项工作。 + ClientManager::instance().change_device_state(id, DeviceState::IDLE); + } + break; + case DeviceState::CUSTOM_ACTION: // 自定义动作状态 std::cout << "CUSTOM_ACTION state: Processing custom response from " << mac << std::endl; diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 49bbd5e..3080a27 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -131,15 +131,15 @@ void* client_manager_thread(void* arg) { std::vector points2 = { {"P101", "Generator Output", "D002",1 ,1, 1, 1, 1} }; - //00-B7-8D-A8-00-D6 + //00-B7-8D-A8-00-D6 00-B7-8D-01-79-06 // 创建装置列表 std::vector devices = { { - "D001", "Primary Device", "Model-X", "00-B7-8D-A8-00-D9", + "D001", "Primary Device", "Model-X", "00-B7-8D-01-79-06", 1, points1 }, { - "D002", "Backup Device", "Model-Y", "00-B7-8D-01-79-06", + "D002", "Backup Device", "Model-Y", "00-B7-8D-A8-00-D6", 1, points2 } };