Files
front_linux/LFtid1056/dealMsg.cpp

246 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <queue>
#include <vector>
#include <atomic>
#include "client2.h"
#include <iostream>
using namespace std;
SafeMessageQueue message_queue; // 全局消息队列
void process_received_message(string mac, string id,const char* data, size_t length) {
// 实际的消息处理逻辑
// 这里可以添加您的业务处理代码
std::cout << "Active connections: " << mac << " id:" << id << " size:" << length << std::endl;
// 示例:解析消息并处理
// 注意:根据您的协议实现具体的解析逻辑
//数据处理逻辑
if (length > 0) {
// 将数据转为无符号类型以便处理二进制值
const unsigned char* udata = reinterpret_cast<const unsigned char*>(data);
//对数据消息的初步处理--登录报文格式解析不出来
MessageParser parser;
bool bool_msgset = parser.SetMsg(udata, length);
//云服务登录报文
if (udata[0] == 0xEB && udata[1] == 0x90 && udata[2] == 0xEB && udata[3] == 0x90) {
//通讯状态报文
if (udata[8] == 0x01) {
std::cout << "cloud login: " << mac << " state: " << static_cast<int>(udata[16]) << static_cast<int>(udata[17]) << static_cast<int>(udata[18]) << static_cast<int>(udata[19]) << std::endl;
if (udata[19] == 0x10) {
std::cout << "cloud login: " << mac << " state: success!" << std::endl;
//装置登录成功
ClientManager::instance().set_cloud_status(id, 1); //设置了云前置登录状态为已登录
}
if (udata[19] == 0x00) {
std::cout << "cloud login: " << mac << " state: fail!" << std::endl;
//装置登录失败 关闭客户端连接 等待20秒重新登录
ClientManager::instance().restart_device(id);
}
}
else {
std::cout << "cloud login: " << mac << " state: error!"<< std::endl;
//装置登录失败 关闭客户端连接 等待20秒重新登录
ClientManager::instance().restart_device(id);
}
//登录报文处理完毕,当前报文处理逻辑结束并返回
return;
}
//常规通讯报文
{
DeviceState currentState = DeviceState::IDLE;//获取当前装置的状态
if (!ClientManager::instance().get_device_state(id, currentState)) {
std::cerr << "Failed to get device state for: " << id << std::endl;
return;
}
// 根据装置状态处理报文
switch (currentState) {
case DeviceState::IDLE:
// 空闲状态下收到报文,可能是主动上报数据
std::cout << "IDLE state: Received active report from " << mac << std::endl;
// 这里可以添加处理主动上报数据的逻辑
break;
case DeviceState::READING_STATS:
// 读取统计数据状态
std::cout << "READING_STATS state: Processing stats data from " << mac << std::endl;
// 这里添加处理统计数据报文的逻辑
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_Stat)) {
// 一发多收,需要在这里等待所有报文收全再组装相应数据 一帧1K 直到所有数据传送完毕
//当前帧未收全,直接退出消息处理,等待后续帧
std::cout << "mac: " << mac << " count" << static_cast<int>(udata[10]) << std::endl;
// 解析帧信息 (根据实际协议调整)
int current_packet = static_cast<int>(udata[10]); // 当前帧序号
int total_packets = Stat_PacketNum; // 总帧数
std::vector<unsigned char> packet_data(udata, udata + length);
bool complete = ClientManager::instance().add_stat_packet_to_device(
id, packet_data, current_packet, total_packets
);
//判断是否收全
if (complete) {
// 1. 获取并清空缓存数据包
auto packets = ClientManager::instance().get_and_clear_stat_packets(id);
// 2. 按帧序号排序
std::sort(packets.begin(), packets.end(),
[](const ClientContext::StatPacket& a, const ClientContext::StatPacket& b) {
return a.packet_index < b.packet_index;
});
// 3. 解析每帧数据并提取数据体
std::vector<unsigned char> full_data;
MessageParser parser;
for (const auto& packet : packets) {
// 解析单帧报文
if (!parser.SetMsg(packet.data.data(), packet.data.size())) {
std::cerr << "Failed to parse packet " << packet.packet_index
<< " for device " << id << std::endl;
continue;
}
// 将数据体添加到完整序列
full_data.insert(full_data.end(),
parser.RecvData.begin(),
parser.RecvData.end());
}
// 4. 组装 tagPqData 对象
tagPqData pq_data;
if (!pq_data.SetStructBuf(full_data.data(), full_data.size())) {
std::cerr << "Failed to assemble tagPqData for device " << id << std::endl;
}
else {
// 成功组装,可以在这里使用 pq_data 对象
std::cout << "Successfully assembled tagPqData for device: "
<< id << std::endl;
float fPT = 1.0f;
float fCT = 1.0f;
if (ClientManager::instance().get_pt_ct_ratio(id, pq_data.name, fPT, fCT)) {
// 使用获取的变比值进行数据转换
tagPqData_Float float_data;
float_data.SetFloatValue(pq_data, fPT, fCT);
float_data.name = pq_data.name;
float_data.Data_Type = pq_data.Data_Type;
// 将浮点数据添加到缓存
// 添加到缓存并检查是否收全
bool complete = ClientManager::instance().add_float_data_to_device(
id, pq_data.name, pq_data.Data_Type, float_data);
if (complete) {
// 如果收全,立即取出处理
std::array<tagPqData_Float, 4> all_data =
ClientManager::instance().get_and_clear_float_data(id, pq_data.name);
if (!all_data.empty()) {
//单个测点 4组数据处理逻辑
tagPqData_Float max_data = all_data[0];
tagPqData_Float min_data = all_data[1];
tagPqData_Float avg_data = all_data[2];
tagPqData_Float cp95_data = all_data[3];
// 转换为Base64字符串
std::string base64Str = max_data.ConvertToBase64();
// 输出结果
std::cout << "Base64 Encoded Data (" << max_data.CalculateFloatCount()
<< " floats): " << base64Str << std::endl;
}
}
}
else {
// 处理获取变比值失败的情况
std::cerr << "Failed to get PT/CT ratio for device: "
<< mac << " lineno: " << pq_data.name << std::endl;
}
}
//数据组装完毕,修改为空闲状态等待下一项工作
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
else {
//未收全则直接结束处理,等待后续报文应答
return;
}
}
else {
// 装置答非所问异常
// 接收统计数据错误,调整为空闲状态,处理下一项工作。
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
break;
case DeviceState::READING_STATS_TIME:
// 读取统计时间状态
std::cout << "READING_STATS_TIME state: Processing stats time from " << mac << std::endl;
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_StatTime)) {
std::vector<PointInfo> points;//装置测点信息
if (ClientManager::instance().get_device_points(mac, points)) {
// 成功获取测点信息
// 处理接收装置的时标
tagTime t3;
t3.SetStructBuf(parser.RecvData.data(), parser.RecvData.size());
int first = 0;//第一次标记
for (const auto& point : points) {
for (ushort i = 0; i < 4; i++)//每个测点需要单独召唤最大最小平均95概率值
{
auto sendbuff = generate_statequerystat_message(t3, point.nCpuNo, i);//组装询问统计数据报文
if (first == 0) {
//首次尝试组装报文 直接将当前状态调整 并等待最后启动发送
first++;
ClientManager::instance().change_device_state(id, DeviceState::READING_STATS, sendbuff);
}
else {
//非首次进入,将动作传入队列等待
ClientManager::instance().add_action_to_device(id, DeviceState::READING_STATS, sendbuff);
}
}
}
}
else {
// 未找到装置下属测点异常
// 接收统计数据时间错误,调整为空闲状态,处理下一项工作。
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
}
else {
// 装置答非所问异常
// 接收统计数据时间错误,调整为空闲状态,处理下一项工作。
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
break;
case DeviceState::CUSTOM_ACTION:
// 自定义动作状态
std::cout << "CUSTOM_ACTION state: Processing custom response from " << mac << std::endl;
// 这里添加处理自定义动作响应的逻辑
// 处理完成后标记状态完成
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
break;
default:
std::cerr << "Unknown state: " << static_cast<int>(currentState)
<< " for device " << id << std::endl;
break;
}
// 无论何种状态,处理完成后触发后续状态处理
ClientManager::instance().post_message_processing(id);
}
}
}