From 3fb8ac84f52b80758e7b617fe15f483051c11387 Mon Sep 17 00:00:00 2001 From: lnk Date: Fri, 11 Jul 2025 14:12:47 +0800 Subject: [PATCH 1/2] rtdata use mq interface --- LFtid1056/dealMsg.cpp | 63 ++++++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 3fdb929..ead4191 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -19,14 +19,14 @@ using namespace std; SafeMessageQueue message_queue; // 全局消息队列 //时间转换函数 -time_t ConvertToTimestamp(const tagPqData_Float& data) { +time_t ConvertToTimestamp(const tagTime& time) { struct tm t = {}; - t.tm_year = data.time.DeviceYear - 1900; // tm_year 从 1900 开始计 - t.tm_mon = data.time.DeviceMonth - 1; // tm_mon 从 0(1月)开始 - t.tm_mday = data.time.DeviceDay; - t.tm_hour = data.time.DeviceHour; - t.tm_min = data.time.DeviceMinute; - t.tm_sec = data.time.DeviceSecond; + t.tm_year = time.DeviceYear - 1900; // tm_year 从 1900 开始计 + t.tm_mon = time.DeviceMonth - 1; // tm_mon 从 0(1月)开始 + t.tm_mday = time.DeviceDay; + t.tm_hour = time.DeviceHour; + t.tm_min = time.DeviceMinute; + t.tm_sec = time.DeviceSecond; // 返回时间戳(本地时间) return mktime(&t); @@ -177,7 +177,7 @@ void process_received_message(string mac, string id,const char* data, size_t len std::string cp95_base64Str = cp95_data.ConvertToBase64(); //lnk20250708使用接口发送 - time_t data_time = ConvertToTimestamp(avg_data); + time_t data_time = ConvertToTimestamp(avg_data.time); std::vector arr; arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95” @@ -191,22 +191,22 @@ void process_received_message(string mac, string id,const char* data, size_t len std::string js = generate_json( -1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” - 123456, //设备唯一标识Ldid,填入0代表Ndid - 1, //报文处理的优先级 + 123456, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字 + 3, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文 0x1302, //设备数据主动上送的数据类型 - max_data.name, //逻辑子设备ID,0-逻辑设备本身,无填-1 - max_data.Data_Type, //数据类型 + avg_data.name, //逻辑子设备ID,0-逻辑设备本身,无填-1 + avg_data.Data_Type, //数据类型 2, //数据属性:无“0”、实时“1”、统计“2”等 - 1, //数据集序号(以数据集方式上送),无填-1 + -1, //数据集序号(以数据集方式上送),无填-1 arr //数据数组 ); //std::cout << js << std::endl; queue_data_t data; - data.monitor_no = 1; - data.strTopic = TOPIC_STAT; + data.monitor_no = 1; //暂无意义 + data.strTopic = TOPIC_STAT;//统计topic data.strText = js; - data.mp_id = "test"; + data.mp_id = "test"; //暂无意义 std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); @@ -341,6 +341,37 @@ void process_received_message(string mac, string id,const char* data, size_t len std::string base64 = realdata.ConvertToBase64(); std::cout << base64 << std::endl; + + //lnk实时数据使用接口发送20250711 + time_t data_time = ConvertToTimestamp(realdata.time); + + std::vector arr; + arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95” + data_time, //数据转换出来的时间,数据时标,相对1970年的秒,无效填入“-1” + -1, //数据时标,微秒钟,无效填入“-1” + 0, //数据标识,1-标识数据异常 + base64}); + std::string js = generate_json( + -1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” + 123456, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字 + 3, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文 + 0x1302, //设备数据主动上送的数据类型 + 1,//realdata.name, //逻辑子设备ID,0-逻辑设备本身,无填-1;后续确认实时数据从何获取 + 1,//realdata.Data_Type, //数据类型;后续确认实时数据从何获取 + 1, //数据属性:无“0”、实时“1”、统计“2”等 + -1, //数据集序号(以数据集方式上送),无填-1 + arr //数据数组 + ); + //std::cout << js << std::en + queue_data_t data; + data.monitor_no = 1; //暂无意义 + data.strTopic = TOPIC_RTDATA; //实时topic + data.strText = js; + data.mp_id = "test"; //暂无意义 + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(data); + + // 处理完成后重置状态 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } From 8e4e45ce31ef056f1b9c0fff903fac6aeee6db38 Mon Sep 17 00:00:00 2001 From: lnk Date: Mon, 14 Jul 2025 13:27:38 +0800 Subject: [PATCH 2/2] use set_real_state_count --- LFtid1056/cloudfront/code/interface.h | 4 ++-- LFtid1056/cloudfront/code/main.cpp | 2 +- LFtid1056/cloudfront/code/rocketmq.cpp | 10 +++++++--- LFtid1056/dealMsg.cpp | 10 +++++++--- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 3f897e0..b94c1f2 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -380,8 +380,8 @@ typedef struct { // 鍗曟潯 DataArray 鏁版嵁 struct DataArrayItem { int DataAttr; - int DataTimeSec; - int DataTimeUSec; + time_t DataTimeSec; + time_t DataTimeUSec; int DataTag; std::string Data; }; diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index 718e9f2..074555a 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -317,7 +317,7 @@ void Front::FrontThread() { try { while (!m_bIsFrontThreadCancle) { - check_3s_config(); // 瀹炴椂鏁版嵁瑙﹀彂 + //check_3s_config(); // 瀹炴椂鏁版嵁瑙﹀彂 //create_recall_xml(); // 鐢熸垚寰呰ˉ鎷泋ml鏂囦欢 check_ledger_update(); // 瑙﹀彂鍙拌处鏇存柊 } diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index a23440b..1147ae9 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -37,6 +37,8 @@ #include "interface.h" #include "front.h" +#include "../../client2.h" + ////////////////////////////////////////////////////////////////////////////////////////////////////////// using namespace std; @@ -876,11 +878,13 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms } // 鍐欏叆 XML - if (!createXmlFile(dev_index, mp_index, realData, soeData, limit, "new")) { + /*if (!createXmlFile(dev_index, mp_index, realData, soeData, limit, "new")) { DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃棤娉曞垱寤哄疄鏃舵暟鎹Е鍙戞枃浠"); std::cerr << "Failed to create the XML file." << std::endl; return rocketmq::RECONSUME_LATER; - } + }*/ + //涓嶅啀浣跨敤鏂囦欢瑙﹀彂鏂瑰紡锛岀洿鎺ヨ皟鐢ㄦ帴鍙e悜缁堢鍙戣捣璇锋眰 + ClientManager::instance().set_real_state_count(devid, 60,mp_index);//涓绉掕闂竴娆★紝璇㈤棶60娆 return rocketmq::CONSUME_SUCCESS; } @@ -1009,7 +1013,7 @@ rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& ms if (!result.empty()) { std::lock_guard lock(ledgermtx); - recall_json_handle(result); + recall_json_handle(result);//涓嶅啀浣跨敤鏂囦欢琛ユ嫑鏂瑰紡 } else { std::cerr << "recall data is NULL." << std::endl; diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index ead4191..6866944 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -195,7 +195,7 @@ void process_received_message(string mac, string id,const char* data, size_t len 3, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文 0x1302, //设备数据主动上送的数据类型 avg_data.name, //逻辑子设备ID,0-逻辑设备本身,无填-1 - avg_data.Data_Type, //数据类型 + 0x04, //数据类型固定为电能质量 2, //数据属性:无“0”、实时“1”、统计“2”等 -1, //数据集序号(以数据集方式上送),无填-1 arr //数据数组 @@ -284,6 +284,10 @@ void process_received_message(string mac, string id,const char* data, size_t len std::cout << "READING_REALSTAT state: Processing stats data from " << mac << std::endl; if (udata[8] == static_cast(MsgResponseType::Response_New_3S)) { unsigned char packet_type = udata[13]; + + //取监测点号 + unsigned char cid = udata[12]; + // 将数据添加到缓存 const uint8_t* data_ptr = parser.RecvData.data() + 4; size_t data_size = parser.RecvData.size() - 4; @@ -356,8 +360,8 @@ void process_received_message(string mac, string id,const char* data, size_t len 123456, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字 3, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文 0x1302, //设备数据主动上送的数据类型 - 1,//realdata.name, //逻辑子设备ID,0-逻辑设备本身,无填-1;后续确认实时数据从何获取 - 1,//realdata.Data_Type, //数据类型;后续确认实时数据从何获取 + cid, //逻辑子设备ID,0-逻辑设备本身,无填-1 + 0x04, //数据类型固定为电能质量数据 1, //数据属性:无“0”、实时“1”、统计“2”等 -1, //数据集序号(以数据集方式上送),无填-1 arr //数据数组