Files
front_linux/LFtid1056/dealMsg.cpp
2025-07-25 10:49:34 +08:00

655 lines
32 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <queue>
#include <vector>
#include <atomic>
#include <fstream>
#include <sys/stat.h> // 用于mkdir
#include <iostream>
#include "cloudfront/code/interface.h" //lnk20250708
#include "cloudfront/code/rocketmq.h" //lnk20250708
#include "client2.h"
using namespace std;
SafeMessageQueue message_queue; // 全局消息队列
//时间转换函数
time_t ConvertToTimestamp(const tagTime& time) {
struct tm t = {};
t.tm_year = time.DeviceYear - 1900; // tm_year 从 1900 开始计
t.tm_mon = time.DeviceMonth - 1; // tm_mon 从 01月开始
t.tm_mday = time.DeviceDay;
t.tm_hour = time.DeviceHour;
t.tm_min = time.DeviceMinute;
t.tm_sec = time.DeviceSecond;
// 返回时间戳(本地时间)
return mktime(&t);
}
//文件分割取字段
std::string extract_filename(const std::string& path) {
// 查找最后一个'/'的位置
size_t last_slash = path.find_last_of('/');
// 如果找到'/',则返回'/'之后的部分
if (last_slash != std::string::npos) {
return path.substr(last_slash + 1);
}
// 如果没有'/',直接返回原字符串
return path;
}
void process_received_message(string mac, string id,const char* data, size_t length) {
// 实际的消息处理逻辑
std::cout << "Active connections: " << mac << " id:" << id << " size:" << length << std::endl;
// 示例:解析消息并处理
//数据处理逻辑
if (length > 0) {
// 将数据转为无符号类型以便处理二进制值
const unsigned char* udata = reinterpret_cast<const unsigned char*>(data);
//对数据消息的初步处理--登录报文格式解析不出来
MessageParser parser;
bool bool_msgset = parser.SetMsg(udata, length);
//云服务登录报文
if (udata[0] == 0xEB && udata[1] == 0x90 && udata[2] == 0xEB && udata[3] == 0x90) {
//通讯状态报文
if (udata[8] == 0x01) {
std::cout << "cloud login: " << mac << " state: " << static_cast<int>(udata[16]) << static_cast<int>(udata[17]) << static_cast<int>(udata[18]) << static_cast<int>(udata[19]) << std::endl;
if (udata[19] == 0x10) {
std::cout << "cloud login: " << mac << " state: success!" << std::endl;
//装置登录成功
ClientManager::instance().set_cloud_status(id, 1); //设置了云前置登录状态为已登录
ClientManager::instance().set_real_state_count("D002", 1,1);//登录后测试实时
}
if (udata[19] == 0x00) {
std::cout << "cloud login: " << mac << " state: fail!" << std::endl;
//装置登录失败 关闭客户端连接 等待20秒重新登录
ClientManager::instance().restart_device(id);
}
}
else {
std::cout << "cloud login: " << mac << " state: error!"<< std::endl;
//装置登录失败 关闭客户端连接 等待20秒重新登录
ClientManager::instance().restart_device(id);
}
//登录报文处理完毕,当前报文处理逻辑结束并返回
return;
}
//装置主动上送报文 暂态事件报文/暂态波形文件报文
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_Event)) {
//处理主动上送的暂态事件报文
NewTaglogbuffer event = NewTaglogbuffer::createFromData(parser.RecvData.data(), parser.RecvData.size());
// 获取测点参数
std::string strScale;//电压等级
int nPTType;//接线方式
float fPT = 1.0f;
float fCT = 1.0f;
if (ClientManager::instance().get_point_scale_and_pttype(
id, // 或使用id
event.head.name, // 从报文中解析出的测点序号
strScale,
nPTType) && ClientManager::instance().get_pt_ct_ratio(id, event.head.name, fPT, fCT))
{
// 使用获取的参数解析事件记录
QVVRRecord record = DynamicLog_GetQVVRRecordFromLogBuffer(
strScale, nPTType, fPT, event);
// 使用记录数据(示例:打印到控制台)
std::cout << "事件类型: " << record.nType
<< ", 持续时间: " << record.fPersisstime << "s"
<< ", 特征幅值: " << record.fMagntitude << " pu"
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
}
else {
// 处理获取失败的情况
std::cerr << "Failed to get point parameters for: " << mac << std::endl;
}
//处理完毕主动上送报文后直接退出,不要干扰装置正常应答
return;
}
else if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_ActiveSOEInfo)) {
//处理主动上送的波形文件信息报文
unsigned char file_type = udata[12];//录波文件类型数 cfg dat hdr 1-3
unsigned char line_id = udata[13];//录波测点 1-6
const uint8_t* data_ptr = parser.RecvData.data() + 2;//数据体去除前两位
size_t data_size = parser.RecvData.size() - 2;
// 直接构造字符串(避免额外拷贝)
std::string tempfilename(
reinterpret_cast<const char*>(data_ptr),
data_size
);
// ========== 新增文件路径处理逻辑 ==========
// 1. 分割原始文件名和后缀
size_t dotPos = tempfilename.find_last_of('.');
std::string baseName, originalExt;
if (dotPos != std::string::npos) {
baseName = tempfilename.substr(0, dotPos);
originalExt = tempfilename.substr(dotPos);
}
else {
baseName = tempfilename;
originalExt = "";
}
// 2. 确定大小写风格
bool isUppercase = false;
if (!originalExt.empty()) {
isUppercase = true;
for (char c : originalExt) {
if (std::isalpha(c) && std::islower(c)) {
isUppercase = false;
break;
}
}
}
// 3. 生成需要的后缀列表
std::vector<std::string> requiredExts;
if (file_type == 3) { // 需要三个文件
requiredExts = { ".cfg", ".dat", ".hdr" };
}
else { // 默认需要两个文件
requiredExts = { ".cfg", ".dat" };
//requiredExts = { ".dat" };
}
// 4. 调整后缀大小写
if (isUppercase) {
for (auto& ext : requiredExts) {
for (char& c : ext) {
if (std::isalpha(c)) c = std::toupper(c);
}
}
}
// 5. 构建完整路径列表
std::vector<std::string> fullFilenames;
for (const auto& ext : requiredExts) {
fullFilenames.push_back(baseName + ext);
}
// 6. 打印结果(实际使用中可能需要替换这里的打印逻辑)
std::cout << "Generated filenames: ";
for (const auto& name : fullFilenames) {
std::cout << name << " ";
}
std::cout << std::endl;
// ========== 新增:为每个文件生成下载请求 ==========
for (const auto& filename : fullFilenames) {
// 生成下载请求报文 (帧序号固定为1代表开始新文件的下载)
auto downloadMsg = generate_downloadfile_message(1, filename);
// 将下载动作添加到设备队列
ClientManager::instance().add_action_to_device(
id,
DeviceState::READING_EVENTFILE,
downloadMsg
);
std::cout << "Added download request for: " << filename << std::endl;
}
//最后报文收发处理逻辑(如果当前装置空闲则尝试执行后续动作)(如果当前装置存在其他状态则直接退出,不要干扰装置后续执行)
DeviceState currentState = DeviceState::IDLE;//获取当前装置的状态
if (!ClientManager::instance().get_device_state(id, currentState)) {
std::cerr << "Failed to get device state for: " << id << std::endl;
return;
}
switch (currentState) {
case DeviceState::IDLE:
//当前装置空闲中,可以执行后续动作
ClientManager::instance().post_message_processing(id);
break;
default:
//非空闲的其他状态直接退出即可,等待后续处理完毕后再尝试获取波形文件
break;
}
//处理完毕主动上送报文后直接退出,不要干扰装置正常应答
return;
}
//常规通讯报文
{
DeviceState currentState = DeviceState::IDLE;//获取当前装置的状态
if (!ClientManager::instance().get_device_state(id, currentState)) {
std::cerr << "Failed to get device state for: " << id << std::endl;
return;
}
// 根据装置状态处理报文
switch (currentState) {
case DeviceState::IDLE:
// 空闲状态下收到报文,可能是主动上报数据
std::cout << "IDLE state: Received active report from " << mac << std::endl;
// 这里可以添加处理主动上报数据的逻辑
break;
case DeviceState::READING_STATS:
// 读取统计数据状态
std::cout << "READING_STATS state: Processing stats data from " << mac << std::endl;
// 这里添加处理统计数据报文的逻辑
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_Stat)) {
// 一发多收,需要在这里等待所有报文收全再组装相应数据 一帧1K 直到所有数据传送完毕
//当前帧未收全,直接退出消息处理,等待后续帧
std::cout << "mac: " << mac << " count" << static_cast<int>(udata[10]) << std::endl;
// 解析帧信息 (根据实际协议调整)
int current_packet = static_cast<int>(udata[10]); // 当前帧序号
int total_packets = Stat_PacketNum; // 总帧数
std::vector<unsigned char> packet_data(udata, udata + length);
bool complete = ClientManager::instance().add_stat_packet_to_device(
id, packet_data, current_packet, total_packets
);
//判断是否收全
if (complete) {
// 1. 获取并清空缓存数据包
auto packets = ClientManager::instance().get_and_clear_stat_packets(id);
// 2. 按帧序号排序
std::sort(packets.begin(), packets.end(),
[](const ClientContext::StatPacket& a, const ClientContext::StatPacket& b) {
return a.packet_index < b.packet_index;
});
// 3. 解析每帧数据并提取数据体
std::vector<unsigned char> full_data;
MessageParser parser;
for (const auto& packet : packets) {
// 解析单帧报文
if (!parser.SetMsg(packet.data.data(), packet.data.size())) {
std::cerr << "Failed to parse packet " << packet.packet_index
<< " for device " << id << std::endl;
continue;
}
// 将数据体添加到完整序列
full_data.insert(full_data.end(),
parser.RecvData.begin(),
parser.RecvData.end());
}
// 4. 组装 tagPqData 对象
tagPqData pq_data;
if (!pq_data.SetStructBuf(full_data.data(), full_data.size())) {
std::cerr << "Failed to assemble tagPqData for device " << id << std::endl;
}
else {
// 成功组装,可以在这里使用 pq_data 对象
std::cout << "Successfully assembled tagPqData for device: "
<< id << std::endl;
float fPT = 1.0f;
float fCT = 1.0f;
if (ClientManager::instance().get_pt_ct_ratio(id, pq_data.name, fPT, fCT)) {
// 使用获取的变比值进行数据转换
tagPqData_Float float_data;
float_data.SetFloatValue(pq_data, fPT, fCT);
float_data.name = pq_data.name;
float_data.Data_Type = pq_data.Data_Type;
// 将浮点数据添加到缓存
// 添加到缓存并检查是否收全
bool complete = ClientManager::instance().add_float_data_to_device(
id, pq_data.name, pq_data.Data_Type, float_data);
if (complete) {
// 如果收全,立即取出处理
std::array<tagPqData_Float, 4> all_data =
ClientManager::instance().get_and_clear_float_data(id, pq_data.name);
if (!all_data.empty()) {
//单个测点 4组数据处理逻辑
tagPqData_Float max_data = all_data[0];
tagPqData_Float min_data = all_data[1];
tagPqData_Float avg_data = all_data[2];
tagPqData_Float cp95_data = all_data[3];
std::string strScale;//电压等级
int nPTType = 0;//接线方式
ClientManager::instance().get_point_scale_and_pttype(
id, // 或使用id
pq_data.name, // 从报文中解析出的测点序号
strScale,
nPTType);
// 转换为Base64字符串
std::string max_base64Str = max_data.ConvertToBase64(nPTType);
std::string min_base64Str = min_data.ConvertToBase64(nPTType);
std::string avg_base64Str = avg_data.ConvertToBase64(nPTType);
std::string cp95_base64Str = cp95_data.ConvertToBase64(nPTType);
//std::cout << "New star base64Str0:" << max_base64Str << std::endl;
//std::cout << "New del base64Str1:" << avg_data.ConvertToBase64(1) << std::endl;
//lnk20250708使用接口发送
time_t data_time = ConvertToTimestamp(avg_data.time);
std::vector<DataArrayItem> arr;
arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95”
data_time, //数据转换出来的时间数据时标相对1970年的秒无效填入“-1”
-1, //数据时标,微秒钟,无效填入“-1”
0, //数据标识1-标识数据异常
max_base64Str});
arr.push_back({2, data_time, -1, 0, min_base64Str});
arr.push_back({3, data_time, -1, 0, avg_base64Str});
arr.push_back({4, data_time, -1, 0, cp95_base64Str});
std::string js = generate_json(
-1, //需应答的报文订阅者收到后需以此ID应答无需应答填入“-1”
123456, //设备唯一标识Ldid填入0代表Ndid,后续根据商议决定填id还是数字
3, //报文处理的优先级1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文
0x1302, //设备数据主动上送的数据类型
avg_data.name, //逻辑子设备ID0-逻辑设备本身,无填-1
0x04, //数据类型固定为电能质量
2, //数据属性无“0”、实时“1”、统计“2”等
-1, //数据集序号(以数据集方式上送),无填-1
arr //数据数组
);
//std::cout << js << std::endl;
queue_data_t data;
data.monitor_no = 1; //暂无意义
data.strTopic = TOPIC_STAT;//统计topic
data.strText = js;
data.mp_id = "test"; //暂无意义
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
// 输出结果
//std::cout << "Base64 Encoded Data (" << max_data.CalculateFloatCount()
// << " floats): " << base64Str << std::endl;
}
}
}
else {
// 处理获取变比值失败的情况
std::cerr << "Failed to get PT/CT ratio for device: "
<< mac << " lineno: " << pq_data.name << std::endl;
}
}
//数据组装完毕,修改为空闲状态等待下一项工作
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
else {
//未收全则直接结束处理,等待后续报文应答
return;
}
}
else {
// 装置答非所问异常
// 接收统计数据错误,调整为空闲状态,处理下一项工作。
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
break;
case DeviceState::READING_STATS_TIME:
// 读取统计时间状态
std::cout << "READING_STATS_TIME state: Processing stats time from " << mac << std::endl;
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_StatTime)) {
std::vector<PointInfo> points;//装置测点信息
if (ClientManager::instance().get_device_points(mac, points)) {
// 成功获取测点信息
// 处理接收装置的时标
tagTime t3;
t3.SetStructBuf(parser.RecvData.data(), parser.RecvData.size());
int first = 0;//第一次标记
for (const auto& point : points) {
for (ushort i = 0; i < 4; i++)//每个测点需要单独召唤最大最小平均95概率值
{
auto sendbuff = generate_statequerystat_message(t3, point.nCpuNo, i);//组装询问统计数据报文
if (first == 0) {
//首次尝试组装报文 直接将当前状态调整 并等待最后启动发送
first++;
ClientManager::instance().change_device_state(id, DeviceState::READING_STATS, sendbuff);
}
else {
//非首次进入,将动作传入队列等待
ClientManager::instance().add_action_to_device(id, DeviceState::READING_STATS, sendbuff);
}
}
}
}
else {
// 未找到装置下属测点异常
// 接收统计数据时间错误,调整为空闲状态,处理下一项工作。
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
}
else {
// 装置答非所问异常
// 接收统计数据时间错误,调整为空闲状态,处理下一项工作。
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
break;
case DeviceState::READING_REALSTAT:
//读取实时数据状态
std::cout << "READING_REALSTAT state: Processing stats data from " << mac << std::endl;
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_New_3S)) {
unsigned char packet_type = udata[13];
//取监测点号
unsigned char cid = udata[12];
// 将数据添加到缓存
const uint8_t* data_ptr = parser.RecvData.data() + 4;
size_t data_size = parser.RecvData.size() - 4;
ClientManager::instance().add_realtime_packet_to_device(
id, packet_type, data_ptr, data_size
);
// 如果不是最后一个包,请求下一个包
if (packet_type != 0x06) {
unsigned char next_packet_type = packet_type + 1;
auto sendbuff = generate_realstat_message(
static_cast<unsigned char>(udata[12]),
next_packet_type,
static_cast<unsigned char>(0x01)
);
ClientManager::instance().change_device_state(
id, DeviceState::READING_REALSTAT, sendbuff
);
}
else {
// 获取并清空缓存
auto packets = ClientManager::instance().get_and_clear_realtime_packets(id);
// 按包类型排序01-06
std::sort(packets.begin(), packets.end(),
[](const ClientContext::RealtimePacket& a,
const ClientContext::RealtimePacket& b) {
return a.packet_type < b.packet_type;
});
RealtagPqDate_float realdata;
// 按顺序解析每个包
for (const auto& packet : packets) {
switch (packet.packet_type) {
case 0x01:
realdata.ParsePacket1(packet.data.data(), packet.data.size());
break;
case 0x02:
realdata.ParsePacket2(packet.data.data(), packet.data.size());
break;
case 0x03:
realdata.ParsePacket3(packet.data.data(), packet.data.size());
break;
case 0x04:
realdata.ParsePacket4(packet.data.data(), packet.data.size());
break;
case 0x05:
realdata.ParsePacket5(packet.data.data(), packet.data.size());
break;
case 0x06:
realdata.ParsePacket6(packet.data.data(), packet.data.size());
break;
}
}
std::string base64 = realdata.ConvertToBase64();
std::cout << base64 << std::endl;
//lnk实时数据使用接口发送20250711
time_t data_time = ConvertToTimestamp(realdata.time);
std::vector<DataArrayItem> arr;
arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95”
data_time, //数据转换出来的时间数据时标相对1970年的秒无效填入“-1”
-1, //数据时标,微秒钟,无效填入“-1”
0, //数据标识1-标识数据异常
base64});
std::string js = generate_json(
-1, //需应答的报文订阅者收到后需以此ID应答无需应答填入“-1”
123456, //设备唯一标识Ldid填入0代表Ndid,后续根据商议决定填id还是数字
3, //报文处理的优先级1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文
0x1302, //设备数据主动上送的数据类型
cid, //逻辑子设备ID0-逻辑设备本身,无填-1
0x04, //数据类型固定为电能质量数据
1, //数据属性无“0”、实时“1”、统计“2”等
-1, //数据集序号(以数据集方式上送),无填-1
arr //数据数组
);
//std::cout << js << std::en
queue_data_t data;
data.monitor_no = 1; //暂无意义
data.strTopic = TOPIC_RTDATA; //实时topic
data.strText = js;
data.mp_id = "test"; //暂无意义
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
// 处理完成后重置状态
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
}
else {
// 装置答非所问异常
// 接收实时数据错误,调整为空闲状态,处理下一项工作。
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
break;
case DeviceState::READING_EVENTFILE:
// 暂态波形文件下载
std::cout << "READING_EVENTFILE state: Processing stats time from " << mac << std::endl;
if (udata[8] == static_cast<unsigned char>(MsgResponseType::Response_File_Download)) {
// 提取当前帧序号12-15字节大端序
int current_frame = (static_cast<int>(udata[12]) << 24) |
(static_cast<int>(udata[13]) << 16) |
(static_cast<int>(udata[14]) << 8) |
static_cast<int>(udata[15]);
// 提取总帧数16-19字节大端序
int total_frames = (static_cast<int>(udata[16]) << 24) |
(static_cast<int>(udata[17]) << 16) |
(static_cast<int>(udata[18]) << 8) |
static_cast<int>(udata[19]);
//std::cout << "eventfile frames: " << current_frame << "/" << total_frames << std::endl;
// 将数据添加到缓存 去除数据体前14位 (逻辑稍后编写)
const uint8_t* data_ptr = parser.RecvData.data() + 14;
size_t data_size = parser.RecvData.size() - 14;
// 如果是第一帧,记录文件名
if (current_frame == 1) {
ClientManager::DownloadInfo info;
if (ClientManager::instance().parse_download_packet(id, info)) {
ClientManager::instance().update_current_filename(id, info.filename);
}
}
// 获取文件名
std::string filename = ClientManager::instance().get_current_filename(id);
// 添加到缓存
ClientManager::instance().add_file_packet_to_device(id, current_frame, data_ptr, data_size);
//std::cout << "fileinfo: " << info.filename << "/" << info.current_frame << std::endl;
//判断是否收全,未收全则继续发送报文,收全则取出所有缓存组装文件并保存至本地,推送消息
if (current_frame < total_frames) {
// 未收全,更新帧序号并保持状态,等待后续自动发送已修改的新报文
int nextframe = current_frame + 1;
auto downloadMsg = generate_downloadfile_message(nextframe, filename);
ClientManager::instance().change_device_state(id, DeviceState::READING_EVENTFILE, downloadMsg);
}
else {
// 已收全,在此处处理文件
std::cout << "mac: " << mac << " fileinfo: " << filename <<std::endl;
// 获取缓存中的所有分片
auto packets = ClientManager::instance().get_and_clear_file_packets(id);
// 合并文件内容
std::vector<unsigned char> file_data;
for (const auto& packet : packets) {
file_data.insert(file_data.end(), packet.begin(), packet.end());
}
// 保存文件
std::string mac_dir = mac; // 使用MAC地址作为目录名
// 创建目录(如果不存在)
if (mkdir(mac_dir.c_str(), 0777) != 0 && errno != EEXIST) {
std::cerr << "Failed to create directory: " << mac_dir << std::endl;
}
std::string path = extract_filename(filename);
std::string file_path = mac_dir + "/" + path;
std::ofstream out_file(file_path, std::ios::binary);
if (out_file) {
out_file.write(reinterpret_cast<const char*>(file_data.data()),
file_data.size());
std::cout << "File saved: " << file_path << std::endl;
}
else {
std::cerr << "Failed to save file: " << file_path
<< ", Error: " << strerror(errno) << std::endl;
}
//当前文件下载完毕,调整为空闲处理下一项工作(如果这里后续有新文件等待下载,一般已经存入等待队列等候处理了,调成空闲状态后直接就会开始新文件的下载工作)
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
}
else {
// 装置答非所问异常
// 接收波形文件数据错误,调整为空闲状态,处理下一项工作。
std::cout << "udata[8]: " << static_cast<int>(udata[8]) << std::endl;
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
}
break;
case DeviceState::CUSTOM_ACTION:
// 自定义动作状态
std::cout << "CUSTOM_ACTION state: Processing custom response from " << mac << std::endl;
// 这里添加处理自定义动作响应的逻辑
// 处理完成后标记状态完成
ClientManager::instance().change_device_state(id, DeviceState::IDLE);
break;
default:
std::cerr << "Unknown state: " << static_cast<int>(currentState)
<< " for device " << id << std::endl;
break;
}
// 无论何种状态,处理完成后触发后续状态处理
ClientManager::instance().post_message_processing(id);
}
}
}