From 747d6c757c5c7ccff7eea09ebd920ed811d9fb26 Mon Sep 17 00:00:00 2001 From: lnk Date: Thu, 26 Jun 2025 16:44:21 +0800 Subject: [PATCH] generate ledger to main thread --- LFtid1056/cloudfront/code/cfg_parser.cpp | 59 ++++++++++++---- LFtid1056/cloudfront/code/interface.h | 9 +++ LFtid1056/cloudfront/code/log4.cpp | 6 -- LFtid1056/cloudfront/code/main.cpp | 4 -- LFtid1056/cloudfront/code/rocketmq.cpp | 87 +++++++++--------------- LFtid1056/cloudfront/code/worker.cpp | 7 +- LFtid1056/main_thread.cpp | 2 + 7 files changed, 93 insertions(+), 81 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 697e0f6..5a9365b 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -31,6 +31,8 @@ #include "tinyxml2.h" #include "rocketmq.h" + + ///////////////////////////////////////////////////////////////////////////////////////////////// using namespace std; @@ -58,8 +60,6 @@ extern std::list queue_data_list; //queue发送数据链表 extern int three_secs_enabled; -extern std::vector terminal_devlist; - extern std::map xmlinfo_list;//保存所有型号对应的icd映射文件解析数据 extern XmlConfig xmlcfg;//星形接线xml节点解析的数据-默认映射文件解析数据 extern std::list topicList; //队列发送主题链表 @@ -2686,15 +2686,15 @@ void to_json(nlohmann::json& j, const FullObj& f) { }; } std::string generate_json( - int Mid, - int Did, - int Pri, - int Type, - int Cldid, - int DataType, - int DataAttr, - int DsNameIdx, - const std::vector& dataArray //构造出array后调用这个函数 + int Mid, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” + int Did, //设备唯一标识Ldid,填入0代表Ndid。 + int Pri, //报文处理的优先级 + int Type, //消息类型 + int Cldid, //逻辑子设备ID,0-逻辑设备本身,无填-1 + int DataType, //数据类型,0-表示以数据集方式上送 + int DataAttr, //数据属性:无“0”、实时“1”、统计“2”等。 + int DsNameIdx, //数据集序号(以数据集方式上送),无填-1 + const std::vector& dataArray //数据数组。 ) { FullObj fobj; fobj.Mid = Mid; @@ -2712,7 +2712,11 @@ std::string generate_json( void upload_data_test(){ std::vector arr; - arr.push_back({1, 1725477660, 0, 1, "xxxx"}); + arr.push_back({1, 1725477660, 0, 1, "xxxx"}); //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95” + //数据时标,相对1970年的秒,无效填入“-1” + //数据时标,微秒钟,无效填入“-1” + //数据标识,1-标识数据异常 + //数据序列(数据集上送时将二进制数据流转换成Base64字符串,其他数据为object) arr.push_back({2, 1691741340, 0, 1, "yyyy"}); std::string js = generate_json( @@ -2728,3 +2732,34 @@ void upload_data_test(){ std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } + +//////////////////////////////////////////////////////////////////////////////////////////////////台账赋值给通信 + +std::vector GenerateDeviceInfoFromLedger(const std::vector& terminal_devlist) { + std::vector devices; + + for (const auto& terminal : terminal_devlist) { + DeviceInfo device; + device.device_id = terminal.terminal_id; + device.name = terminal.terminal_name; + device.model = terminal.dev_series; + device.mac = terminal.mac; + + for (const auto& monitor : terminal.line) { + PointInfo point; + point.point_id = monitor.monitor_id; + point.name = monitor.monitor_name; + point.device_id = terminal.terminal_id; + point.PT1 = monitor.PT1; + point.PT2 = monitor.PT2; + point.CT1 = monitor.CT1; + point.CT2 = monitor.CT2; + + device.points.push_back(point); + } + + devices.push_back(device); + } + + return devices; +} \ No newline at end of file diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index b19e18b..5e564f5 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -12,6 +12,8 @@ #include "nlohmann/json.hpp" +#include "../../client2.h" + /////////////////////////////////////////////////////////////////////////////////////////// class Front; @@ -308,6 +310,9 @@ int parse_model_cfg_web(); void qvvr_test(); void Fileupload_test(); +extern std::vector terminal_devlist; +extern std::mutex ledgermtx; + //////////////////////////////////////////////////////////////////////////////////cfg_parse的函数声明 void init_config(); @@ -403,6 +408,10 @@ void to_json(nlohmann::json& j, const FullObj& f); ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +std::vector GenerateDeviceInfoFromLedger(const std::vector& terminal_devlist); + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #endif diff --git a/LFtid1056/cloudfront/code/log4.cpp b/LFtid1056/cloudfront/code/log4.cpp index bffb006..cbfc1a0 100644 --- a/LFtid1056/cloudfront/code/log4.cpp +++ b/LFtid1056/cloudfront/code/log4.cpp @@ -47,15 +47,9 @@ extern int g_front_seg_index; extern std::string FRONT_INST; extern std::string subdir; -//mq -extern std::mutex queue_data_list_mutex; //queue发送数据锁 -extern std::list queue_data_list; //queue发送数据链表 - //日志主题 extern std::string G_LOG_TOPIC; -extern std::vector terminal_devlist; - ////////////////////////////////////////////////////////辅助函数 std::string get_front_type_from_subdir() { if (subdir == "cfg_3s_data") diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index f543257..c71546b 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -78,10 +78,6 @@ extern int TEST_PORT; //测试端口号 extern std::string FRONT_INST; -extern std::mutex queue_data_list_mutex; -extern std::list queue_data_list; - - /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数 template diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index b6f238d..b1c23f4 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -58,10 +58,6 @@ static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者 /////////////////////////////////////////////////////////////////////////////////////////////////////////// -//台账 -extern std::mutex ledgermtx; -extern std::vector terminal_devlist; - //前置进程 extern unsigned int g_node_id; extern int g_front_seg_index; @@ -1391,9 +1387,8 @@ bool shouldSkipTerminal(const std::string& terminal_id) { return false; } -void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) { - - if(!INITFLAG){ +void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { + if (!INITFLAG) { std::cout << "前置未初始化完成\n"; return; } @@ -1451,41 +1446,34 @@ void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) { data.mp_id = dev.line[j].monitor_id; data.monitor_no = static_cast(i + j); - std::string modified_time = std::to_string(current_time_ms); + std::string modified_time = std::to_string(current_time_ms / 1000); std::string modified_strText = base_strText; - - // 替换 Monitor - size_t monitor_pos = modified_strText.find("\"Monitor\""); - if (monitor_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", monitor_pos); - size_t quote_pos = modified_strText.find("\"", colon_pos); - size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id); - } - } - - // 替换 TIME - size_t time_pos = modified_strText.find("\"TIME\""); - if (time_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", time_pos); - size_t quote_pos = colon_pos; - size_t end_quote_pos = modified_strText.find(",", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time); + try { + auto j = nlohmann::json::parse(modified_strText); + j["Did"] = i; + if (j.contains("Msg") && j["Msg"].is_object()) { + j["Msg"]["Cldid"] = j; + if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) { + for (auto& item : j["Msg"]["DataArray"]) { + if (item.is_object()) { + item["DataTimeSec"] = std::stoll(modified_time); + } + } + } } + modified_strText = j.dump(); + } catch (...) { + // 保持原始文本 } data.strText = modified_strText; - //my_rocketmq_send(data,front->m_producer); std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_no << " and TIME " << modified_time << std::endl; - } } } else { @@ -1493,43 +1481,36 @@ void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) { for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < total_messages; ++i) { std::string monitor_id = "testmonitor" + std::to_string(i); - data.mp_id = monitor_id; data.monitor_no = i; - std::string modified_time = std::to_string(current_time_ms); + std::string modified_time = std::to_string(current_time_ms / 1000); std::string modified_strText = base_strText; - // 替换 Monitor - size_t monitor_pos = modified_strText.find("\"Monitor\""); - if (monitor_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", monitor_pos); - size_t quote_pos = modified_strText.find("\"", colon_pos); - size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id); - } - } - - // 替换 TIME - size_t time_pos = modified_strText.find("\"TIME\""); - if (time_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", time_pos); - size_t quote_pos = colon_pos; - size_t end_quote_pos = modified_strText.find(",", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time); + try { + auto j = nlohmann::json::parse(modified_strText); + j["Did"] = 0; + if (j.contains("Msg") && j["Msg"].is_object()) { + j["Msg"]["Cldid"] = data.mp_id; + if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) { + for (auto& item : j["Msg"]["DataArray"]) { + if (item.is_object()) { + item["DataTimeSec"] = std::stoll(modified_time); + } + } + } } + modified_strText = j.dump(); + } catch (...) { + // 保持原始文本 } data.strText = modified_strText; - //my_rocketmq_send(data,front->m_producer); std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_no << " and TIME " << modified_time << std::endl; - } } diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index 5516939..709147b 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -48,11 +48,6 @@ bool showinshellflag =false; extern std::list errorList, warnList, normalList; extern std::mutex errorListMutex, warnListMutex, normalListMutex; -extern std::vector terminal_devlist; -extern std::mutex ledgermtx; - -extern std::list queue_data_list; - extern int IED_COUNT; extern int INITFLAG; extern int g_front_seg_index; @@ -281,7 +276,7 @@ extern bool normalOutputEnabled; if (G_TEST_NUM != 0) { std::cout << "[PeriodicTask] Executing rocketmq_test_300()\n"; rocketmq_test_300(G_TEST_NUM, g_front_seg_index, G_TEST_TYPE,m_front); - upload_data_test(); + //upload_data_test(); } } diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 1a4a966..3171a60 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -152,6 +152,8 @@ void* client_manager_thread(void* arg) { // 100װ std::vector test_devices = generate_test_devices(100); + //std::vector devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk + // ͻ start_client_connect(devices);