From 50b21bcd3ebc3631065a39b3ec5525f3ea62e21a Mon Sep 17 00:00:00 2001 From: zw <3466561528@qq.com> Date: Thu, 3 Jul 2025 11:13:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=86socket=E9=80=9A?= =?UTF-8?q?=E8=AE=AF=E6=A1=86=E6=9E=B6=EF=BC=8C=E6=B7=BB=E5=8A=A0=E4=BA=86?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E6=95=B0=E6=8D=AE=E6=97=B6=E9=97=B4=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=EF=BC=8C=E7=BB=9F=E8=AE=A1=E6=95=B0=E6=8D=AE=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E8=AF=BB=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- LFtid1056/PQSMsg.cpp | 118 +++++- LFtid1056/PQSMsg.h | 791 +++++++++++++++++++++++++++++++++++++- LFtid1056/client2.cpp | 508 +++++++++++++++++++++++- LFtid1056/client2.h | 144 ++++++- LFtid1056/dealMsg.cpp | 221 ++++++++++- LFtid1056/dealMsg.h | 3 + LFtid1056/main_thread.cpp | 45 +-- 7 files changed, 1761 insertions(+), 69 deletions(-) diff --git a/LFtid1056/PQSMsg.cpp b/LFtid1056/PQSMsg.cpp index 1776ef4..14e5e5c 100644 --- a/LFtid1056/PQSMsg.cpp +++ b/LFtid1056/PQSMsg.cpp @@ -7,8 +7,22 @@ #include #include #include "client2.h" -#include "PQSMsg.h" +// 辅助转换函数 +float IntToFloat(int num) { + return static_cast(num) / 65536.0f; +} +float ShorToFloat100(short num) { + return static_cast(num) / 100.0f; +} + +float ShorToFloat1000(short num) { + return static_cast(num) / 1000.0f; +} + +float ShorToFloat10000(short num) { + return static_cast(num) / 10000.0f; +} // 辅助函数:解析MAC地址并填充到缓冲区 void GetMAC(const std::string& strMAC, std::vector& packet, size_t startIndex) { // 移除所有空格和短横线 @@ -50,6 +64,69 @@ void GetMAC(const std::string& strMAC, std::vector& packet, size_ } } +// CRC计算函数 +unsigned char GetCrcSum(const std::vector& Check, int nOffset, int nLen) { + unsigned char reg_b = 0x00; + for (int i = 0; i < nLen; ++i) { + if (static_cast(i + nOffset) >= Check.size()) { + throw std::out_of_range("Index out of range in GetCrcSum"); + } + reg_b += Check[i + nOffset]; + } + return reg_b; +} + +// 主函数:组装二进制报文 +std::vector GetMsg(const std::vector& SrcData, unsigned char nType) { + // 参数检查 + if (SrcData.empty() || + ((nType < 0x01 || nType > 0xA4) && nType != 0xFF)) { + return {}; + } + + try { + // 计算总长度:报文头(6) + 额外字段(2) + 功能码(1) + 数据体 + CRC+结束符(2) + const size_t total_len = 6 + 2 + 1 + SrcData.size() + 2; + std::vector msg(total_len); + + // 组装报文头 (6字节) + msg[0] = 0xEB; // 报文头 + msg[1] = 0x90; // 报文头 + msg[2] = 0x00; // 备用 + msg[3] = 0x00; // 备用 + + // 设置长度字段(数据体长度+功能码) + uint16_t data_len = static_cast(SrcData.size() + 1); + msg[4] = static_cast(data_len >> 8); // 长度高字节 + msg[5] = static_cast(data_len & 0xFF); // 长度低字节 + + // 额外字段 (2字节) + msg[6] = 0x00; // 备用 + msg[7] = 0xFF; // 备用 + + // 功能码 + msg[8] = nType; + + // 复制数据体 + if (!SrcData.empty()) { + std::copy(SrcData.begin(), SrcData.end(), msg.begin() + 9); + } + + // 计算CRC(从索引8开始,长度 = 功能码+数据体) + unsigned char crc = GetCrcSum(msg, 8, 1 + SrcData.size()); + msg[msg.size() - 2] = crc; + msg[msg.size() - 1] = 0x16; // 结束符 + + return msg; + } + catch (const std::exception& ex) { + throw std::runtime_error(std::string("Exception in GetMsg: ") + ex.what()); + } + catch (...) { + throw std::runtime_error("Unknown exception in GetMsg"); + } +} + // 生成装置云服务登录报文 std::vector generate_frontlogin_message(const std::string& strMac) { @@ -89,3 +166,42 @@ std::vector generate_frontlogin_message(const std::string& strMac return packet; } +//询问统计数据时间报文 +std::vector generate_statequerytime_message() { + // 创建2字节数据缓冲区(初始化为0) + std::vector DataBuf(2, 0x00); + + // 调用GetMsg生成完整报文 + return GetMsg(DataBuf, static_cast(MsgRequestType::Request_StatTime)); +} + +//询问统计数据报文 +std::vector generate_statequerystat_message(tagTime time, uint16_t nDeviceNo, uint16_t nDataType) { + // 计算总大小:3(备用) + 2(nDeviceNo) + 2(nDataType) + time结构大小 + const size_t totalSize = 3 + 2 * sizeof(uint16_t) + time.GetSize(); + std::vector DataBuf(totalSize, 0x00); // 初始化为全0 + + size_t offset = 0; + + // 1. 跳过3字节备用区(已初始化为0) + offset += 3; + + // 2. 写入nDeviceNo(网络字节序) + uint16_t netDeviceNo = htons(nDeviceNo); + memcpy(DataBuf.data() + offset, &netDeviceNo, sizeof(uint16_t)); + offset += sizeof(uint16_t); + + // 3. 写入nDataType(网络字节序) + uint16_t netDataType = htons(nDataType); + memcpy(DataBuf.data() + offset, &netDataType, sizeof(uint16_t)); + offset += sizeof(uint16_t); + + // 4. 写入time结构(内部已处理网络字节序) + time.GetStructBuf(DataBuf.data(), DataBuf.size(), offset); + + // 生成完整报文 + return GetMsg(DataBuf, static_cast(MsgRequestType::Request_Stat)); +} + + + diff --git a/LFtid1056/PQSMsg.h b/LFtid1056/PQSMsg.h index 4fe87fd..689c81e 100644 --- a/LFtid1056/PQSMsg.h +++ b/LFtid1056/PQSMsg.h @@ -6,11 +6,800 @@ #include #include #include +#include +#include // 字节序转换 +#include +using namespace std; + +float IntToFloat(int num); +float ShorToFloat100(short num); +float ShorToFloat1000(short num); +float ShorToFloat10000(short num); + +// 发送报文功能码枚举 +enum class MsgRequestType : unsigned char { + //询问统计数据时间 + Request_StatTime = 0x8b, + //询问统计数据 + Request_Stat = 0x8a +}; +// 接收报文功能码枚举 +enum class MsgResponseType : unsigned char { + //询问统计数据时间 + Response_StatTime = 0x27, + //询问统计数据 + Response_Stat = 0x26 +}; +//基础消息结构 +class MessageParser { +public: + // 成员变量 + uint8_t msgType; //功能码 + std::vector RecvData; //数据体 + int nMsgLen; //功能码+帧序号+数据体 + int nRecvDataLen; //数据体长度 + + // 主处理函数 - 返回bool类型 + bool SetMsg(const uint8_t* udata, size_t data_size) { + // 1. 检查空指针 + if (udata == nullptr) { + return false; // 数据处理失败 + } + + // 2. 检查最小长度(6字节头部) + constexpr size_t MIN_HEADER_SIZE = 6; + if (data_size < MIN_HEADER_SIZE) { + return false; // 头部长度不足 + } + + // 3. 提取报文长度(大端序) + nMsgLen = (static_cast(udata[4]) << 8) | udata[5]; + + // 4. 验证完整报文长度 (8 + nMsgLen) + if (data_size < 8 + nMsgLen) { + return false; // 报文不完整 + } + + // 5. 计算数据区长度 删除了功能码和帧序号4字节 + nRecvDataLen = nMsgLen - 4; + + // 6. CRC校验(根据实际需要实现) + /* + if (!ValidateCRC(udata, 8 + nMsgLen)) { + return false; // CRC校验失败 + } + */ + + // 7. 提取消息类型 (索引位置 6 + 2 = 8) + msgType = udata[8]; + + // 8. 提取数据区 (索引位置12开始) + RecvData.clear(); + if (nRecvDataLen > 0) { + // 确保不越界(nRecvDataLen = nMsgLen - 4) + RecvData.assign(udata + 12, udata + 12 + nRecvDataLen); + } + + return true; // 数据处理成功 + } +}; + +//接收装置时标对象 +class tagTime { +public: + uint16_t DeviceYear; + uint16_t DeviceMonth; + uint16_t DeviceDay; + uint16_t DeviceHour; + uint16_t DeviceMinute; + uint16_t DeviceSecond; + + // 返回结构体二进制大小 + static constexpr size_t GetSize() { + return 6 * sizeof(uint16_t); + } + + // 默认构造函数 + tagTime() : + DeviceYear(1970), + DeviceMonth(1), + DeviceDay(1), + DeviceHour(0), + DeviceMinute(0), + DeviceSecond(0) {} + + // 从std::tm构造 + explicit tagTime(const std::tm& dt) : + DeviceYear(static_cast(dt.tm_year + 1900)), + DeviceMonth(static_cast(dt.tm_mon + 1)), + DeviceDay(static_cast(dt.tm_mday)), + DeviceHour(static_cast(dt.tm_hour)), + DeviceMinute(static_cast(dt.tm_min)), + DeviceSecond(static_cast(dt.tm_sec)) {} + + // 复制函数 + void Clone(const tagTime& src) { + DeviceYear = src.DeviceYear; + DeviceMonth = src.DeviceMonth; + DeviceDay = src.DeviceDay; + DeviceHour = src.DeviceHour; + DeviceMinute = src.DeviceMinute; + DeviceSecond = src.DeviceSecond; + } + + // 从二进制流解析(网络字节序) + bool SetStructBuf(const uint8_t* bArray, size_t bufSize, size_t offset = 0) { + if (bufSize - offset < GetSize()) { + return false; + } + + const uint8_t* ptr = bArray + offset; + + DeviceYear = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(uint16_t); + DeviceMonth = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(uint16_t); + DeviceDay = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(uint16_t); + DeviceHour = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(uint16_t); + DeviceMinute = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(uint16_t); + DeviceSecond = ntohs(*reinterpret_cast(ptr)); + + return true; + } + + // 序列化为二进制流(网络字节序) + size_t GetStructBuf(uint8_t* bArray, size_t bufSize, size_t offset = 0) const { + if (bufSize - offset < GetSize()) { + return 0; + } + + uint8_t* ptr = bArray + offset; + + *reinterpret_cast(ptr) = htons(DeviceYear); + ptr += sizeof(uint16_t); + *reinterpret_cast(ptr) = htons(DeviceMonth); + ptr += sizeof(uint16_t); + *reinterpret_cast(ptr) = htons(DeviceDay); + ptr += sizeof(uint16_t); + *reinterpret_cast(ptr) = htons(DeviceHour); + ptr += sizeof(uint16_t); + *reinterpret_cast(ptr) = htons(DeviceMinute); + ptr += sizeof(uint16_t); + *reinterpret_cast(ptr) = htons(DeviceSecond); + + return GetSize(); + } +}; + +//谐波间谐波序列长度 默认50 +constexpr int HARMNUM = 50; + +// 间谐波数据结构 +struct tagInHarmData { + int32_t Val; + int32_t f; + + static constexpr size_t GetSize() { + return sizeof(Val) + sizeof(f); + } + + // 从网络字节序解析 + bool SetStructBuf(const uint8_t* ptr) { + Val = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + f = ntohl(*reinterpret_cast(ptr)); + return true; + } +}; + +// 功率数据结构 +struct tagPowerData { + int32_t P; + int32_t Q; + int32_t S; + + static constexpr size_t GetSize() { + return sizeof(P) + sizeof(Q) + sizeof(S); + } + + // 从网络字节序解析 + bool SetStructBuf(const uint8_t* ptr) { + P = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + Q = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + S = ntohl(*reinterpret_cast(ptr)); + return true; + } +}; + +// 主数据结构 (使用1字节对齐) +#pragma pack(push, 1) +class tagPqData { +public: + int16_t name; // 监测点号 + int16_t Data_Type; // 数据类型 + tagTime time; // 时间 + + // 各种数据数组 + std::array Rms; // 电压/电流有效值 + std::array UU_Deviation; // 电压上偏差 + std::array UL_Deviation; // 电压下偏差 + std::array F_Deviation; // 频率偏差 + std::array, 2> UI_Seq; // 电压电流序量 + std::array, 6> FuHarm; // 整次谐波 + std::array, 6> FuHarmPhase; // 谐波相角 + std::array, 6> InHarm; // 间谐波 + std::array, 4> Total_Power; // 总功率 + std::array, 4> Harm_Power; // 谐波功率 + std::array, 6> Harm_Contain; // 谐波含有率 + std::array Harm_Aberrance; // 谐波畸变率 + std::array Cos_PF; // 视在功率因数 + std::array Cos_DF; // 位移功率因数 + std::array U_Fluctuation; // 电压波动 + std::array U_Flicker; // 电压闪变 + std::array UL_Flicker; // 电压长闪变 + + // 构造函数 + tagPqData() : name(0), Data_Type(0) { + // 所有数组初始化为0 + Rms.fill(0); + UU_Deviation.fill(0); + UL_Deviation.fill(0); + F_Deviation.fill(0); + + for (auto& arr : UI_Seq) arr.fill(0); + for (auto& arr : FuHarm) arr.fill(0); + for (auto& arr : FuHarmPhase) arr.fill(0); + for (auto& arr : Harm_Contain) arr.fill(0); + + Harm_Aberrance.fill(0); + Cos_PF.fill(0); + Cos_DF.fill(0); + U_Fluctuation.fill(0); + U_Flicker.fill(0); + UL_Flicker.fill(0); + } + + // 获取结构体大小 + static constexpr size_t GetSize() { + return sizeof(name) + sizeof(Data_Type) + + tagTime::GetSize() + + sizeof(Rms) + + sizeof(UU_Deviation) + + sizeof(UL_Deviation) + + sizeof(F_Deviation) + + sizeof(UI_Seq) + + sizeof(FuHarm) + + sizeof(FuHarmPhase) + + sizeof(InHarm) + + sizeof(Total_Power) + + sizeof(Harm_Power) + + sizeof(Harm_Contain) + + sizeof(Harm_Aberrance) + + sizeof(Cos_PF) + + sizeof(Cos_DF) + + sizeof(U_Fluctuation) + + sizeof(U_Flicker) + + sizeof(UL_Flicker); + } + + // 从网络字节序解析二进制数据 + bool SetStructBuf(const uint8_t* bArray, size_t bufSize, size_t offset = 0) { + if (bufSize - offset < GetSize()) { + return false; + } + + const uint8_t* ptr = bArray + offset; + size_t remaining = bufSize - offset; + + // 解析基本字段 + name = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(name); + remaining -= sizeof(name); + + Data_Type = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(Data_Type); + remaining -= sizeof(Data_Type); + + // 解析时间结构 + if (!time.SetStructBuf(ptr, remaining)) { + return false; + } + ptr += tagTime::GetSize(); + remaining -= tagTime::GetSize(); + + // 解析一维int32数组 - 使用显式循环 + for (auto& val : Rms) { + if (remaining < sizeof(int32_t)) return false; + val = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + remaining -= sizeof(int32_t); + } + + for (auto& val : UU_Deviation) { + if (remaining < sizeof(int32_t)) return false; + val = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + remaining -= sizeof(int32_t); + } + + for (auto& val : UL_Deviation) { + if (remaining < sizeof(int32_t)) return false; + val = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + remaining -= sizeof(int32_t); + } + + for (auto& val : F_Deviation) { + if (remaining < sizeof(int32_t)) return false; + val = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + remaining -= sizeof(int32_t); + } + + // 解析二维int32数组 - 使用显式循环 + for (auto& arr : UI_Seq) { + for (auto& val : arr) { + if (remaining < sizeof(int32_t)) return false; + val = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + remaining -= sizeof(int32_t); + } + } + + for (auto& arr : FuHarm) { + for (auto& val : arr) { + if (remaining < sizeof(int32_t)) return false; + val = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + remaining -= sizeof(int32_t); + } + } + + for (auto& arr : FuHarmPhase) { + for (auto& val : arr) { + if (remaining < sizeof(int32_t)) return false; + val = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + remaining -= sizeof(int32_t); + } + } + + // 解析间谐波数据 + for (auto& arr : InHarm) { + for (auto& item : arr) { + if (remaining < tagInHarmData::GetSize()) return false; + item.SetStructBuf(ptr); + ptr += tagInHarmData::GetSize(); + remaining -= tagInHarmData::GetSize(); + } + } + + // 解析总功率 + for (auto& arr : Total_Power) { + for (auto& val : arr) { + if (remaining < sizeof(int32_t)) return false; + val = ntohl(*reinterpret_cast(ptr)); + ptr += sizeof(int32_t); + remaining -= sizeof(int32_t); + } + } + + // 解析谐波功率数据 + for (auto& arr : Harm_Power) { + for (auto& item : arr) { + if (remaining < tagPowerData::GetSize()) return false; + item.SetStructBuf(ptr); + ptr += tagPowerData::GetSize(); + remaining -= tagPowerData::GetSize(); + } + } + + // 解析二维int16数组 - 使用显式循环 + for (auto& arr : Harm_Contain) { + for (auto& val : arr) { + if (remaining < sizeof(int16_t)) return false; + val = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(int16_t); + remaining -= sizeof(int16_t); + } + } + + // 解析一维int16数组 - 使用显式循环 + for (auto& val : Harm_Aberrance) { + if (remaining < sizeof(int16_t)) return false; + val = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(int16_t); + remaining -= sizeof(int16_t); + } + + for (auto& val : Cos_PF) { + if (remaining < sizeof(int16_t)) return false; + val = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(int16_t); + remaining -= sizeof(int16_t); + } + + for (auto& val : Cos_DF) { + if (remaining < sizeof(int16_t)) return false; + val = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(int16_t); + remaining -= sizeof(int16_t); + } + + for (auto& val : U_Fluctuation) { + if (remaining < sizeof(int16_t)) return false; + val = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(int16_t); + remaining -= sizeof(int16_t); + } + + for (auto& val : U_Flicker) { + if (remaining < sizeof(int16_t)) return false; + val = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(int16_t); + remaining -= sizeof(int16_t); + } + + for (auto& val : UL_Flicker) { + if (remaining < sizeof(int16_t)) return false; + val = ntohs(*reinterpret_cast(ptr)); + ptr += sizeof(int16_t); + remaining -= sizeof(int16_t); + } + + return true; + } +}; +#pragma pack(pop) + +// 间谐波浮点结构 +struct tagInHarmData_float { + float Val; + float f; +}; + +// 功率浮点结构 +struct tagPowerData_float { + float P; + float Q; + float S; +}; + +// PQ数据浮点结构 +class tagPqData_Float { +public: + short name; // 监测点号 + short Data_Type; // 数据类型 + tagTime time; // 时间 + + // 各种浮点数组 + std::array Rms; + std::array UU_Deviation; + std::array UL_Deviation; + std::array F_Deviation; + std::array, 2> UI_Seq; + std::array, 6> FuHarm; + std::array, 6> FuHarmPhase; + std::array, 6> InHarm; + std::array, 4> Total_Power; + std::array, 4> Harm_Power; + std::array, 6> Harm_Contain; + std::array Harm_Aberrance; // 注意:C#中是8元素 + std::array Cos_PF; + std::array Cos_DF; + std::array U_Fluctuation; + std::array U_Flicker; + std::array UL_Flicker; + + // 构造函数初始化数组 + tagPqData_Float() { + Rms.fill(0.0f); + UU_Deviation.fill(0.0f); + UL_Deviation.fill(0.0f); + F_Deviation.fill(0.0f); + + for (auto& arr : UI_Seq) arr.fill(0.0f); + for (auto& arr : FuHarm) arr.fill(0.0f); + for (auto& arr : FuHarmPhase) arr.fill(0.0f); + for (auto& arr : Harm_Contain) arr.fill(0.0f); + + Harm_Aberrance.fill(0.0f); + Cos_PF.fill(0.0f); + Cos_DF.fill(0.0f); + U_Fluctuation.fill(0.0f); + U_Flicker.fill(0.0f); + UL_Flicker.fill(0.0f); + + // 初始化嵌套结构 + for (auto& arr : InHarm) { + for (auto& item : arr) { + item = tagInHarmData_float{ 0.0f, 0.0f }; + } + } + + for (auto& arr : Harm_Power) { + for (auto& item : arr) { + item = tagPowerData_float{ 0.0f, 0.0f, 0.0f }; + } + } + } + + // 转换函数 + void SetFloatValue(const tagPqData& SrcData, float fPT, float fCT) { + time.Clone(SrcData.time); + + // F_Deviation + for (int i = 0; i < 2; i++) { + F_Deviation[i] = IntToFloat(SrcData.F_Deviation[i]); + } + + // UI_Seq + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 4; j++) { + if (i == 0) { // 电压 + if (j == 2) { // 正序 + UI_Seq[i][j] = IntToFloat(SrcData.UI_Seq[i][j]) * fPT; + } + else if (j < 3) { + UI_Seq[i][j] = IntToFloat(SrcData.UI_Seq[i][j]) * fPT * 1000.0f; + } + else { + UI_Seq[i][j] = IntToFloat(SrcData.UI_Seq[i][j]); + } + } + else { // 电流 + if (j < 3) { + UI_Seq[i][j] = IntToFloat(SrcData.UI_Seq[i][j]) * fCT; + } + else { + UI_Seq[i][j] = IntToFloat(SrcData.UI_Seq[i][j]); + } + } + } + } + + // 波动和闪变 + for (int i = 0; i < 3; i++) { + U_Fluctuation[i] = ShorToFloat1000(SrcData.U_Fluctuation[i]); + U_Flicker[i] = ShorToFloat1000(SrcData.U_Flicker[i]); + UL_Flicker[i] = ShorToFloat1000(SrcData.UL_Flicker[i]); + } + + // 功率因数 + for (int i = 0; i < 4; i++) { + Cos_PF[i] = ShorToFloat10000(SrcData.Cos_PF[i]); + Cos_DF[i] = ShorToFloat10000(SrcData.Cos_DF[i]); + } + + // 总功率 + for (int i = 0; i < 4; i++) { + for (int j = 0; j < 3; j++) { + Total_Power[i][j] = IntToFloat(SrcData.Total_Power[i][j]) * fPT * fCT; + } + } + + // 谐波功率 + for (int i = 0; i < 4; i++) { + for (int j = 0; j < HARMNUM; j++) { + Harm_Power[i][j].P = IntToFloat(SrcData.Harm_Power[i][j].P) * fPT * fCT; + Harm_Power[i][j].Q = IntToFloat(SrcData.Harm_Power[i][j].Q) * fPT * fCT; + Harm_Power[i][j].S = IntToFloat(SrcData.Harm_Power[i][j].S) * fPT * fCT; + } + } + + // 谐波相关数据 + for (int i = 0; i < 6; i++) { + UU_Deviation[i] = IntToFloat(SrcData.UU_Deviation[i]); + UL_Deviation[i] = IntToFloat(SrcData.UL_Deviation[i]); + Harm_Aberrance[i] = ShorToFloat100(SrcData.Harm_Aberrance[i]); + + for (int j = 0; j < HARMNUM; j++) { + if (i < 3) { // 电压谐波 + FuHarm[i][j] = IntToFloat(SrcData.FuHarm[i][j]) * fPT; + } + else { // 电流谐波 + FuHarm[i][j] = IntToFloat(SrcData.FuHarm[i][j]) * fCT; + } + + FuHarmPhase[i][j] = IntToFloat(SrcData.FuHarmPhase[i][j]); + InHarm[i][j].Val = IntToFloat(SrcData.InHarm[i][j].Val); + Harm_Contain[i][j] = ShorToFloat100(SrcData.Harm_Contain[i][j]); + } + } + + // RMS值 + for (int i = 0; i < 9; i++) { + if (i > 2 && i < 6) { // 电流 (索引3,4,5) + Rms[i] = IntToFloat(SrcData.Rms[i]) * fCT; + } + else { // 电压和其他 + Rms[i] = IntToFloat(SrcData.Rms[i]) * fPT; + } + } + } + + // 将浮点字段转换为Base64字符串 + std::string ConvertToBase64() const { + // 1. 计算总浮点数 + const size_t total_floats = CalculateFloatCount(); + + // 2. 创建缓冲区并填充数据 + std::vector float_buffer; + float_buffer.reserve(total_floats); + SerializeFloats(float_buffer); + + // 3. 将浮点缓冲区转换为字节数据 + const size_t byte_size = float_buffer.size() * sizeof(float); + const unsigned char* byte_data = + reinterpret_cast(float_buffer.data()); + + // 4. Base64编码 + return base64_encode(byte_data, byte_size); + } + + // 计算浮点字段总数 + size_t CalculateFloatCount() const { + size_t count = 0; + + // 基本数组 + count += Rms.size(); + count += UU_Deviation.size(); + count += UL_Deviation.size(); + count += F_Deviation.size(); + + // 二维数组 + for (const auto& arr : UI_Seq) count += arr.size(); + for (const auto& arr : FuHarm) count += arr.size(); + for (const auto& arr : FuHarmPhase) count += arr.size(); + + // 嵌套结构数组 + for (const auto& arr : InHarm) { + for (const auto& item : arr) { + count += 2; // 每个tagInHarmData_float包含2个float + } + } + + // 功率数组 + for (const auto& arr : Total_Power) count += arr.size(); + + for (const auto& arr : Harm_Power) { + for (const auto& item : arr) { + count += 3; // 每个tagPowerData_float包含3个float + } + } + + // 其他数组 + for (const auto& arr : Harm_Contain) count += arr.size(); + count += Harm_Aberrance.size(); + count += Cos_PF.size(); + count += Cos_DF.size(); + count += U_Fluctuation.size(); + count += U_Flicker.size(); + count += UL_Flicker.size(); + + return count; + } +private: + // 序列化浮点数据到缓冲区 + void SerializeFloats(std::vector& buffer) const { + // 基本数组 + for (const auto& val : Rms) buffer.push_back(val); + for (const auto& val : UU_Deviation) buffer.push_back(val); + for (const auto& val : UL_Deviation) buffer.push_back(val); + for (const auto& val : F_Deviation) buffer.push_back(val); + + // 二维数组 + for (const auto& arr : UI_Seq) { + for (const auto& val : arr) buffer.push_back(val); + } + for (const auto& arr : FuHarm) { + for (const auto& val : arr) buffer.push_back(val); + } + for (const auto& arr : FuHarmPhase) { + for (const auto& val : arr) buffer.push_back(val); + } + + // 嵌套结构数组 + for (const auto& arr : InHarm) { + for (const auto& item : arr) { + buffer.push_back(item.Val); + buffer.push_back(item.f); + } + } + + // 功率数组 + for (const auto& arr : Total_Power) { + for (const auto& val : arr) buffer.push_back(val); + } + + for (const auto& arr : Harm_Power) { + for (const auto& item : arr) { + buffer.push_back(item.P); + buffer.push_back(item.Q); + buffer.push_back(item.S); + } + } + + // 其他数组 + for (const auto& arr : Harm_Contain) { + for (const auto& val : arr) buffer.push_back(val); + } + for (const auto& val : Harm_Aberrance) buffer.push_back(val); + for (const auto& val : Cos_PF) buffer.push_back(val); + for (const auto& val : Cos_DF) buffer.push_back(val); + for (const auto& val : U_Fluctuation) buffer.push_back(val); + for (const auto& val : U_Flicker) buffer.push_back(val); + for (const auto& val : UL_Flicker) buffer.push_back(val); + } + + // Base64编码函数 + static std::string base64_encode(const unsigned char* bytes_to_encode, size_t in_len) { + static const char base64_chars[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + + std::string ret; + int i = 0; + int j = 0; + unsigned char char_array_3[3]; + unsigned char char_array_4[4]; + + while (in_len--) { + char_array_3[i++] = *(bytes_to_encode++); + if (i == 3) { + char_array_4[0] = (char_array_3[0] & 0xfc) >> 2; + char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + + ((char_array_3[1] & 0xf0) >> 4); + char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + + ((char_array_3[2] & 0xc0) >> 6); + char_array_4[3] = char_array_3[2] & 0x3f; + + for (i = 0; i < 4; i++) + ret += base64_chars[char_array_4[i]]; + i = 0; + } + } + + if (i) { + for (j = i; j < 3; j++) + char_array_3[j] = '\0'; + + char_array_4[0] = (char_array_3[0] & 0xfc) >> 2; + char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + + ((char_array_3[1] & 0xf0) >> 4); + char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + + ((char_array_3[2] & 0xc0) >> 6); + char_array_4[3] = char_array_3[2] & 0x3f; + + for (j = 0; j < i + 1; j++) + ret += base64_chars[char_array_4[j]]; + + while (i++ < 3) + ret += '='; + } + + return ret; + } +}; + +//计算报文帧长度 1帧1024为1K +constexpr int PqDataLen = tagPqData::GetSize(); +constexpr int Stat_PacketNum = (PqDataLen / 1024 > 0) ? (PqDataLen / 1024 + 1) : (PqDataLen / 1024); // 生成带协议头的二进制报文 std::vector generate_binary_message( uint16_t msg_type, - const std::vector & payload); + const std::vector& payload); // 生成装置云服务登录报文 std::vector generate_frontlogin_message(const std::string& strMac); +//生成询问统计数据时间报文 +std::vector generate_statequerytime_message(); +//生成询问统计数据报文 +std::vector generate_statequerystat_message(tagTime time, uint16_t nDeviceNo, uint16_t nDataType); \ No newline at end of file diff --git a/LFtid1056/client2.cpp b/LFtid1056/client2.cpp index ef43dd2..6ea7720 100644 --- a/LFtid1056/client2.cpp +++ b/LFtid1056/client2.cpp @@ -1,6 +1,4 @@ #include "client2.h" -#include "PQSMsg.h" -#include "dealMsg.h" #include #include #include @@ -11,7 +9,7 @@ #include // 閰嶇疆鍙傛暟 -constexpr int BASE_RECONNECT_DELAY = 5000; // 鍩虹閲嶈繛寤惰繜(ms) +constexpr int BASE_RECONNECT_DELAY = 20000; // 鍩虹閲嶈繛寤惰繜(ms) constexpr int MAX_RECONNECT_DELAY = 60000; // 鏈澶ч噸杩炲欢杩(ms) constexpr const char* SERVER_IP = "101.132.39.45"; // 鐩爣鏈嶅姟鍣↖P"101.132.39.45" constexpr int SERVER_PORT = 1056; // 鐩爣鏈嶅姟鍣ㄧ鍙1056 @@ -23,7 +21,8 @@ extern SafeMessageQueue message_queue; // ClientContext 瀹炵幇 ClientContext::ClientContext(uv_loop_t* loop, const DeviceInfo& device, int index) : loop(loop), state(ConnectionState::DISCONNECTED), - reconnect_attempts(0), shutdown(false), device_info(device), index_(index) { + reconnect_attempts(0), shutdown(false), device_info(device), index_(index),cloudstatus(0), current_state_(DeviceState::IDLE), + state_start_time_(0) { recv_buffer_.reserve(4096); // 棰勫垎閰4KB缂撳啿鍖 @@ -54,7 +53,7 @@ void ClientContext::init_tcp() { void ClientContext::start_timer() { if (!uv_is_active((uv_handle_t*)&timer)) { - uv_timer_start(&timer, on_timer, 6000, 6000); + uv_timer_start(&timer, on_timer, 5000,5000); } } @@ -187,6 +186,132 @@ void ClientContext::put_packet_into_queue( } } +// 鏂板鏂规硶锛氭敼鍙樿缃姸鎬 +void ClientContext::change_state(DeviceState new_state, const std::vector& packet) { + std::lock_guard lock(state_mutex_); + // 鐩存帴鏇存柊鐘舵侊紝涓嶈皟鐢ㄥ叾浠栭攣鏂规硶 + current_state_ = new_state; + current_packet_ = packet; + state_start_time_ = uv_now(loop); + + std::cout << "[Device " << device_info.device_id + << "] State changed to: " << static_cast(new_state) << std::endl; +} + +// 鏂板鏂规硶锛氭坊鍔犲悗缁姩浣 +void ClientContext::add_action(DeviceState state, const std::vector& packet) { + std::lock_guard lock(state_mutex_); + action_queue_.push({ state, packet }); + + std::cout << "[Device " << device_info.device_id + << "] Action added to queue: " << static_cast(state) << std::endl; +} + +// 鏂板鏂规硶锛氬鐞嗙姸鎬佽秴鏃 +void ClientContext::check_state_timeout() { + constexpr uint64_t STATE_TIMEOUT = 30000;//30绉掕秴鏃 + uint64_t now = uv_now(loop); + bool timed_out = false; + + { + std::lock_guard lock(state_mutex_); + if (current_state_ != DeviceState::IDLE && + (now - state_start_time_) > STATE_TIMEOUT) + { + timed_out = true; + current_state_ = DeviceState::IDLE; + } + } + + if (timed_out) { + process_next_action(); // 鍦ㄩ攣澶栬皟鐢 + } +} + +// 鏂板鏂规硶锛氬鐞嗕笅涓涓姩浣 +void ClientContext::process_next_action() { + StateAction next; + { + std::lock_guard lock(state_mutex_); + if (current_state_ != DeviceState::IDLE || action_queue_.empty()) + return; + + next = action_queue_.front(); + action_queue_.pop(); + } // 鎻愬墠閲婃斁閿 + + // 鍦ㄩ攣澶栬皟鐢ㄥ彲鑳介樆濉炵殑鍑芥暟 + change_state(next.state, next.packet); + send_current_packet(); +} + +// 鏂板鏂规硶锛氬彂閫佸綋鍓嶇姸鎬佸搴旂殑鎶ユ枃 +void ClientContext::send_current_packet() { + if (!current_packet_.empty()) { + send_binary_data(this, current_packet_.data(), current_packet_.size()); + } +} + +bool ClientContext::add_stat_packet(const std::vector& packet, int current_packet, int total_packets) { + std::lock_guard lock(stat_cache_mutex_); + + // 濡傛灉鏄涓甯э紝鍒濆鍖栫紦瀛 + if (current_packet == 1) { + stat_packets_cache_.clear(); + expected_total_packets_ = total_packets; + } + + // 娣诲姞鍒扮紦瀛 + stat_packets_cache_.push_back({ current_packet, packet }); + + // 妫鏌ユ槸鍚︽敹榻愭墍鏈夊抚 + return (stat_packets_cache_.size() >= expected_total_packets_); +} + +std::vector ClientContext::get_and_clear_stat_packets() { + std::lock_guard lock(stat_cache_mutex_); + auto packets = std::move(stat_packets_cache_); + stat_packets_cache_.clear(); + expected_total_packets_ = 0; + return packets; +} + +void ClientContext::clear_stat_cache() { + std::lock_guard lock(stat_cache_mutex_); + stat_packets_cache_.clear(); + expected_total_packets_ = 0; +} +// 娣诲姞娴偣鏁版嵁鍒扮紦瀛 +bool ClientContext::add_float_data(ushort point_id, int data_type, const tagPqData_Float& float_data) { + if (data_type < 0 || data_type > 3) return false; + + std::lock_guard lock(float_cache_mutex_); + auto& cache = point_float_cache_[point_id]; + cache.data[data_type] = float_data; + cache.received[data_type] = true; + + // 妫鏌ユ槸鍚﹀洓绉嶆暟鎹被鍨嬮兘宸叉帴鏀 + return cache.received[0] && cache.received[1] && + cache.received[2] && cache.received[3]; +} + +// 鑾峰彇骞舵竻闄ゆ寚瀹氭祴鐐圭殑瀹屾暣娴偣鏁版嵁 +std::array ClientContext::get_and_clear_float_data(ushort point_id) { + std::lock_guard lock(float_cache_mutex_); + auto it = point_float_cache_.find(point_id); + if (it == point_float_cache_.end()) { + return {}; + } + auto data = it->second.data; + point_float_cache_.erase(it); + return data; +} + +// 娓呴櫎鎵鏈夋诞鐐规暟鎹紦瀛 +void ClientContext::clear_float_cache() { + std::lock_guard lock(float_cache_mutex_); + point_float_cache_.clear(); +} /* 缂撳啿鍖哄垎閰嶅洖璋 */ void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->base = new char[suggested_size]; @@ -230,24 +355,33 @@ void on_write(uv_write_t* req, int status) { } /* 瀹氭椂鍙戦佸洖璋 */ +//5绉掓墽琛屼竴娆″畾鏃跺櫒 void on_timer(uv_timer_t* handle) { ClientContext* ctx = static_cast(handle->data); if (ctx->state != ConnectionState::CONNECTED) { return; } - std::cout << "on_timer: " << ctx->device_info.mac << " send!"<< std::endl; - // 浣跨敤瑁呯疆鑷繁鐨凪AC鍦板潃鐢熸垚鐧诲綍鎶ユ枃 - auto binary_data = generate_frontlogin_message(ctx->device_info.mac); - // 璋冪敤鍙戦佸嚱鏁 - send_binary_data(ctx, binary_data.data(), binary_data.size()); + static int statequerytime = 0;//璇㈤棶缁熻鏁版嵁鏃堕棿鏍囧織 20绉掓墽琛屼竴娆 - //ClientManager::instance().send_to_device(ctx->device_info.mac, binary_data.data(), binary_data.size()); + // 妫鏌ョ姸鎬佽秴鏃 30绉掔姸鎬佹湭鏇存柊鍒欓噸缃负绌洪棽鐘舵 + ctx->check_state_timeout(); - // 鏍规嵁瑁呯疆鐘舵佸彂閫佸叾浠栨暟鎹 - if (ctx->device_info.status == 1) { // 鍦ㄧ嚎鐘舵 - // 鍙互鍙戦佽缃厤缃俊鎭垨娴嬬偣鏁版嵁 + // 瑁呯疆鐧诲綍鎴愬姛鍚庯紝鍙湪绌洪棽鐘舵佸鐞嗗悗缁姩浣 + if (ctx->cloudstatus == 1) { + + //20绉掍竴娆 鎵ц缁熻鏁版嵁鏃堕棿璇㈤棶 + if (++statequerytime >= 4 && ctx->current_state_ == DeviceState::IDLE) { + statequerytime = 0;//閲嶇疆璁℃椂 + auto sendbuff = generate_statequerytime_message();//缁勮璇㈤棶缁熻鏁版嵁鏃堕棿鎶ユ枃 + ctx->add_action(DeviceState::READING_STATS_TIME, sendbuff);//灏嗚鐘舵佷互鍙婂緟鍙戦佹姤鏂囧瓨鍏ラ槦鍒 + } + + //澶勭悊鍚庣画宸ヤ綔闃熷垪鐨勫伐浣 鍙栧嚭涓涓苟鎵ц + if (ctx->current_state_ == DeviceState::IDLE) { + ctx->process_next_action(); + } } } @@ -283,6 +417,18 @@ void on_close(uv_handle_t* handle) { std::cerr << "[Device " << ctx->device_info.device_id << "] Connection closed" << std::endl; ctx->stop_timers(); + // 娓呯┖缂撳瓨 + ctx->clear_stat_cache(); + // 娓呴櫎娴偣鏁版嵁缂撳瓨 + ctx->clear_float_cache(); + + ctx->cloudstatus = 0; + { + std::lock_guard state_lock(ctx->state_mutex_); + ctx->current_state_ = DeviceState::IDLE; // 鐩存帴淇敼鐘舵 + std::queue empty; + std::swap(ctx->action_queue_, empty); + } if (!ctx->shutdown) { int delay = BASE_RECONNECT_DELAY * pow(2, ctx->reconnect_attempts); @@ -344,6 +490,11 @@ void on_connect(uv_connect_t* req, int status) { ctx->state = ConnectionState::CONNECTED; ctx->reconnect_attempts = 0; + //瀹㈡埛绔繛鎺ュ畬姣曞悗锛屽彂閫佽缃櫥闄嗘秷鎭 + std::cout << "connected: " << ctx->device_info.mac << " send login msg!" << std::endl; + auto binary_data = generate_frontlogin_message(ctx->device_info.mac); + send_binary_data(ctx, binary_data.data(), binary_data.size()); + uv_read_start((uv_stream_t*)&ctx->client, alloc_buffer, on_read); ctx->start_timer(); } @@ -493,4 +644,333 @@ void ClientManager::stop_all() { pair.second->close_handles(); } clients_.clear(); +} + +// 鍦–lientManager鎴愬憳鍑芥暟瀹炵幇涓坊鍔犳柟娉曞疄鐜 +void ClientManager::restart_device(const std::string& device_id) { + std::lock_guard lock(mutex_); + ClientContext* target_ctx = nullptr; + + // 鏌ユ壘鍖归厤鐨勮澶囷紙鏀寔device_id鎴杕ac鍦板潃锛 + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == device_id || + ctx->device_info.mac == device_id) { + target_ctx = ctx.get(); + break; + } + } + + if (!target_ctx) { + std::cerr << "[restart_device] Device not found: " << device_id << std::endl; + return; + } + + std::cout << "[restart_device] Restarting device: " << device_id << std::endl; + + // 纭繚涓嶅浜庡叧闂姸鎬 + target_ctx->shutdown = false; + + // 鍋滄鎵鏈夊畾鏃跺櫒 + target_ctx->stop_timers(); + + // 閲嶇疆閲嶈繛璁℃暟鍣 + target_ctx->reconnect_attempts = 0; + + // 鍏抽棴TCP杩炴帴锛堜細瑙﹀彂on_close鍥炶皟锛 + if (!uv_is_closing((uv_handle_t*)&target_ctx->client)) { + uv_close((uv_handle_t*)&target_ctx->client, on_close); + } + else { + // 濡傛灉宸茬粡鍦ㄥ叧闂繃绋嬩腑锛岀洿鎺ヨЕ鍙戦噸杩 + target_ctx->state = ConnectionState::DISCONNECTED; + target_ctx->start_reconnect_timer(0); // 绔嬪嵆閲嶈繛 + } +} + +//淇敼瀹㈡埛绔簯鍓嶇疆鐧诲綍鐘舵 +bool ClientManager::set_cloud_status(const std::string& identifier, int status) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + // 鍖归厤瑁呯疆ID鎴朚AC鍦板潃 + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + + // 淇敼浜戝墠缃櫥褰曠姸鎬 + ctx->cloudstatus = status; + std::cout << "[Device " << identifier + << "] Cloud status updated to: " << status << std::endl; + return true; + } + } + + std::cerr << "[set_cloud_status] Device not found: " << identifier << std::endl; + return false; +} + +bool ClientManager::add_action_to_device(const std::string& identifier, + DeviceState state, + const std::vector& packet) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + + ctx->add_action(state, packet); + return true; + } + } + + std::cerr << "[add_action_to_device] Device not found: " << identifier << std::endl; + return false; +} + +bool ClientManager::change_device_state(const std::string& identifier, + DeviceState new_state, + const std::vector& packet) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + + ctx->change_state(new_state, packet); + return true; + } + } + + std::cerr << "[change_device_state] Device not found: " << identifier << std::endl; + return false; +} + +bool ClientManager::clear_action_queue(const std::string& identifier) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + + std::lock_guard state_lock(ctx->state_mutex_); + std::queue empty; + std::swap(ctx->action_queue_, empty); + return true; + } + } + + std::cerr << "[clear_action_queue] Device not found: " << identifier << std::endl; + return false; +} + +bool ClientManager::get_device_state(const std::string& identifier, DeviceState& out_state) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + + std::lock_guard state_lock(ctx->state_mutex_); + out_state = ctx->current_state_; + return true; + } + } + + std::cerr << "[get_device_state] Device not found: " << identifier << std::endl; + return false; +} + +bool ClientManager::post_message_processing(const std::string& identifier) { + ClientContext* target = nullptr; + { + std::lock_guard lock(mutex_); + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + target = pair.second.get(); + break; + } + } + } // 鎻愬墠閲婃斁manager閿 + + if (!target) { + std::cerr << "Device not found: " << identifier << std::endl; + return false; + } + + // 鐩存帴鎿嶄綔client锛岄伩鍏嶅祵濂楅攣 + if (target->current_state_ == DeviceState::IDLE) { + //绌洪棽鐘舵佹墽琛屼笅涓椤瑰伐浣 + target->process_next_action(); + return true; + } + else { + //闈炵┖闂茬姸鎬佹墽琛屽綋鍓嶅伐浣 + target->send_current_packet(); + return true; + } +} + +//閫氳繃id鎴栬卪ac璇诲彇瑁呯疆涓嬪睘娴嬬偣淇℃伅 +bool ClientManager::get_device_points(const std::string& identifier, + std::vector& out_points) { + std::lock_guard lock(mutex_); + + for (const auto& pair : clients_) { + const auto& ctx = pair.second; + // 鍖归厤瑁呯疆ID鎴朚AC鍦板潃 + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + + // 澶嶅埗娴嬬偣淇℃伅鍒拌緭鍑哄弬鏁 + out_points = ctx->device_info.points; + return true; + } + } + + std::cerr << "[get_device_points] Device not found: " << identifier << std::endl; + return false; +} + +//淇濆瓨澶氬抚鎶ユ枃鑷崇紦瀛樺尯绛夊緟鏀跺叏 +bool ClientManager::add_stat_packet_to_device(const std::string& identifier, + const std::vector& packet, + int current_packet, + int total_packets) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + return ctx->add_stat_packet(packet, current_packet, total_packets); + } + } + + std::cerr << "[add_stat_packet_to_device] Device not found: " << identifier << std::endl; + return false; +} + +//鑾峰彇缂撳瓨鍖哄唴鎵鏈夊甯ф姤鏂囧苟娓呯┖缂撳瓨 +std::vector ClientManager::get_and_clear_stat_packets(const std::string& identifier) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + return ctx->get_and_clear_stat_packets(); + } + } + + std::cerr << "[get_and_clear_stat_packets] Device not found: " << identifier << std::endl; + return {}; +} + +//娓呯┖鎵鏈夌紦瀛樺尯 +bool ClientManager::clear_stat_cache(const std::string& identifier) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + ctx->clear_stat_cache(); + return true; + } + } + + std::cerr << "[clear_stat_cache] Device not found: " << identifier << std::endl; + return false; +} + +// 鑾峰彇pt鍜孋T鍙樻瘮 +bool ClientManager::get_pt_ct_ratio(const std::string& identifier, + int16_t nCpuNo, + float& pt_ratio, + float& ct_ratio) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + // 鍖归厤瑁呯疆ID鎴朚AC鍦板潃 + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + + // 閬嶅巻瑁呯疆鐨勬墍鏈夋祴鐐 + for (const auto& point : ctx->device_info.points) { + // 鍖归厤娴嬬偣搴忓彿 + if (point.nCpuNo == nCpuNo) { + // 璁$畻PT鍙樻瘮 (PT1/PT2) + pt_ratio = (point.PT2 != 0.0) ? + static_cast(point.PT1 / point.PT2) : 1.0f; + + // 璁$畻CT鍙樻瘮 (CT1/CT2) + ct_ratio = (point.CT2 != 0.0) ? + static_cast(point.CT1 / point.CT2) : 1.0f; + + return true; + } + } + std::cerr << "[get_pt_ct_ratio] Point not found for CPU: " + << nCpuNo << " in device: " << identifier << std::endl; + return false; + } + } + + std::cerr << "[get_pt_ct_ratio] Device not found: " << identifier << std::endl; + return false; +} + +// 娣诲姞娴偣鏁版嵁鍒版寚瀹氳澶囩殑缂撳瓨 +bool ClientManager::add_float_data_to_device(const std::string& identifier, + ushort point_id, + int data_type, + const tagPqData_Float& float_data) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + return ctx->add_float_data(point_id, data_type, float_data); + } + } + return false; +} + +// 鑾峰彇骞舵竻闄ゆ寚瀹氭祴鐐圭殑瀹屾暣娴偣鏁版嵁 +std::array ClientManager::get_and_clear_float_data( + const std::string& identifier, ushort point_id) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + return ctx->get_and_clear_float_data(point_id); + } + } + return {}; +} + +// 娓呴櫎璁惧鐨勬墍鏈夋诞鐐圭紦瀛 +bool ClientManager::clear_float_cache(const std::string& identifier) { + std::lock_guard lock(mutex_); + + for (auto& pair : clients_) { + auto& ctx = pair.second; + if (ctx->device_info.device_id == identifier || + ctx->device_info.mac == identifier) { + ctx->clear_float_cache(); + return true; + } + } + return false; } \ No newline at end of file diff --git a/LFtid1056/client2.h b/LFtid1056/client2.h index 122a91d..921dfb2 100644 --- a/LFtid1056/client2.h +++ b/LFtid1056/client2.h @@ -4,12 +4,15 @@ #include #include #include - +#include +#include "dealMsg.h" +#include "PQSMsg.h" // 测点信息结构 struct PointInfo { std::string point_id; // 测点ID std::string name; // 测点名称 std::string device_id; // 所属装置ID + ushort nCpuNo; //测点序号 1-6 double PT1; // 电压变比1 double PT2; // 电压变比2 double CT1; // 电流变比1 @@ -32,6 +35,21 @@ enum class ConnectionState { CONNECTED }; +// 添加的状态枚举 +enum class DeviceState { + IDLE, // 空闲状态 + READING_STATS, // 读取统计数据 + READING_STATS_TIME, // 读取统计时间 + // 可根据需要添加更多状态 + CUSTOM_ACTION // 自定义动作 +}; + +// 状态动作结构体 +struct StateAction { + DeviceState state; + std::vector packet; // 该状态需要发送的报文 +}; + class ClientContext { public: uv_loop_t* loop; @@ -41,18 +59,65 @@ public: ConnectionState state; int reconnect_attempts; volatile bool shutdown; - DeviceInfo device_info; // 装置信息 + + DeviceInfo device_info; // 装置信息 + int cloudstatus; // 云前置登录状态(0:未登录 1:已登录) + + // 新增状态管理成员 + DeviceState current_state_; // 当前装置状态 + uint64_t state_start_time_; // 状态开始时间(ms) + std::queue action_queue_; // 状态动作队列 + std::mutex state_mutex_; // 状态操作互斥锁 + std::vector current_packet_; // 当前状态需要发送的报文 ClientContext(uv_loop_t* loop, const DeviceInfo& device, int index); ~ClientContext(); - void init_tcp(); - void start_timer(); - void start_reconnect_timer(int delay); - void stop_timers(); - void close_handles(); - void append_and_process_data(const char* data, size_t len); - void put_packet_into_queue(const std::vector& packet); + void init_tcp();//初始化客户端连接 + void start_timer();//启动对应装置计时器 5秒执行一次 + void start_reconnect_timer(int delay);//启动客户端重连定时 + void stop_timers();//停止重连定时器 + void close_handles();//关闭客户端各类连接与定时器 + void append_and_process_data(const char* data, size_t len);//接收装置数据并存入缓冲 + void put_packet_into_queue(const std::vector& packet);//推送完整数据至处理队列 + + void change_state(DeviceState new_state, const std::vector& packet = {});//改变装置状态和当前状态的待发送报文 + void add_action(DeviceState state, const std::vector& packet = {});//添加后续动作 + void check_state_timeout();//装置状态超时检测 + void process_next_action();//装置取后续工作并执行 + void send_current_packet();//发送当前状态的报文至装置 + + // 新增: 多帧数据报文缓存 + struct StatPacket { + int packet_index; + std::vector data; + }; + + std::vector stat_packets_cache_; // 缓存分帧报文 + int expected_total_packets_ = 0; // 预期总帧数 + std::mutex stat_cache_mutex_; // 缓存互斥锁 + + // 新增缓存管理方法 + bool add_stat_packet(const std::vector& packet, int current_packet, int total_packets);//插入多帧缓存数据 + std::vector get_and_clear_stat_packets();//取出所有缓存数据并清空缓存 + void clear_stat_cache();//清空缓存 + + // 统计数据缓存 + struct PointFloatCache { + std::array data; // 存储四种数据类型(0-3) + std::array received = { false }; // 标记四种数据类型是否已接收 + }; + + // 测点统计浮点数据缓存映射表 (测点号 -> 缓存数据) + std::unordered_map point_float_cache_; + std::mutex float_cache_mutex_; // 浮点缓存互斥锁 + + // 添加浮点数据到缓存 + bool add_float_data(ushort point_id, int data_type, const tagPqData_Float& float_data); + // 获取并清除指定测点的完整浮点数据 + std::array get_and_clear_float_data(ushort point_id); + // 清除所有浮点数据缓存 + void clear_float_cache(); private: int index_; @@ -76,16 +141,69 @@ public: loop_ = loop; } - void add_device(const DeviceInfo& device); - void remove_device(const std::string& device_id); - bool send_to_device(const std::string& identifier, const unsigned char* data, size_t size); - void stop_all(); + void add_device(const DeviceInfo& device);//添加一个装置连接 + void remove_device(const std::string& device_id);//删除一个装置连接 + bool send_to_device(const std::string& identifier, const unsigned char* data, size_t size);//选择指定的装置发送消息至服务端 + void restart_device(const std::string& device_id);//关闭指定装置连接,等待重连唤起 + void stop_all();//停止所有客户端连接 + bool set_cloud_status(const std::string& identifier, int status);//修改云前置登录状态 + bool post_message_processing(const std::string& identifier);// 消息处理完成后触发状态处理 + // 添加状态动作到装置 + bool add_action_to_device(const std::string& identifier, + DeviceState state, + const std::vector& packet = {}); + + // 改变装置当前状态 + bool change_device_state(const std::string& identifier, + DeviceState new_state, + const std::vector& packet = {}); + + // 清除装置动作队列 + bool clear_action_queue(const std::string& identifier); + + // 获取装置当前状态 + bool get_device_state(const std::string& identifier, DeviceState& out_state); + + // 新增:通过标识符获取装置测点信息 + bool get_device_points(const std::string& identifier,std::vector& out_points); + + //接收指定客户端的多帧报文并存入缓存区 + bool add_stat_packet_to_device(const std::string& identifier, + const std::vector& packet, + int current_packet, + int total_packets); + + //获取指定客户端的所有缓存报文并清空缓存区 + std::vector get_and_clear_stat_packets(const std::string& identifier); + + //清空多帧报文保存缓存区 + bool clear_stat_cache(const std::string& identifier); + + // 获取指定测点的PT和CT变比值 + bool get_pt_ct_ratio(const std::string& identifier, + int16_t nCpuNo, + float& pt_ratio, + float& ct_ratio); + // 获取客户端数量 size_t client_count() { std::lock_guard lock(mutex_); return clients_.size(); } + // 添加浮点数据到指定设备的缓存 + bool add_float_data_to_device(const std::string& identifier, + ushort point_id, + int data_type, + const tagPqData_Float& float_data); + + // 获取并清除指定测点的完整浮点数据 + std::array get_and_clear_float_data( + const std::string& identifier, ushort point_id); + + // 清除设备的所有浮点缓存 + bool clear_float_cache(const std::string& identifier); + private: ClientManager() : loop_(nullptr) {} std::unordered_map> clients_; diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 955e75f..56c145d 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -7,9 +7,7 @@ #include #include #include -#include "PQSMsg.h" #include "client2.h" -#include "dealMsg.h" #include using namespace std; @@ -22,7 +20,226 @@ void process_received_message(string mac, string id,const char* data, size_t len // 示例:解析消息并处理 // 注意:根据您的协议实现具体的解析逻辑 + //数据处理逻辑 + if (length > 0) { + // 将数据转为无符号类型以便处理二进制值 + const unsigned char* udata = reinterpret_cast(data); + //对数据消息的初步处理--登录报文格式解析不出来 + MessageParser parser; + bool bool_msgset = parser.SetMsg(udata, length); + //云服务登录报文 + if (udata[0] == 0xEB && udata[1] == 0x90 && udata[2] == 0xEB && udata[3] == 0x90) { + //通讯状态报文 + if (udata[8] == 0x01) { + std::cout << "cloud login: " << mac << " state: " << static_cast(udata[16]) << static_cast(udata[17]) << static_cast(udata[18]) << static_cast(udata[19]) << std::endl; + if (udata[19] == 0x10) { + std::cout << "cloud login: " << mac << " state: success!" << std::endl; + //装置登录成功 + ClientManager::instance().set_cloud_status(id, 1); //设置了云前置登录状态为已登录 + } + if (udata[19] == 0x00) { + std::cout << "cloud login: " << mac << " state: fail!" << std::endl; + //装置登录失败 关闭客户端连接 等待20秒重新登录 + ClientManager::instance().restart_device(id); + } + } + else { + std::cout << "cloud login: " << mac << " state: error!"<< std::endl; + //装置登录失败 关闭客户端连接 等待20秒重新登录 + ClientManager::instance().restart_device(id); + } + //登录报文处理完毕,当前报文处理逻辑结束并返回 + return; + } + + //常规通讯报文 + { + DeviceState currentState = DeviceState::IDLE;//获取当前装置的状态 + if (!ClientManager::instance().get_device_state(id, currentState)) { + std::cerr << "Failed to get device state for: " << id << std::endl; + return; + } + // 根据装置状态处理报文 + switch (currentState) { + case DeviceState::IDLE: + // 空闲状态下收到报文,可能是主动上报数据 + std::cout << "IDLE state: Received active report from " << mac << std::endl; + // 这里可以添加处理主动上报数据的逻辑 + break; + + case DeviceState::READING_STATS: + // 读取统计数据状态 + std::cout << "READING_STATS state: Processing stats data from " << mac << std::endl; + // 这里添加处理统计数据报文的逻辑 + if (udata[8] == static_cast(MsgResponseType::Response_Stat)) { + // 一发多收,需要在这里等待所有报文收全再组装相应数据 一帧1K 直到所有数据传送完毕 + //当前帧未收全,直接退出消息处理,等待后续帧 + std::cout << "mac: " << mac << " count" << static_cast(udata[10]) << std::endl; + + // 解析帧信息 (根据实际协议调整) + int current_packet = static_cast(udata[10]); // 当前帧序号 + int total_packets = Stat_PacketNum; // 总帧数 + std::vector packet_data(udata, udata + length); + bool complete = ClientManager::instance().add_stat_packet_to_device( + id, packet_data, current_packet, total_packets + ); + //判断是否收全 + if (complete) { + // 1. 获取并清空缓存数据包 + auto packets = ClientManager::instance().get_and_clear_stat_packets(id); + + // 2. 按帧序号排序 + std::sort(packets.begin(), packets.end(), + [](const ClientContext::StatPacket& a, const ClientContext::StatPacket& b) { + return a.packet_index < b.packet_index; + }); + + // 3. 解析每帧数据并提取数据体 + std::vector full_data; + MessageParser parser; + + for (const auto& packet : packets) { + // 解析单帧报文 + if (!parser.SetMsg(packet.data.data(), packet.data.size())) { + std::cerr << "Failed to parse packet " << packet.packet_index + << " for device " << id << std::endl; + continue; + } + + // 将数据体添加到完整序列 + full_data.insert(full_data.end(), + parser.RecvData.begin(), + parser.RecvData.end()); + } + + // 4. 组装 tagPqData 对象 + tagPqData pq_data; + if (!pq_data.SetStructBuf(full_data.data(), full_data.size())) { + std::cerr << "Failed to assemble tagPqData for device " << id << std::endl; + } + else { + // 成功组装,可以在这里使用 pq_data 对象 + std::cout << "Successfully assembled tagPqData for device: " + << id << std::endl; + + float fPT = 1.0f; + float fCT = 1.0f; + if (ClientManager::instance().get_pt_ct_ratio(id, pq_data.name, fPT, fCT)) { + // 使用获取的变比值进行数据转换 + tagPqData_Float float_data; + float_data.SetFloatValue(pq_data, fPT, fCT); + float_data.name = pq_data.name; + float_data.Data_Type = pq_data.Data_Type; + + // 将浮点数据添加到缓存 + // 添加到缓存并检查是否收全 + bool complete = ClientManager::instance().add_float_data_to_device( + id, pq_data.name, pq_data.Data_Type, float_data); + + if (complete) { + // 如果收全,立即取出处理 + std::array all_data = + ClientManager::instance().get_and_clear_float_data(id, pq_data.name); + + if (!all_data.empty()) { + //单个测点 4组数据处理逻辑 + tagPqData_Float max_data = all_data[0]; + tagPqData_Float min_data = all_data[1]; + tagPqData_Float avg_data = all_data[2]; + tagPqData_Float cp95_data = all_data[3]; + + // 转换为Base64字符串 + std::string base64Str = max_data.ConvertToBase64(); + + // 输出结果 + std::cout << "Base64 Encoded Data (" << max_data.CalculateFloatCount() + << " floats): " << base64Str << std::endl; + } + } + + } + else { + // 处理获取变比值失败的情况 + std::cerr << "Failed to get PT/CT ratio for device: " + << mac << " lineno: " << pq_data.name << std::endl; + } + } + //数据组装完毕,修改为空闲状态等待下一项工作 + ClientManager::instance().change_device_state(id, DeviceState::IDLE); + } + else { + //未收全则直接结束处理,等待后续报文应答 + return; + } + } + else { + // 装置答非所问异常 + // 接收统计数据错误,调整为空闲状态,处理下一项工作。 + ClientManager::instance().change_device_state(id, DeviceState::IDLE); + } + break; + + case DeviceState::READING_STATS_TIME: + // 读取统计时间状态 + std::cout << "READING_STATS_TIME state: Processing stats time from " << mac << std::endl; + if (udata[8] == static_cast(MsgResponseType::Response_StatTime)) { + std::vector points;//装置测点信息 + if (ClientManager::instance().get_device_points(mac, points)) { + // 成功获取测点信息 + // 处理接收装置的时标 + tagTime t3; + t3.SetStructBuf(parser.RecvData.data(), parser.RecvData.size()); + int first = 0;//第一次标记 + for (const auto& point : points) { + for (ushort i = 0; i < 4; i++)//每个测点需要单独召唤最大,最小,平均,95概率值 + { + auto sendbuff = generate_statequerystat_message(t3, point.nCpuNo, i);//组装询问统计数据报文 + if (first == 0) { + //首次尝试组装报文 直接将当前状态调整 并等待最后启动发送 + first++; + ClientManager::instance().change_device_state(id, DeviceState::READING_STATS, sendbuff); + } + else { + //非首次进入,将动作传入队列等待 + ClientManager::instance().add_action_to_device(id, DeviceState::READING_STATS, sendbuff); + } + } + } + } + else { + // 未找到装置下属测点异常 + // 接收统计数据时间错误,调整为空闲状态,处理下一项工作。 + ClientManager::instance().change_device_state(id, DeviceState::IDLE); + } + } + else { + // 装置答非所问异常 + // 接收统计数据时间错误,调整为空闲状态,处理下一项工作。 + ClientManager::instance().change_device_state(id, DeviceState::IDLE); + } + break; + + case DeviceState::CUSTOM_ACTION: + // 自定义动作状态 + std::cout << "CUSTOM_ACTION state: Processing custom response from " << mac << std::endl; + // 这里添加处理自定义动作响应的逻辑 + + // 处理完成后标记状态完成 + ClientManager::instance().change_device_state(id, DeviceState::IDLE); + break; + + default: + std::cerr << "Unknown state: " << static_cast(currentState) + << " for device " << id << std::endl; + break; + } + + // 无论何种状态,处理完成后触发后续状态处理 + ClientManager::instance().post_message_processing(id); + } + + } } diff --git a/LFtid1056/dealMsg.h b/LFtid1056/dealMsg.h index 627f728..6d072d5 100644 --- a/LFtid1056/dealMsg.h +++ b/LFtid1056/dealMsg.h @@ -9,6 +9,9 @@ #include #include using namespace std; +//前置声明 +#pragma once +class PointInfo; /* 常量定义 */ #define MESSAGE_QUEUE_SIZE 10000 // 消息队列容量 diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 732b7e2..8f44d38 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -4,9 +4,7 @@ #include #include #include -#include "PQSMsg.h" #include "client2.h" -#include "dealMsg.h" #include "cloudfront/code/interface.h" @@ -52,6 +50,7 @@ std::vector generate_test_devices(int count) { "P" + dev_id.substr(1) + "01", // 测点ID如 P00101 "Voltage " + dev_name, dev_id, + 1, 0.0, // 随机电压值 0.0, 100.0, @@ -61,6 +60,7 @@ std::vector generate_test_devices(int count) { "P" + dev_id.substr(1) + "02", // 测点ID如 P00102 "Current " + dev_name, dev_id, + 2, 0.0, // 随机电流值 0.0, 20.0, @@ -82,37 +82,6 @@ std::vector generate_test_devices(int count) { return devices; } -/* 线程工作函数 待分配线程池*/ -void* work_thread(void* arg) { - int index = *(int*)arg; // 获取线程索引 - free(arg); // 释放动态分配的索引内存 - - // 更新线程状态为运行中 - pthread_mutex_lock(&thread_info[index].lock); - printf("Thread %d started\n", index); - thread_info[index].state = THREAD_RUNNING; - pthread_mutex_unlock(&thread_info[index].lock); - - // 模拟工作循环(5秒间隔) - while (1) { - sleep(5); - - // 10%概率模拟线程故障 - if (rand() % 10 == 0) { - pthread_mutex_lock(&thread_info[index].lock); - printf("Thread %d simulated failure\n", index); - pthread_mutex_unlock(&thread_info[index].lock); - break; - } - } - - // 线程终止处理 - pthread_mutex_lock(&thread_info[index].lock); - thread_info[index].state = THREAD_STOPPED; - printf("Thread %d stopped\n", index); - pthread_mutex_unlock(&thread_info[index].lock); - return NULL; -} /* 线程工作函数 0号子线程*/ /* 客户端连接管理线程函数*/ void* client_manager_thread(void* arg) { @@ -129,14 +98,14 @@ void* client_manager_thread(void* arg) { // 创建测点数据 std::vector points1 = { - {"P001", "Main Voltage", "D001", 10.0, 0.0, 100.0, 0.0}, - {"P002", "Backup Voltage", "D001", 5.0, 0.0, 50.0, 0.0} + {"P001", "Main Voltage", "D001",1 ,1, 1, 1, 1}, + {"P002", "Backup Voltage", "D001",2 ,1, 1, 1, 1} }; std::vector points2 = { - {"P101", "Generator Output", "D002", 20.0, 0.0, 200.0, 0.0} + {"P101", "Generator Output", "D002",1 ,1, 1, 1, 1} }; - + //00-B7-8D-A8-00-D6 // 创建装置列表 std::vector devices = { { @@ -144,7 +113,7 @@ void* client_manager_thread(void* arg) { 1, points1 }, { - "D002", "Backup Device", "Model-Y", "00-B7-8D-A8-00-D6", + "D002", "Backup Device", "Model-Y", "00-B7-8D-01-79-06", 1, points2 } };