#include #include #include #include #include #include #include #include #include #include #include "cloudfront/code/interface.h" //lnk20250708 #include "cloudfront/code/rocketmq.h" //lnk20250708 #include "client2.h" using namespace std; SafeMessageQueue message_queue; // 全局消息队列 //时间转换函数 time_t ConvertToTimestamp(const tagPqData_Float& data) { struct tm t = {}; t.tm_year = data.time.DeviceYear - 1900; // tm_year 从 1900 开始计 t.tm_mon = data.time.DeviceMonth - 1; // tm_mon 从 0(1月)开始 t.tm_mday = data.time.DeviceDay; t.tm_hour = data.time.DeviceHour; t.tm_min = data.time.DeviceMinute; t.tm_sec = data.time.DeviceSecond; // 返回时间戳(本地时间) return mktime(&t); } void process_received_message(string mac, string id,const char* data, size_t length) { // 实际的消息处理逻辑 // 这里可以添加您的业务处理代码 std::cout << "Active connections: " << mac << " id:" << id << " size:" << length << std::endl; // 示例:解析消息并处理 // 注意:根据您的协议实现具体的解析逻辑 //数据处理逻辑 if (length > 0) { // 将数据转为无符号类型以便处理二进制值 const unsigned char* udata = reinterpret_cast(data); //对数据消息的初步处理--登录报文格式解析不出来 MessageParser parser; bool bool_msgset = parser.SetMsg(udata, length); //云服务登录报文 if (udata[0] == 0xEB && udata[1] == 0x90 && udata[2] == 0xEB && udata[3] == 0x90) { //通讯状态报文 if (udata[8] == 0x01) { std::cout << "cloud login: " << mac << " state: " << static_cast(udata[16]) << static_cast(udata[17]) << static_cast(udata[18]) << static_cast(udata[19]) << std::endl; if (udata[19] == 0x10) { std::cout << "cloud login: " << mac << " state: success!" << std::endl; //装置登录成功 ClientManager::instance().set_cloud_status(id, 1); //设置了云前置登录状态为已登录 ClientManager::instance().set_real_state_count("D002", 1,1);//登录后测试实时 } if (udata[19] == 0x00) { std::cout << "cloud login: " << mac << " state: fail!" << std::endl; //装置登录失败 关闭客户端连接 等待20秒重新登录 ClientManager::instance().restart_device(id); } } else { std::cout << "cloud login: " << mac << " state: error!"<< std::endl; //装置登录失败 关闭客户端连接 等待20秒重新登录 ClientManager::instance().restart_device(id); } //登录报文处理完毕,当前报文处理逻辑结束并返回 return; } //常规通讯报文 { DeviceState currentState = DeviceState::IDLE;//获取当前装置的状态 if (!ClientManager::instance().get_device_state(id, currentState)) { std::cerr << "Failed to get device state for: " << id << std::endl; return; } // 根据装置状态处理报文 switch (currentState) { case DeviceState::IDLE: // 空闲状态下收到报文,可能是主动上报数据 std::cout << "IDLE state: Received active report from " << mac << std::endl; // 这里可以添加处理主动上报数据的逻辑 break; case DeviceState::READING_STATS: // 读取统计数据状态 std::cout << "READING_STATS state: Processing stats data from " << mac << std::endl; // 这里添加处理统计数据报文的逻辑 if (udata[8] == static_cast(MsgResponseType::Response_Stat)) { // 一发多收,需要在这里等待所有报文收全再组装相应数据 一帧1K 直到所有数据传送完毕 //当前帧未收全,直接退出消息处理,等待后续帧 std::cout << "mac: " << mac << " count" << static_cast(udata[10]) << std::endl; // 解析帧信息 (根据实际协议调整) int current_packet = static_cast(udata[10]); // 当前帧序号 int total_packets = Stat_PacketNum; // 总帧数 std::vector packet_data(udata, udata + length); bool complete = ClientManager::instance().add_stat_packet_to_device( id, packet_data, current_packet, total_packets ); //判断是否收全 if (complete) { // 1. 获取并清空缓存数据包 auto packets = ClientManager::instance().get_and_clear_stat_packets(id); // 2. 按帧序号排序 std::sort(packets.begin(), packets.end(), [](const ClientContext::StatPacket& a, const ClientContext::StatPacket& b) { return a.packet_index < b.packet_index; }); // 3. 解析每帧数据并提取数据体 std::vector full_data; MessageParser parser; for (const auto& packet : packets) { // 解析单帧报文 if (!parser.SetMsg(packet.data.data(), packet.data.size())) { std::cerr << "Failed to parse packet " << packet.packet_index << " for device " << id << std::endl; continue; } // 将数据体添加到完整序列 full_data.insert(full_data.end(), parser.RecvData.begin(), parser.RecvData.end()); } // 4. 组装 tagPqData 对象 tagPqData pq_data; if (!pq_data.SetStructBuf(full_data.data(), full_data.size())) { std::cerr << "Failed to assemble tagPqData for device " << id << std::endl; } else { // 成功组装,可以在这里使用 pq_data 对象 std::cout << "Successfully assembled tagPqData for device: " << id << std::endl; float fPT = 1.0f; float fCT = 1.0f; if (ClientManager::instance().get_pt_ct_ratio(id, pq_data.name, fPT, fCT)) { // 使用获取的变比值进行数据转换 tagPqData_Float float_data; float_data.SetFloatValue(pq_data, fPT, fCT); float_data.name = pq_data.name; float_data.Data_Type = pq_data.Data_Type; // 将浮点数据添加到缓存 // 添加到缓存并检查是否收全 bool complete = ClientManager::instance().add_float_data_to_device( id, pq_data.name, pq_data.Data_Type, float_data); if (complete) { // 如果收全,立即取出处理 std::array all_data = ClientManager::instance().get_and_clear_float_data(id, pq_data.name); if (!all_data.empty()) { //单个测点 4组数据处理逻辑 tagPqData_Float max_data = all_data[0]; tagPqData_Float min_data = all_data[1]; tagPqData_Float avg_data = all_data[2]; tagPqData_Float cp95_data = all_data[3]; // 转换为Base64字符串 std::string max_base64Str = max_data.ConvertToBase64(); std::string min_base64Str = min_data.ConvertToBase64(); std::string avg_base64Str = avg_data.ConvertToBase64(); std::string cp95_base64Str = cp95_data.ConvertToBase64(); //lnk20250708使用接口发送 time_t data_time = ConvertToTimestamp(avg_data); std::vector arr; arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95” data_time, //数据转换出来的时间,数据时标,相对1970年的秒,无效填入“-1” -1, //数据时标,微秒钟,无效填入“-1” 0, //数据标识,1-标识数据异常 max_base64Str}); arr.push_back({2, data_time, -1, 0, min_base64Str}); arr.push_back({3, data_time, -1, 0, avg_base64Str}); arr.push_back({4, data_time, -1, 0, cp95_base64Str}); std::string js = generate_json( -1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” 123456, //设备唯一标识Ldid,填入0代表Ndid 1, //报文处理的优先级 0x1302, //设备数据主动上送的数据类型 max_data.name, //逻辑子设备ID,0-逻辑设备本身,无填-1 max_data.Data_Type, //数据类型 2, //数据属性:无“0”、实时“1”、统计“2”等 1, //数据集序号(以数据集方式上送),无填-1 arr //数据数组 ); //std::cout << js << std::endl; queue_data_t data; data.monitor_no = 1; data.strTopic = TOPIC_STAT; data.strText = js; data.mp_id = "test"; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); // 输出结果 //std::cout << "Base64 Encoded Data (" << max_data.CalculateFloatCount() // << " floats): " << base64Str << std::endl; } } } else { // 处理获取变比值失败的情况 std::cerr << "Failed to get PT/CT ratio for device: " << mac << " lineno: " << pq_data.name << std::endl; } } //数据组装完毕,修改为空闲状态等待下一项工作 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } else { //未收全则直接结束处理,等待后续报文应答 return; } } else { // 装置答非所问异常 // 接收统计数据错误,调整为空闲状态,处理下一项工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } break; case DeviceState::READING_STATS_TIME: // 读取统计时间状态 std::cout << "READING_STATS_TIME state: Processing stats time from " << mac << std::endl; if (udata[8] == static_cast(MsgResponseType::Response_StatTime)) { std::vector points;//装置测点信息 if (ClientManager::instance().get_device_points(mac, points)) { // 成功获取测点信息 // 处理接收装置的时标 tagTime t3; t3.SetStructBuf(parser.RecvData.data(), parser.RecvData.size()); int first = 0;//第一次标记 for (const auto& point : points) { for (ushort i = 0; i < 4; i++)//每个测点需要单独召唤最大,最小,平均,95概率值 { auto sendbuff = generate_statequerystat_message(t3, point.nCpuNo, i);//组装询问统计数据报文 if (first == 0) { //首次尝试组装报文 直接将当前状态调整 并等待最后启动发送 first++; ClientManager::instance().change_device_state(id, DeviceState::READING_STATS, sendbuff); } else { //非首次进入,将动作传入队列等待 ClientManager::instance().add_action_to_device(id, DeviceState::READING_STATS, sendbuff); } } } } else { // 未找到装置下属测点异常 // 接收统计数据时间错误,调整为空闲状态,处理下一项工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } } else { // 装置答非所问异常 // 接收统计数据时间错误,调整为空闲状态,处理下一项工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } break; case DeviceState::READING_REALSTAT: //读取实时数据状态 std::cout << "READING_REALSTAT state: Processing stats data from " << mac << std::endl; if (udata[8] == static_cast(MsgResponseType::Response_New_3S)) { unsigned char packet_type = udata[13]; // 将数据添加到缓存 const uint8_t* data_ptr = parser.RecvData.data() + 4; size_t data_size = parser.RecvData.size() - 4; ClientManager::instance().add_realtime_packet_to_device( id, packet_type, data_ptr, data_size ); // 如果不是最后一个包,请求下一个包 if (packet_type != 0x06) { unsigned char next_packet_type = packet_type + 1; auto sendbuff = generate_realstat_message( static_cast(udata[12]), next_packet_type, static_cast(0x01) ); ClientManager::instance().change_device_state( id, DeviceState::READING_REALSTAT, sendbuff ); } else { // 获取并清空缓存 auto packets = ClientManager::instance().get_and_clear_realtime_packets(id); // 按包类型排序(01-06) std::sort(packets.begin(), packets.end(), [](const ClientContext::RealtimePacket& a, const ClientContext::RealtimePacket& b) { return a.packet_type < b.packet_type; }); RealtagPqDate_float realdata; // 按顺序解析每个包 for (const auto& packet : packets) { switch (packet.packet_type) { case 0x01: realdata.ParsePacket1(packet.data.data(), packet.data.size()); break; case 0x02: realdata.ParsePacket2(packet.data.data(), packet.data.size()); break; case 0x03: realdata.ParsePacket3(packet.data.data(), packet.data.size()); break; case 0x04: realdata.ParsePacket4(packet.data.data(), packet.data.size()); break; case 0x05: realdata.ParsePacket5(packet.data.data(), packet.data.size()); break; case 0x06: realdata.ParsePacket6(packet.data.data(), packet.data.size()); break; } } std::string base64 = realdata.ConvertToBase64(); std::cout << base64 << std::endl; // 处理完成后重置状态 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } } else { // 装置答非所问异常 // 接收实时数据错误,调整为空闲状态,处理下一项工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } break; case DeviceState::CUSTOM_ACTION: // 自定义动作状态 std::cout << "CUSTOM_ACTION state: Processing custom response from " << mac << std::endl; // 这里添加处理自定义动作响应的逻辑 // 处理完成后标记状态完成 ClientManager::instance().change_device_state(id, DeviceState::IDLE); break; default: std::cerr << "Unknown state: " << static_cast(currentState) << " for device " << id << std::endl; break; } // 无论何种状态,处理完成后触发后续状态处理 ClientManager::instance().post_message_processing(id); } } }