diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 2f05ca1..4ac2cf4 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -68,6 +68,9 @@ extern XmlConfig xmlcfg2;//角型接线xml节点解析的数据-默认映射文 extern std::list topicList2; //角型接线发送主题链表 extern std::map xmlinfo_list2;//保存所有型号角形接线对应的icd映射文件解析数据 +////////////////////////////////////////////////////////////////////////////////////////////////// +extern time_t ConvertToTimestamp(const tagTime& time); + //////////////////////////////////////////////////////////////////////////////////////////////////// static std::mutex g_filemenu_cache_mtx; std::map> g_filemenu_cache; @@ -5110,8 +5113,124 @@ bool append_qvvr_event(const std::string& terminal_id, return true; } +/////////////////////////////////////////////////////////////////////////////////////////////////////////////实时数据封装发送 +void enqueue_realtime_pq(const RealtagPqDate_float& realdata, + int nPTType, + unsigned char cid, + const std::string& mac, + const std::string& devid) +{ + // 先根据 devIdxMap 的配置决定编码分支: + // 2: 基础数据 3: 谐波电压含有率 4: 谐波电流有效值 5: 间谐波电压含有率 + int idx = 0; + std::string base64; + // 这里尝试用 mac 作为 key 获取 idx;如果项目里 devIdxMap 的 key 不是 mac, + // 你可以把这里改成对应的设备 id(devid)。未命中则再尝试用规范化后的 mac。 + if (devidx_get(devid, idx)) { + switch (idx) { + case 2: // 基础数据(根据接线方式选择转换方法 数据全集解析) + base64 = realdata.ConvertToBase64(nPTType); + break; + case 3: // 谐波电压含有率 + base64 = realdata.ConvertToBase64_RtHarmV(nPTType); + break; + case 4: // 谐波电流有效值(幅值) + base64 = realdata.ConvertToBase64_RtHarmI(); + break; + case 5: // 间谐波电压含有率 + base64 = realdata.ConvertToBase64_RtInHarmV(); + break; + default: + // 未知 idx,回退到基础数据 + base64 = realdata.ConvertToBase64(nPTType); + break; + } + } else { + // 未配置 idx,回退到基础数据 + base64 = realdata.ConvertToBase64(nPTType); + } + //std::cout << base64 << std::endl; + //lnk实时数据使用接口发送20250711 + time_t data_time = ConvertToTimestamp(realdata.time); + std::vector arr; + arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95” + data_time, //数据转换出来的时间,数据时标,相对1970年的秒,无效填入“-1” + 0, //数据时标,微秒钟,无效填入“-1” + 0, //数据标识,1-标识数据异常 + base64}); + std::string js = generate_json( + normalize_mac(mac), + -1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” + 1, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字 + 1, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文 + 0x1302, //设备数据主动上送的数据类型 + cid, //逻辑子设备ID,0-逻辑设备本身,无填-1 + 0x04, //数据类型固定为电能质量数据 + 1, //数据属性:无“0”、实时“1”、统计“2”等 + idx, //数据集序号(以数据集方式上送),无填-1 + arr //数据数组 + ); + //std::cout << js << std::en + queue_data_t data; + data.monitor_no = 1; //上送的实时数据没有测点序号,统一填1 + data.strTopic = TOPIC_RTDATA; //实时topic + data.strText = js; + data.mp_id = ""; //监测点id,暂时不需要 + data.tag = G_RT_TAG; //实时tag + data.key = G_RT_KEY; //实时key + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(data); +} +////////////////////////////////////////////////////////////////////////////////统计数据打包发送 +// 封装:组装统计数据并入队发送 +void enqueue_stat_pq(const std::string& max_base64Str, + const std::string& min_base64Str, + const std::string& avg_base64Str, + const std::string& cp95_base64Str, + time_t data_time, + const std::string& mac, + short cid) +{ + std::vector arr; + arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95” + data_time, //数据转换出来的时间,数据时标,相对1970年的秒,无效填入“-1” + 0, //数据时标,微秒钟,无效填入“-1” + 0, //数据标识,1-标识数据异常 + max_base64Str}); + arr.push_back({2, data_time, 0, 0, min_base64Str}); + arr.push_back({3, data_time, 0, 0, avg_base64Str}); + arr.push_back({4, data_time, 0, 0, cp95_base64Str}); + + std::string js = generate_json( + normalize_mac(mac), + -1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” + 1, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字 + 1, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文 + 0x1302, //设备数据主动上送的数据类型 + cid, //逻辑子设备ID,0-逻辑设备本身,无填-1(原:avg_data.name) + 0x04, //数据类型固定为电能质量 + 2, //数据属性:无“0”、实时“1”、统计“2”等 + 1, //数据集序号(以数据集方式上送),无填-1 + arr //数据数组 + ); + + //std::cout << js << std::endl; + + queue_data_t data; + data.monitor_no = cid; //监测点序号(原:avg_data.name) + data.strTopic = TOPIC_STAT;//统计topic + data.strText = js; + data.mp_id = ""; //监测点id,暂时不需要 + data.tag = G_ROCKETMQ_TAG; //统计tag + data.key = G_ROCKETMQ_KEY; //统计key + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(data); + + std::cout << "Successfully assembled tagPqData for line: " + << cid << std::endl; +} \ No newline at end of file diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index dd5aa4b..d0ee754 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -730,6 +730,22 @@ inline bool devidx_get(const std::string& id, int& out_idx) { out_idx = it->second; return true; } + +void enqueue_realtime_pq(const RealtagPqDate_float& realdata, + int nPTType, + unsigned char cid, + const std::string& mac, + const std::string& devid); + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////统计数据 +void enqueue_stat_pq(const std::string& max_base64Str, + const std::string& min_base64Str, + const std::string& avg_base64Str, + const std::string& cp95_base64Str, + time_t data_time, + const std::string& mac, + short cid); + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// extern int g_front_seg_index; extern std::string FRONT_INST; diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index e4b28e6..3a0833a 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -346,7 +346,7 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer) /////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理 -bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit,int& Idx){ +bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit,int& idx){ json root; try { root = json::parse(body); @@ -382,7 +382,7 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l !messageBody.contains("realData") || !messageBody.contains("soeData") || !messageBody.contains("limit")|| - !messageBody.contains("Idx")) + !messageBody.contains("idx")) { std::cerr << "Missing expected fields in 'messageBody'." << std::endl; return false; @@ -394,7 +394,7 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l realData = messageBody["realData"].get(); soeData = messageBody["soeData"].get(); limit = messageBody["limit"].get(); - int idx = messageBody["Idx"].get(); + idx = messageBody["idx"].get(); } catch (const std::exception& e) { std::cerr << "Type error while extracting fields: " << e.what() << std::endl; return false; diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 2a23db8..19bf16d 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -498,58 +498,13 @@ void process_received_message(string mac, string id,const char* data, size_t len //lnk20250708使用接口发送 time_t data_time = ConvertToTimestamp(avg_data.time); - std::vector arr; - arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95” - data_time, //数据转换出来的时间,数据时标,相对1970年的秒,无效填入“-1” - 0, //数据时标,微秒钟,无效填入“-1” - 0, //数据标识,1-标识数据异常 - max_base64Str}); - arr.push_back({2, data_time, 0, 0, min_base64Str}); - arr.push_back({3, data_time, 0, 0, avg_base64Str}); - arr.push_back({4, data_time, 0, 0, cp95_base64Str}); - - std::string js = generate_json( - normalize_mac(mac), - -1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” - 1, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字 - 1, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文 - 0x1302, //设备数据主动上送的数据类型 - avg_data.name, //逻辑子设备ID,0-逻辑设备本身,无填-1 - 0x04, //数据类型固定为电能质量 - 2, //数据属性:无“0”、实时“1”、统计“2”等 - 1, //数据集序号(以数据集方式上送),无填-1 - arr //数据数组 - ); - - //std::cout << js << std::endl; - - //// 创建输出流并打开文件(覆盖模式) - //std::ofstream outFile("json.txt"); // 等价于 std::ofstream outFile(filePath, std::ios::out); - - //if (outFile.is_open()) { // 检查文件是否成功打开 - // outFile << js; // 写入字符串 - // outFile.close(); // 关闭文件 - // // 成功提示(实际应用中建议使用日志) - //} - //else { - // // 错误处理:文件打开失败(如路径不存在) - //} - - queue_data_t data; - data.monitor_no = avg_data.name; //监测点序号 - data.strTopic = TOPIC_STAT;//统计topic - data.strText = js; - data.mp_id = ""; //监测点id,暂时不需要 - data.tag = G_ROCKETMQ_TAG; //统计tag - data.key = G_ROCKETMQ_KEY; //统计key - std::lock_guard lock(queue_data_list_mutex); - queue_data_list.push_back(data); - - std::cout << "Successfully assembled tagPqData for line: " - << avg_data.name << std::endl; - // 输出结果 - //std::cout << "Base64 Encoded Data (" << max_data.CalculateFloatCount() - // << " floats): " << base64Str << std::endl; + enqueue_stat_pq(max_base64Str, + min_base64Str, + avg_base64Str, + cp95_base64Str, + data_time, + mac, + avg_data.name); } } @@ -685,41 +640,8 @@ void process_received_message(string mac, string id,const char* data, size_t len strScale, nPTType); - std::string base64 = realdata.ConvertToBase64(nPTType); - //std::cout << base64 << std::endl; - - //lnk实时数据使用接口发送20250711 - time_t data_time = ConvertToTimestamp(realdata.time); - - std::vector arr; - arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95” - data_time, //数据转换出来的时间,数据时标,相对1970年的秒,无效填入“-1” - 0, //数据时标,微秒钟,无效填入“-1” - 0, //数据标识,1-标识数据异常 - base64}); - std::string js = generate_json( - normalize_mac(mac), - -1, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” - 1, //设备唯一标识Ldid,填入0代表Ndid,后续根据商议决定填id还是数字 - 1, //报文处理的优先级:1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文 - 0x1302, //设备数据主动上送的数据类型 - cid, //逻辑子设备ID,0-逻辑设备本身,无填-1 - 0x04, //数据类型固定为电能质量数据 - 1, //数据属性:无“0”、实时“1”、统计“2”等 - 2, //数据集序号(以数据集方式上送),无填-1 - arr //数据数组 - ); - //std::cout << js << std::en - queue_data_t data; - data.monitor_no = 1; //上送的实时数据没有测点序号,统一填1 - data.strTopic = TOPIC_RTDATA; //实时topic - data.strText = js; - data.mp_id = ""; //监测点id,暂时不需要 - data.tag = G_RT_TAG; //实时tag - data.key = G_RT_KEY; //实时key - std::lock_guard lock(queue_data_list_mutex); - queue_data_list.push_back(data); - + // 转换为Base64字符串并发送lnk20250924 + enqueue_realtime_pq(realdata, nPTType, cid, mac, id); // 处理完成后重置状态 ClientManager::instance().change_device_state(id, DeviceState::IDLE);