From 3dc54a9b754c5d1d71c4e620e5f94d6a9e648e8c Mon Sep 17 00:00:00 2001 From: lnk Date: Wed, 25 Jun 2025 13:35:17 +0800 Subject: [PATCH 1/4] fix segfault --- LFtid1056/main_thread.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 8112c3c..1a4a966 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -292,6 +292,16 @@ int main() { free(index); } } + else if (i == 2){ + //接口和mq + char* argv[] = { (char*)index,(char*)"-dcfg_stat_data", (char*)"-s1_1" }; + ThreadArgs* args = new ThreadArgs{3, argv}; + if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) { + printf("Failed to create message processor thread %d\n", i); + delete args; // 如果线程没创建成功就手动释放 + free(index); + } + } else { // 其他工作线程 // 这里简化为空,实际应用中可添加其他线程 From 4e6ba12b259d5ee15bbd1bc7109de1e722568bfe Mon Sep 17 00:00:00 2001 From: lnk Date: Thu, 26 Jun 2025 14:39:34 +0800 Subject: [PATCH 2/4] modify ledger --- .vscode/settings.json | 75 ++++++++++++- LFtid1056/cloudfront/code/cfg_parser.cpp | 135 +++++++++++++++++++++-- LFtid1056/cloudfront/code/interface.cpp | 13 ++- LFtid1056/cloudfront/code/interface.h | 58 +++++++++- LFtid1056/cloudfront/code/main.cpp | 6 +- LFtid1056/cloudfront/code/rocketmq.cpp | 23 +++- LFtid1056/cloudfront/code/worker.cpp | 13 ++- 7 files changed, 295 insertions(+), 28 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index bb879da..d74d88c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -55,5 +55,78 @@ "C_Cpp_Runner.useLeakSanitizer": false, "C_Cpp_Runner.showCompilationTime": false, "C_Cpp_Runner.useLinkTimeOptimization": false, - "C_Cpp_Runner.msvcSecureNoWarnings": false + "C_Cpp_Runner.msvcSecureNoWarnings": false, + "files.associations": { + "any": "cpp", + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "cctype": "cpp", + "charconv": "cpp", + "chrono": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "codecvt": "cpp", + "compare": "cpp", + "concepts": "cpp", + "condition_variable": "cpp", + "csignal": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "forward_list": "cpp", + "list": "cpp", + "map": "cpp", + "set": "cpp", + "string": "cpp", + "unordered_map": "cpp", + "vector": "cpp", + "exception": "cpp", + "algorithm": "cpp", + "functional": "cpp", + "iterator": "cpp", + "memory": "cpp", + "memory_resource": "cpp", + "numeric": "cpp", + "optional": "cpp", + "random": "cpp", + "ratio": "cpp", + "string_view": "cpp", + "system_error": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "utility": "cpp", + "format": "cpp", + "fstream": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "limits": "cpp", + "mutex": "cpp", + "new": "cpp", + "numbers": "cpp", + "ostream": "cpp", + "ranges": "cpp", + "semaphore": "cpp", + "span": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "stop_token": "cpp", + "streambuf": "cpp", + "text_encoding": "cpp", + "thread": "cpp", + "cinttypes": "cpp", + "typeinfo": "cpp", + "valarray": "cpp", + "variant": "cpp" + } } \ No newline at end of file diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 13af018..697e0f6 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -1244,7 +1244,7 @@ void printTerminalDevMap(const std::map& terminal_dev std::cout << "Key: " << key << ", Terminal ID: " << dev.terminal_id - << ", Terminal Code: " << dev.terminal_code + << ", Terminal Code: " << dev.terminal_name << ", Organization Name: "<< dev.org_name << ", Maintenance Name: " << dev.maint_name << ", Station Name: " << dev.station_name @@ -1258,6 +1258,9 @@ void printTerminalDevMap(const std::map& terminal_dev << ", Address: " << dev.addr_str << ", Port: " << dev.port << ", Timestamp: " << dev.timestamp + + << ", mac: " << dev.mac + << std::endl; // 鎵撳嵃鐩戞祴鐐逛俊鎭 @@ -1265,13 +1268,19 @@ void printTerminalDevMap(const std::map& terminal_dev const auto& m = dev.line[i]; std::cout << " Monitor [" << i << "] " << "ID: " << m.monitor_id - << ", Code: " << m.terminal_code + << ", Code: " << m.terminal_id << ", Name: " << m.monitor_name << ", Seq: " << m.logical_device_seq << ", Voltage: "<< m.voltage_level << ", Connect: "<< m.terminal_connect << ", Timestamp:"<< m.timestamp << ", Status: " << m.status + + << ", CT1: " << m.CT1 + << ", CT2: " << m.CT2 + << ", PT1: " << m.PT1 + << ", PT2: " << m.PT2 + << std::endl; } } @@ -1357,7 +1366,7 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml, }; work_terminal.terminal_id = get_value("id"); - work_terminal.terminal_code = get_value("terminalCode"); + work_terminal.terminal_name = get_value("terminalCode"); work_terminal.org_name = get_value("orgName"); work_terminal.maint_name = get_value("maintName"); work_terminal.station_name = get_value("stationName"); @@ -1371,6 +1380,8 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml, work_terminal.port = get_value("port"); work_terminal.timestamp = get_value("updateTime"); + work_terminal.mac = get_value("mac"); + for (tinyxml2::XMLElement* monitor = root->FirstChildElement("monitorData"); monitor; monitor = monitor->NextSiblingElement("monitorData")) { @@ -1381,9 +1392,18 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml, mon.terminal_connect = monitor->FirstChildElement("ptType") ? monitor->FirstChildElement("ptType")->GetText() : "N/A"; mon.logical_device_seq = monitor->FirstChildElement("lineNo") ? monitor->FirstChildElement("lineNo")->GetText() : "N/A"; mon.timestamp = monitor->FirstChildElement("timestamp") ? monitor->FirstChildElement("timestamp")->GetText() : "N/A"; - mon.terminal_code = monitor->FirstChildElement("terminal_code") ? monitor->FirstChildElement("terminal_code")->GetText() : "N/A"; + mon.terminal_id = monitor->FirstChildElement("terminal_id") ? monitor->FirstChildElement("terminal_name")->GetText() : "N/A"; mon.status = monitor->FirstChildElement("status") ? monitor->FirstChildElement("status")->GetText() : "N/A"; + mon.CT1 = monitor->FirstChildElement("CT1") && monitor->FirstChildElement("CT1")->GetText() + ? atof(monitor->FirstChildElement("CT1")->GetText()) : 0.0; + mon.CT2 = monitor->FirstChildElement("CT2") && monitor->FirstChildElement("CT2")->GetText() + ? atof(monitor->FirstChildElement("CT2")->GetText()) : 0.0; + mon.PT1 = monitor->FirstChildElement("PT1") && monitor->FirstChildElement("PT1")->GetText() + ? atof(monitor->FirstChildElement("PT1")->GetText()) : 0.0; + mon.PT2 = monitor->FirstChildElement("PT2") && monitor->FirstChildElement("PT2")->GetText() + ? atof(monitor->FirstChildElement("PT2")->GetText()) : 0.0; + work_terminal.line.push_back(mon); } @@ -1536,9 +1556,9 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d target_dev.terminal_id = update.terminal_id; std::cout << "terminal_id: " << target_dev.terminal_id << std::endl; } - if (!update.terminal_code.empty()) { - target_dev.terminal_code = update.terminal_code; - std::cout << "terminal_code: " << target_dev.terminal_code << std::endl; + if (!update.terminal_name.empty()) { + target_dev.terminal_name = update.terminal_name; + std::cout << "terminal_name: " << target_dev.terminal_name << std::endl; } if (!update.tmnl_factory.empty()) { target_dev.tmnl_factory = update.tmnl_factory; @@ -1574,6 +1594,11 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d std::cout << "port: " << target_dev.port << std::endl; } + if (!update.mac.empty()) { + target_dev.mac = update.mac; + std::cout << "mac: " << target_dev.mac << std::endl; + } + if (!update.timestamp.empty()) { struct tm timeinfo = {}; if (sscanf(update.timestamp.c_str(), "%4d-%2d-%2d %2d:%2d:%2d", @@ -1609,9 +1634,14 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d m.voltage_level = mon.voltage_level; m.terminal_connect = mon.terminal_connect; m.status = mon.status; - m.terminal_code = mon.terminal_code; + m.terminal_id = mon.terminal_id; m.timestamp = mon.timestamp; + m.CT1 = mon.CT1; + m.CT2 = mon.CT2; + m.PT1 = mon.PT1; + m.PT2 = mon.PT2; + if (m.terminal_connect != "0") { isdelta_flag = 1; std::cout << "monitor_id " << m.monitor_id << " uses delta wiring." << std::endl; @@ -2104,13 +2134,18 @@ void print_monitor(const ledger_monitor& mon) { auto safe = [](const std::string& s) { return s.empty() ? "N/A" : s; }; std::cout << "Monitor ID: " << safe(mon.monitor_id) << "\n"; - std::cout << "Terminal Code: " << safe(mon.terminal_code) << "\n"; + std::cout << "Terminal ID: " << safe(mon.terminal_id) << "\n"; std::cout << "Monitor Name: " << safe(mon.monitor_name) << "\n"; std::cout << "Logical Device Sequence: " << safe(mon.logical_device_seq) << "\n"; std::cout << "Voltage Level: " << safe(mon.voltage_level) << "\n"; std::cout << "Terminal Connect: " << safe(mon.terminal_connect) << "\n"; std::cout << "Timestamp: " << safe(mon.timestamp) << "\n"; std::cout << "Status: " << safe(mon.status) << "\n"; + + std::cout << "CT1: " << mon.CT1 << "\n"; + std::cout << "CT2: " << mon.CT2 << "\n"; + std::cout << "PT1: " << mon.PT1 << "\n"; + std::cout << "PT2: " << mon.PT2 << "\n"; } void print_terminal(const terminal_dev& tmnl) { @@ -2118,7 +2153,7 @@ void print_terminal(const terminal_dev& tmnl) { std::cout << "GUID: " << safe(tmnl.guid) << "\n"; std::cout << "Terminal ID: " << safe(tmnl.terminal_id) << "\n"; - std::cout << "Terminal Code: " << safe(tmnl.terminal_code)<< "\n"; + std::cout << "Terminal Code: " << safe(tmnl.terminal_name)<< "\n"; std::cout << "Organization Name: "<< safe(tmnl.org_name) << "\n"; std::cout << "Maintenance Name: " << safe(tmnl.maint_name) << "\n"; std::cout << "Station Name: " << safe(tmnl.station_name) << "\n"; @@ -2131,6 +2166,8 @@ void print_terminal(const terminal_dev& tmnl) { std::cout << "Port: " << safe(tmnl.port) << "\n"; std::cout << "Timestamp: " << safe(tmnl.timestamp) << "\n"; + std::cout << "mac: " << safe(tmnl.mac) << "\n"; + for (size_t i = 0; i < 10 && !tmnl.line[i].monitor_id.empty(); ++i) { std::cout << " Monitor " << (i + 1) << ":\n"; print_monitor(tmnl.line[i]); @@ -2168,7 +2205,7 @@ void print_trigger_update_xml(const trigger_update_xml_t& trigger_update) { } } -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////瑙f瀽鏄犲皠鏂囦欢 +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////瑙f瀽妯℃澘鏂囦欢 //瑙f瀽鏄犲皠鏂囦欢 bool ParseXMLConfig2(int xml_flag, XmlConfig *cfg, std::list *ctopiclist, const std::string& path) @@ -2615,3 +2652,79 @@ void Set_xml_nodeinfo() } +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////鏁版嵁杞崲鍑芥暟 +// DataArrayItem to_json +void to_json(nlohmann::json& j, const DataArrayItem& d) { + j = nlohmann::json{ + {"DataAttr", d.DataAttr}, + {"DataTimeSec", d.DataTimeSec}, + {"DataTimeUSec", d.DataTimeUSec}, + {"DataTag", d.DataTag}, + {"Data", d.Data} + }; +} + +// MsgObj to_json +void to_json(nlohmann::json& j, const MsgObj& m) { + j = nlohmann::json{ + {"Cldid", m.Cldid}, + {"DataType", m.DataType}, + {"DataAttr", m.DataAttr}, + {"DsNameIdx", m.DsNameIdx}, + {"DataArray", m.DataArray} + }; +} + +// FullObj to_json +void to_json(nlohmann::json& j, const FullObj& f) { + j = nlohmann::json{ + {"Mid", f.Mid}, + {"Did", f.Did}, + {"Pri", f.Pri}, + {"Type", f.Type}, + {"Msg", f.Msg} + }; +} +std::string generate_json( + int Mid, + int Did, + int Pri, + int Type, + int Cldid, + int DataType, + int DataAttr, + int DsNameIdx, + const std::vector& dataArray //鏋勯犲嚭array鍚庤皟鐢ㄨ繖涓嚱鏁 +) { + FullObj fobj; + fobj.Mid = Mid; + fobj.Did = Did; + fobj.Pri = Pri; + fobj.Type = Type; + fobj.Msg.Cldid = Cldid; + fobj.Msg.DataType = DataType; + fobj.Msg.DataAttr = DataAttr; + fobj.Msg.DsNameIdx = DsNameIdx; + fobj.Msg.DataArray = dataArray; + nlohmann::json j = fobj; + return j.dump(); // 杈撳嚭鏍囧噯 json 瀛楃涓 +} + +void upload_data_test(){ + std::vector arr; + arr.push_back({1, 1725477660, 0, 1, "xxxx"}); + arr.push_back({2, 1691741340, 0, 1, "yyyy"}); + + std::string js = generate_json( + -1, 2, 1, 4866, 1, 0, 2, 1, arr + ); + std::cout << js << std::endl; + + queue_data_t data; + data.monitor_no = 1; + data.strTopic = TOPIC_ALARM; + data.strText = js; + data.mp_id = "test"; + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(data); +} diff --git a/LFtid1056/cloudfront/code/interface.cpp b/LFtid1056/cloudfront/code/interface.cpp index d67b65d..54aa73e 100644 --- a/LFtid1056/cloudfront/code/interface.cpp +++ b/LFtid1056/cloudfront/code/interface.cpp @@ -133,7 +133,6 @@ void SendJsonAPI_web(const std::string& strUrl, //鎺ュ彛璺緞 } } - ////////////////////////////////////////////////////////////////////////////////////////////////////////涓婁紶鏂囦欢鎺ュ彛 //澶勭悊鏂囦欢涓婁紶鍝嶅簲 @@ -609,7 +608,7 @@ int terminal_ledger_web(std::map& terminal_dev_map, }; dev.terminal_id = safe_str(item, "id"); dev.addr_str = safe_str(item, "ip"); - dev.terminal_code = safe_str(item, "name"); + dev.terminal_name = safe_str(item, "name"); dev.org_name = safe_str(item, "org_name"); dev.maint_name = safe_str(item, "maint_name"); dev.station_name = safe_str(item, "stationName"); @@ -623,18 +622,26 @@ int terminal_ledger_web(std::map& terminal_dev_map, dev.processNo = safe_str(item, "processNo"); dev.maxProcessNum = safe_str(item, "maxProcessNum"); + dev.mac = safe_str(item, "mac");//娣诲姞mac + if (item.contains("monitorData") && item["monitorData"].is_array()) { for (auto& mon : item["monitorData"]) { if (dev.line.size() >= 10) break; ledger_monitor m; m.monitor_id = safe_str(mon, "id"); - m.terminal_code = safe_str(mon, "terminal_code"); + m.terminal_id = safe_str(mon, "terminal_id"); m.monitor_name = safe_str(mon, "name"); m.logical_device_seq = safe_str(mon, "lineNo"); m.voltage_level = safe_str(mon, "voltageLevel"); m.terminal_connect = safe_str(mon, "ptType"); m.timestamp = safe_str(mon, "updateTime"); m.status = safe_str(mon, "status"); + + m.CT1 = mon.value("CT1", 0.0); + m.CT2 = mon.value("CT2", 0.0); + m.PT1 = mon.value("PT1", 0.0); + m.PT2 = mon.value("PT2", 0.0); + dev.line.push_back(m); } } diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index a48a632..b19e18b 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -6,6 +6,11 @@ #include #include #include +#include + +/////////////////////////////////////////////////////////////////////////////////////////// + +#include "nlohmann/json.hpp" /////////////////////////////////////////////////////////////////////////////////////////// @@ -47,23 +52,28 @@ class ledger_monitor { public: std::string monitor_id; //鐩戞祴鐐筰d - std::string terminal_code; //鐩戞祴鐐 + std::string terminal_id; //鐩戞祴鐐 std::string monitor_name; //鐩戞祴鐐瑰悕 std::string logical_device_seq; //鐩戞祴鐐瑰簭鍙 std::string voltage_level; //鐩戞祴鐐圭數鍘嬬瓑绾 std::string terminal_connect; //鐩戞祴鐐规帴绾挎柟寮 std::string timestamp; //鏇存柊鏃堕棿 std::string status; //鐩戞祴鐐圭姸鎬 + + double PT1; // 鐢靛帇鍙樻瘮1 + double PT2; // 鐢靛帇鍙樻瘮2 + double CT1; // 鐢垫祦鍙樻瘮1 + double CT2; // 鐢垫祦鍙樻瘮2 }; //缁堢鍙拌处 class terminal_dev { public: - std::string guid; + std::string guid; //鍙拌处鏇存柊鍥炲鐢 std::string terminal_id; - std::string terminal_code; + std::string terminal_name; std::string org_name; std::string maint_name; std::string station_name; @@ -78,6 +88,8 @@ public: std::string processNo; std::string maxProcessNum; + std::string mac; // 瑁呯疆MAC鍦板潃 + std::vector line; }; @@ -314,9 +326,12 @@ bool is_blank(const std::string& str); void print_terminal(const terminal_dev& tmnl); void printTerminalDevMap(const std::map& terminal_dev_map); +void upload_data_test(); + ////////////////////////////////////////////////////////////////////////////////mq - +extern std::mutex queue_data_list_mutex; +extern std::list queue_data_list; /////////////////////////////////////////////////////////////////////////////////涓诲嚱鏁扮被澹版槑 @@ -353,6 +368,41 @@ typedef struct { pthread_mutex_t lock; // 绾跨▼涓撶敤浜掓枼閿 } thread_info_t; +///////////////////////////////////////////////////////////////////////////////////////涓婇佹暟鎹殑json鏍煎紡 +// 鍗曟潯 DataArray 鏁版嵁 +struct DataArrayItem { + int DataAttr; + int DataTimeSec; + int DataTimeUSec; + int DataTag; + std::string Data; +}; + +// Msg 瀵硅薄 +struct MsgObj { + int Cldid; + int DataType; + int DataAttr; + int DsNameIdx; + std::vector DataArray; +}; + +// 鏁翠綋 +struct FullObj { + int Mid; + int Did; + int Pri; + int Type; + MsgObj Msg; +}; + +// nlohmann搴忓垪鍖栨帴鍙 +void to_json(nlohmann::json& j, const DataArrayItem& d); +void to_json(nlohmann::json& j, const MsgObj& m); +void to_json(nlohmann::json& j, const FullObj& f); + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #endif diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index 0590696..f543257 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -218,11 +218,11 @@ std::string get_parent_directory() { //鍒濆鍖栨棩蹇 init_loggers(); - //璇诲彇妯″瀷,涓嬭浇鏂囦欢 + //璇诲彇妯″瀷,涓嬭浇妯℃澘鏂囦欢 parse_model_cfg_web(); - //瑙f瀽鏂囦欢 - Set_xml_nodeinfo(); + //瑙f瀽妯℃澘鏂囦欢 + //Set_xml_nodeinfo(); StartFrontThread(); //寮鍚富绾跨▼ diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index 16da6b8..b6f238d 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -754,7 +754,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d terminal_dev json_data; json_data.terminal_id = item.value("id", ""); - json_data.terminal_code = item.value("name", ""); + json_data.terminal_name = item.value("name", ""); json_data.org_name = item.value("org_name", ""); json_data.maint_name = item.value("maint_name", ""); json_data.station_name = item.value("stationName", ""); @@ -783,7 +783,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d m.logical_device_seq = monitor_item.value("lineNo", ""); m.terminal_connect = monitor_item.value("ptType", ""); m.timestamp = json_data.timestamp; - m.terminal_code = json_data.terminal_code; + m.terminal_id = json_data.terminal_id; } } @@ -1180,7 +1180,7 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json xmlStream << "" << json_data.station_name << "" << std::endl; add_indent(xmlStream, indentLevel); - xmlStream << "" << json_data.terminal_code << "" << std::endl; + xmlStream << "" << json_data.terminal_name << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.timestamp << "" << std::endl; // Assuming `timestamp` @@ -1201,6 +1201,9 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.dev_key << "" << std::endl; + add_indent(xmlStream, indentLevel); + xmlStream << "" << json_data.mac << "" << std::endl; + // monitorData 閮ㄥ垎 for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) { const ledger_monitor& monitor = json_data.line[i]; @@ -1228,9 +1231,21 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json xmlStream << "" << monitor.timestamp << "" << std::endl; add_indent(xmlStream, indentLevel); - xmlStream << "" << monitor.terminal_code << "" << std::endl; + xmlStream << "" << monitor.terminal_id << "" << std::endl; add_indent(xmlStream, indentLevel); + xmlStream << "" << monitor.CT1 << "" << std::endl; + + add_indent(xmlStream, indentLevel); + xmlStream << "" << monitor.CT2 << "" << std::endl; + + add_indent(xmlStream, indentLevel); + xmlStream << "" << monitor.PT1 << "" << std::endl; + + add_indent(xmlStream, indentLevel); + xmlStream << "" << monitor.PT2 << "" << std::endl; + + add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.status << "" << std::endl; indentLevel--; diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index 78f769d..5516939 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -281,6 +281,7 @@ extern bool normalOutputEnabled; if (G_TEST_NUM != 0) { std::cout << "[PeriodicTask] Executing rocketmq_test_300()\n"; rocketmq_test_300(G_TEST_NUM, g_front_seg_index, G_TEST_TYPE,m_front); + upload_data_test(); } } @@ -412,7 +413,7 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { std::ostringstream os; os << "\r\x1B[K------------------------------------\n"; os << "\r\x1B[K|-- terminal_id: " << dev.terminal_id << "\n"; - os << "\r\x1B[K|-- terminal_code: " << dev.terminal_code << "\n"; + os << "\r\x1B[K|-- terminal_name: " << dev.terminal_name << "\n"; os << "\r\x1B[K|-- dev_ip: " << dev.addr_str << "\n"; os << "\r\x1B[K|-- dev_port: " << dev.port << "\n"; os << "\r\x1B[K|-- dev_type: " << dev.dev_type << "\n"; @@ -427,6 +428,8 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { os << "\r\x1B[K|-- tmnl_status: " << dev.tmnl_status << "\n"; os << "\r\x1B[K|-- timestamp: " << dev.timestamp << "\n"; + os << "\r\x1B[K|-- mac: " << dev.mac << "\n"; + for (size_t i = 0; i < dev.line.size(); ++i) { const auto& ld = dev.line[i]; if (ld.monitor_id.empty()) continue; @@ -434,11 +437,17 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { os << "\r\x1B[K |-- monitor_id: " << ld.monitor_id << "\n"; os << "\r\x1B[K |-- monitor_name: " << ld.monitor_name << "\n"; os << "\r\x1B[K |-- logical_device_seq: " << ld.logical_device_seq << "\n"; - os << "\r\x1B[K |-- terminal_code: " << ld.terminal_code << "\n"; + os << "\r\x1B[K |-- terminal_id: " << ld.terminal_id << "\n"; os << "\r\x1B[K |-- voltage_level: " << ld.voltage_level << "\n"; os << "\r\x1B[K |-- terminal_connect: " << ld.terminal_connect << "\n"; os << "\r\x1B[K |-- status: " << ld.status << "\n"; os << "\r\x1B[K |-- timestamp: " << ld.timestamp << "\n"; + + os << "\r\x1B[K |-- CT1: " << ld.CT1 << "\n"; + os << "\r\x1B[K |-- CT2: " << ld.CT2 << "\n"; + os << "\r\x1B[K |-- PT1: " << ld.PT1 << "\n"; + os << "\r\x1B[K |-- PT2: " << ld.PT2 << "\n"; + } os << "\r\x1B[K------------------------------------\n"; From 747d6c757c5c7ccff7eea09ebd920ed811d9fb26 Mon Sep 17 00:00:00 2001 From: lnk Date: Thu, 26 Jun 2025 16:44:21 +0800 Subject: [PATCH 3/4] generate ledger to main thread --- LFtid1056/cloudfront/code/cfg_parser.cpp | 59 ++++++++++++---- LFtid1056/cloudfront/code/interface.h | 9 +++ LFtid1056/cloudfront/code/log4.cpp | 6 -- LFtid1056/cloudfront/code/main.cpp | 4 -- LFtid1056/cloudfront/code/rocketmq.cpp | 87 +++++++++--------------- LFtid1056/cloudfront/code/worker.cpp | 7 +- LFtid1056/main_thread.cpp | 2 + 7 files changed, 93 insertions(+), 81 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 697e0f6..5a9365b 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -31,6 +31,8 @@ #include "tinyxml2.h" #include "rocketmq.h" + + ///////////////////////////////////////////////////////////////////////////////////////////////// using namespace std; @@ -58,8 +60,6 @@ extern std::list queue_data_list; //queue鍙戦佹暟鎹摼琛 extern int three_secs_enabled; -extern std::vector terminal_devlist; - extern std::map xmlinfo_list;//淇濆瓨鎵鏈夊瀷鍙峰搴旂殑icd鏄犲皠鏂囦欢瑙f瀽鏁版嵁 extern XmlConfig xmlcfg;//鏄熷舰鎺ョ嚎xml鑺傜偣瑙f瀽鐨勬暟鎹-榛樿鏄犲皠鏂囦欢瑙f瀽鏁版嵁 extern std::list topicList; //闃熷垪鍙戦佷富棰橀摼琛 @@ -2686,15 +2686,15 @@ void to_json(nlohmann::json& j, const FullObj& f) { }; } std::string generate_json( - int Mid, - int Did, - int Pri, - int Type, - int Cldid, - int DataType, - int DataAttr, - int DsNameIdx, - const std::vector& dataArray //鏋勯犲嚭array鍚庤皟鐢ㄨ繖涓嚱鏁 + int Mid, //闇搴旂瓟鐨勬姤鏂囪闃呰呮敹鍒板悗闇浠ユID搴旂瓟锛屾棤闇搴旂瓟濉叆鈥-1鈥 + int Did, //璁惧鍞竴鏍囪瘑Ldid锛屽~鍏0浠h〃Ndid銆 + int Pri, //鎶ユ枃澶勭悊鐨勪紭鍏堢骇 + int Type, //娑堟伅绫诲瀷 + int Cldid, //閫昏緫瀛愯澶嘔D锛0-閫昏緫璁惧鏈韩锛屾棤濉-1 + int DataType, //鏁版嵁绫诲瀷锛0-琛ㄧず浠ユ暟鎹泦鏂瑰紡涓婇 + int DataAttr, //鏁版嵁灞炴э細鏃犫0鈥濄佸疄鏃垛1鈥濄佺粺璁♀2鈥濈瓑銆 + int DsNameIdx, //鏁版嵁闆嗗簭鍙凤紙浠ユ暟鎹泦鏂瑰紡涓婇侊級锛屾棤濉-1 + const std::vector& dataArray //鏁版嵁鏁扮粍銆 ) { FullObj fobj; fobj.Mid = Mid; @@ -2712,7 +2712,11 @@ std::string generate_json( void upload_data_test(){ std::vector arr; - arr.push_back({1, 1725477660, 0, 1, "xxxx"}); + arr.push_back({1, 1725477660, 0, 1, "xxxx"}); //鏁版嵁灞炴 -1-鏃狅紝 0-鈥淩t鈥,1-鈥淢ax鈥,2-鈥淢in鈥,3-鈥淎vg鈥,4-鈥淐p95鈥 + //鏁版嵁鏃舵爣锛岀浉瀵1970骞寸殑绉掞紝鏃犳晥濉叆鈥-1鈥 + //鏁版嵁鏃舵爣锛屽井绉掗挓锛屾棤鏁堝~鍏モ-1鈥 + //鏁版嵁鏍囪瘑锛1-鏍囪瘑鏁版嵁寮傚父 + //鏁版嵁搴忓垪锛堟暟鎹泦涓婇佹椂灏嗕簩杩涘埗鏁版嵁娴佽浆鎹㈡垚Base64瀛楃涓诧紝鍏朵粬鏁版嵁涓簅bject锛 arr.push_back({2, 1691741340, 0, 1, "yyyy"}); std::string js = generate_json( @@ -2728,3 +2732,34 @@ void upload_data_test(){ std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } + +//////////////////////////////////////////////////////////////////////////////////////////////////鍙拌处璧嬪肩粰閫氫俊 + +std::vector GenerateDeviceInfoFromLedger(const std::vector& terminal_devlist) { + std::vector devices; + + for (const auto& terminal : terminal_devlist) { + DeviceInfo device; + device.device_id = terminal.terminal_id; + device.name = terminal.terminal_name; + device.model = terminal.dev_series; + device.mac = terminal.mac; + + for (const auto& monitor : terminal.line) { + PointInfo point; + point.point_id = monitor.monitor_id; + point.name = monitor.monitor_name; + point.device_id = terminal.terminal_id; + point.PT1 = monitor.PT1; + point.PT2 = monitor.PT2; + point.CT1 = monitor.CT1; + point.CT2 = monitor.CT2; + + device.points.push_back(point); + } + + devices.push_back(device); + } + + return devices; +} \ No newline at end of file diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index b19e18b..5e564f5 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -12,6 +12,8 @@ #include "nlohmann/json.hpp" +#include "../../client2.h" + /////////////////////////////////////////////////////////////////////////////////////////// class Front; @@ -308,6 +310,9 @@ int parse_model_cfg_web(); void qvvr_test(); void Fileupload_test(); +extern std::vector terminal_devlist; +extern std::mutex ledgermtx; + //////////////////////////////////////////////////////////////////////////////////cfg_parse鐨勫嚱鏁板0鏄 void init_config(); @@ -403,6 +408,10 @@ void to_json(nlohmann::json& j, const FullObj& f); ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +std::vector GenerateDeviceInfoFromLedger(const std::vector& terminal_devlist); + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #endif diff --git a/LFtid1056/cloudfront/code/log4.cpp b/LFtid1056/cloudfront/code/log4.cpp index bffb006..cbfc1a0 100644 --- a/LFtid1056/cloudfront/code/log4.cpp +++ b/LFtid1056/cloudfront/code/log4.cpp @@ -47,15 +47,9 @@ extern int g_front_seg_index; extern std::string FRONT_INST; extern std::string subdir; -//mq -extern std::mutex queue_data_list_mutex; //queue鍙戦佹暟鎹攣 -extern std::list queue_data_list; //queue鍙戦佹暟鎹摼琛 - //鏃ュ織涓婚 extern std::string G_LOG_TOPIC; -extern std::vector terminal_devlist; - ////////////////////////////////////////////////////////杈呭姪鍑芥暟 std::string get_front_type_from_subdir() { if (subdir == "cfg_3s_data") diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index f543257..c71546b 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -78,10 +78,6 @@ extern int TEST_PORT; //娴嬭瘯绔彛鍙 extern std::string FRONT_INST; -extern std::mutex queue_data_list_mutex; -extern std::list queue_data_list; - - /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 鍔熻兘鍑芥暟 template diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index b6f238d..b1c23f4 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -58,10 +58,6 @@ static rocketmq::RocketMQProducer* g_producer = nullptr; //鐢熶骇鑰 /////////////////////////////////////////////////////////////////////////////////////////////////////////// -//鍙拌处 -extern std::mutex ledgermtx; -extern std::vector terminal_devlist; - //鍓嶇疆杩涚▼ extern unsigned int g_node_id; extern int g_front_seg_index; @@ -1391,9 +1387,8 @@ bool shouldSkipTerminal(const std::string& terminal_id) { return false; } -void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) { - - if(!INITFLAG){ +void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { + if (!INITFLAG) { std::cout << "鍓嶇疆鏈垵濮嬪寲瀹屾垚\n"; return; } @@ -1451,41 +1446,34 @@ void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) { data.mp_id = dev.line[j].monitor_id; data.monitor_no = static_cast(i + j); - std::string modified_time = std::to_string(current_time_ms); + std::string modified_time = std::to_string(current_time_ms / 1000); std::string modified_strText = base_strText; - - // 鏇挎崲 Monitor - size_t monitor_pos = modified_strText.find("\"Monitor\""); - if (monitor_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", monitor_pos); - size_t quote_pos = modified_strText.find("\"", colon_pos); - size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id); - } - } - - // 鏇挎崲 TIME - size_t time_pos = modified_strText.find("\"TIME\""); - if (time_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", time_pos); - size_t quote_pos = colon_pos; - size_t end_quote_pos = modified_strText.find(",", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time); + try { + auto j = nlohmann::json::parse(modified_strText); + j["Did"] = i; + if (j.contains("Msg") && j["Msg"].is_object()) { + j["Msg"]["Cldid"] = j; + if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) { + for (auto& item : j["Msg"]["DataArray"]) { + if (item.is_object()) { + item["DataTimeSec"] = std::stoll(modified_time); + } + } + } } + modified_strText = j.dump(); + } catch (...) { + // 淇濇寔鍘熷鏂囨湰 } data.strText = modified_strText; - //my_rocketmq_send(data,front->m_producer); std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_no << " and TIME " << modified_time << std::endl; - } } } else { @@ -1493,43 +1481,36 @@ void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) { for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < total_messages; ++i) { std::string monitor_id = "testmonitor" + std::to_string(i); - data.mp_id = monitor_id; data.monitor_no = i; - std::string modified_time = std::to_string(current_time_ms); + std::string modified_time = std::to_string(current_time_ms / 1000); std::string modified_strText = base_strText; - // 鏇挎崲 Monitor - size_t monitor_pos = modified_strText.find("\"Monitor\""); - if (monitor_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", monitor_pos); - size_t quote_pos = modified_strText.find("\"", colon_pos); - size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id); - } - } - - // 鏇挎崲 TIME - size_t time_pos = modified_strText.find("\"TIME\""); - if (time_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", time_pos); - size_t quote_pos = colon_pos; - size_t end_quote_pos = modified_strText.find(",", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time); + try { + auto j = nlohmann::json::parse(modified_strText); + j["Did"] = 0; + if (j.contains("Msg") && j["Msg"].is_object()) { + j["Msg"]["Cldid"] = data.mp_id; + if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) { + for (auto& item : j["Msg"]["DataArray"]) { + if (item.is_object()) { + item["DataTimeSec"] = std::stoll(modified_time); + } + } + } } + modified_strText = j.dump(); + } catch (...) { + // 淇濇寔鍘熷鏂囨湰 } data.strText = modified_strText; - //my_rocketmq_send(data,front->m_producer); std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_no << " and TIME " << modified_time << std::endl; - } } diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index 5516939..709147b 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -48,11 +48,6 @@ bool showinshellflag =false; extern std::list errorList, warnList, normalList; extern std::mutex errorListMutex, warnListMutex, normalListMutex; -extern std::vector terminal_devlist; -extern std::mutex ledgermtx; - -extern std::list queue_data_list; - extern int IED_COUNT; extern int INITFLAG; extern int g_front_seg_index; @@ -281,7 +276,7 @@ extern bool normalOutputEnabled; if (G_TEST_NUM != 0) { std::cout << "[PeriodicTask] Executing rocketmq_test_300()\n"; rocketmq_test_300(G_TEST_NUM, g_front_seg_index, G_TEST_TYPE,m_front); - upload_data_test(); + //upload_data_test(); } } diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 1a4a966..3171a60 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -152,6 +152,8 @@ void* client_manager_thread(void* arg) { // 生成100个测试装置 std::vector test_devices = generate_test_devices(100); + //std::vector devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk添加 + // 启动客户端连接 start_client_connect(devices); From cc909101dffbb47140e97dd248256bd9fe3094ef Mon Sep 17 00:00:00 2001 From: lnk Date: Fri, 27 Jun 2025 16:33:41 +0800 Subject: [PATCH 4/4] multi front process --- LFtid1056/client2.h | 7 ++- LFtid1056/cloudfront/boot/start_fe.sh | 2 +- LFtid1056/cloudfront/code/cfg_parser.cpp | 46 +++++++++-------- LFtid1056/cloudfront/code/interface.cpp | 22 +++++---- LFtid1056/cloudfront/code/interface.h | 9 ++-- LFtid1056/cloudfront/code/log4.cpp | 6 +-- LFtid1056/cloudfront/code/log4.h | 2 +- LFtid1056/cloudfront/code/log4cplus/log4.h | 2 +- LFtid1056/cloudfront/code/main.cpp | 26 +++++----- LFtid1056/cloudfront/code/rocketmq.cpp | 57 +++++++++++----------- LFtid1056/main_thread.cpp | 16 ++++-- 11 files changed, 105 insertions(+), 90 deletions(-) diff --git a/LFtid1056/client2.h b/LFtid1056/client2.h index 122a91d..f04d033 100644 --- a/LFtid1056/client2.h +++ b/LFtid1056/client2.h @@ -1,3 +1,6 @@ +#ifndef CLIENT_H +#define CLIENT_H + #include #include #include @@ -101,4 +104,6 @@ void try_reconnect(uv_timer_t* timer); void on_connect(uv_connect_t* req, int status); void on_close(uv_handle_t* handle); void init_clients(uv_loop_t* loop, const std::vector& devices); -void stop_all_clients(); \ No newline at end of file +void stop_all_clients(); + +#endif \ No newline at end of file diff --git a/LFtid1056/cloudfront/boot/start_fe.sh b/LFtid1056/cloudfront/boot/start_fe.sh index bf19973..3d3f1f5 100644 --- a/LFtid1056/cloudfront/boot/start_fe.sh +++ b/LFtid1056/cloudfront/boot/start_fe.sh @@ -11,7 +11,7 @@ QTDIR=/qt-4.8.4 export QTDIR -FEP_ENV=/FeProject +FEP_ENV=/home/pq/zwproject/LFtid1056 export FEP_ENV PATH=$FEP_ENV/bin:$QTDIR/bin:$PATH diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 5a9365b..94475e6 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -527,7 +527,7 @@ void init_config() { } //娴嬭瘯杩涚▼绔彛 - if (g_node_id == STAT_DATA_BASE_NODE_ID)//缁熻閲囬泦 + /*if (g_node_id == STAT_DATA_BASE_NODE_ID)//缁熻閲囬泦 TEST_PORT = TEST_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index; else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//琛ュ彫 TEST_PORT = TEST_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index; @@ -537,8 +537,8 @@ void init_config() { } else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//鏆傛佸綍娉 TEST_PORT = TEST_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index; - } - + }*/ + TEST_PORT = TEST_PORT + g_front_seg_index; } ////////////////////////////////////////////////////////////////////////////////////////////鑾峰彇褰撳墠鏃堕棿 @@ -817,7 +817,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, const std::string& id) { DIR* dir = opendir(cfg_dir.c_str()); if (!dir) { - DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋 鏃犳硶瑙f瀽琛ユ嫑鏂囦欢锛岃ˉ鎷涙枃浠惰矾寰凢RONT_PATH + /etc/recall/涓嶅瓨鍦", get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%d鍙疯繘绋 鏃犳硶瑙f瀽琛ユ嫑鏂囦欢锛岃ˉ鎷涙枃浠惰矾寰凢RONT_PATH + /etc/recall/涓嶅瓨鍦", g_front_seg_index); return false; } @@ -829,7 +829,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, const std::string& id) { std::string filepath = cfg_dir + "/" + filename; tinyxml2::XMLDocument doc; if (doc.LoadFile(filepath.c_str()) != tinyxml2::XML_SUCCESS) { - DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋 鏃犳硶瑙f瀽琛ユ嫑鏂囦欢%s,琛ユ嫑鍐呭鏃犳晥", get_front_msg_from_subdir(), g_front_seg_index, filepath.c_str()); + DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%d鍙疯繘绋 鏃犳硶瑙f瀽琛ユ嫑鏂囦欢%s,琛ユ嫑鍐呭鏃犳晥", g_front_seg_index, filepath.c_str()); continue; } @@ -871,20 +871,18 @@ void process_recall_config(recall_xml_t* recall_xml) //鏍规嵁鐩戞祴鐐筰d鏉ヨ幏鍙栬ˉ鎷涙暟鎹紝琛ユ嫑鏃惰皟鐢ㄨ繖涓 void Check_Recall_Config(const std::string& id) { - if (g_node_id == HIS_DATA_BASE_NODE_ID || + /*if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || - g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) { + g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) {*/ - recall_xml_t recall_xml; - std::memset(&recall_xml, 0, sizeof(recall_xml_t)); - - // 瑙f瀽琛ユ嫑鏂囦欢 - parse_recall_xml(&recall_xml, id); - - // 灏嗚ˉ鎷涙暟鎹祴鍊煎埌鍏ㄥ眬鍙橀噺 - process_recall_config(&recall_xml); - } + recall_xml_t recall_xml; + std::memset(&recall_xml, 0, sizeof(recall_xml_t)); + // 瑙f瀽琛ユ嫑鏂囦欢 + parse_recall_xml(&recall_xml, id); + // 灏嗚ˉ鎷涙暟鎹祴鍊煎埌鍏ㄥ眬鍙橀噺 + process_recall_config(&recall_xml); + //} } //琛ユ嫑鎴愬姛鍚庡垹闄よˉ鎷涙枃浠讹紝琛ユ嫑鍚庤皟鐢ㄨ繖涓 @@ -925,7 +923,7 @@ void DeletcRecallXml() { DIR* dir = opendir(cfg_dir.c_str()); if (!dir) { std::cerr << "folder does not exist!" << std::endl; - DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋 鍒犻櫎鏃х殑琛ユ嫑鏂囦欢澶辫触,琛ユ嫑鏂囦欢璺緞FRONT_PATH + /etc/recall/涓嶅瓨鍦",get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%d鍙疯繘绋 鍒犻櫎鏃х殑琛ユ嫑鏂囦欢澶辫触,琛ユ嫑鏂囦欢璺緞FRONT_PATH + /etc/recall/涓嶅瓨鍦", g_front_seg_index); return; } @@ -944,7 +942,7 @@ void DeletcRecallXml() { if (stat(fullpath.c_str(), &file_stat) == 0) { if (file_stat.st_mtime < cutoff) { if (remove(fullpath.c_str()) == 0) { - DIY_INFOLOG("process", "銆怤ORMAL銆戝墠缃殑%s%d鍙疯繘绋 鍒犻櫎瓒呰繃涓ゅぉ鐨勮ˉ鎷涙枃浠",get_front_msg_from_subdir(), g_front_seg_index); + DIY_INFOLOG("process", "銆怤ORMAL銆戝墠缃殑%d鍙疯繘绋 鍒犻櫎瓒呰繃涓ゅぉ鐨勮ˉ鎷涙枃浠", g_front_seg_index); } else { std::cerr << "Failed to remove file: " << fullpath << std::endl; } @@ -966,7 +964,7 @@ void CreateRecallXml() { g_StatisticLackList_list_mutex.lock(); if (!g_StatisticLackList.empty()) { - DIY_INFOLOG("process", "銆怤ORMAL銆戝墠缃殑%s%d鍙疯繘绋 寮濮嬪啓鍏ヨˉ鎷涙枃浠", get_front_msg_from_subdir(), g_front_seg_index); + DIY_INFOLOG("process", "銆怤ORMAL銆戝墠缃殑%d鍙疯繘绋 寮濮嬪啓鍏ヨˉ鎷涙枃浠", g_front_seg_index); std::map> id_map; for (const auto& jr : g_StatisticLackList) { @@ -1006,7 +1004,7 @@ void CreateRecallXml() { tinyxml2::XMLError save_result = doc.SaveFile(path.str().c_str()); if (save_result != tinyxml2::XML_SUCCESS) { - DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋 鏃犳硶灏嗚ˉ鎷涙枃浠跺啓鍏ヨ矾寰: %s",get_front_msg_from_subdir(), g_front_seg_index, path.str().c_str()); + DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%d鍙疯繘绋 鏃犳硶灏嗚ˉ鎷涙枃浠跺啓鍏ヨ矾寰: %s", g_front_seg_index, path.str().c_str()); continue; } } @@ -1019,10 +1017,10 @@ void CreateRecallXml() { //鐢熸垚寰呰ˉ鎷泋ml鏂囦欢 void create_recall_xml() { - if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)) { - DeletcRecallXml(); - CreateRecallXml(); - } + //if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)) { + DeletcRecallXml(); + CreateRecallXml(); + //} } // 宸ュ叿鍑芥暟锛氬皢鏃堕棿瀛楃涓茶浆涓 time_t锛堢绾э級 diff --git a/LFtid1056/cloudfront/code/interface.cpp b/LFtid1056/cloudfront/code/interface.cpp index 54aa73e..edac700 100644 --- a/LFtid1056/cloudfront/code/interface.cpp +++ b/LFtid1056/cloudfront/code/interface.cpp @@ -534,7 +534,7 @@ int terminal_ledger_web(std::map& terminal_dev_map, { if (inputstring.empty()) { std::cerr << "Error: inputstring is empty\n"; - DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋嬭皟鐢╳eb鍙拌处鎺ュ彛鐨勫叆鍙備负绌", get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%d鍙疯繘绋嬭皟鐢╳eb鍙拌处鎺ュ彛鐨勫叆鍙備负绌", g_front_seg_index); return 1; } @@ -672,7 +672,8 @@ int terminal_ledger_web(std::map& terminal_dev_map, } // 5. 涓昏繘绋嬩繚瀛樺彴璐 - if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + if (g_front_seg_index == 1) { save_ledger_json(responseStr); } @@ -691,7 +692,7 @@ int parse_device_cfg_web() input_jstr += "}"; std::cout << "input_jstr: " << input_jstr << std::endl; - DIY_DEBUGLOG("process","銆怐EBUG銆戝墠缃殑%s%d鍙疯繘绋嬭皟鐢╳eb鎺ュ彛鑾峰彇鍙拌处浣跨敤鐨勮姹傝緭鍏ヤ负:%s",get_front_msg_from_subdir(), g_front_seg_index, input_jstr.c_str()); + DIY_DEBUGLOG("process","銆怐EBUG銆戝墠缃殑%d鍙疯繘绋嬭皟鐢╳eb鎺ュ彛鑾峰彇鍙拌处浣跨敤鐨勮姹傝緭鍏ヤ负:%s", g_front_seg_index, input_jstr.c_str()); // 2. 璋冪敤鎺ュ彛 std::map terminal_dev_map; @@ -702,8 +703,9 @@ int parse_device_cfg_web() // 3. 璋冭瘯鎵撳嵃 printTerminalDevMap(terminal_dev_map); - // 4. 鐪嬮棬鐙楅厤缃牎楠岋紙浠呬富杩涚▼绋虫侊級 - if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + // 4. 鐪嬮棬鐙楅厤缃牎楠岋紙浠呬富杩涚▼锛 + //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + if (g_front_seg_index == 1) { int max_index = get_max_stat_data_index(FRONT_PATH + "/etc/runtime.cf"); std::cout << "max_index = " << max_index << std::endl; @@ -730,13 +732,13 @@ int parse_device_cfg_web() // 5. 鍙拌处鏁伴噺涓庨厤缃瘮瀵 int count_cfg = static_cast(terminal_dev_map.size()); std::cout << "terminal_ledger_num: " << count_cfg << std::endl; - DIY_DEBUGLOG("process", "銆怐EBUG銆戝墠缃殑%s%d鍙疯繘绋嬭皟鐢ㄨ幏鍙栧埌鐨勫彴璐︾殑鏁伴噺涓:%d",get_front_msg_from_subdir(), g_front_seg_index, count_cfg); + DIY_DEBUGLOG("process", "銆怐EBUG銆戝墠缃殑%d鍙疯繘绋嬭皟鐢ㄨ幏鍙栧埌鐨勫彴璐︾殑鏁伴噺涓:%d", g_front_seg_index, count_cfg); if (IED_COUNT < count_cfg) { std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!" << std::endl; - DIY_WARNLOG("process","銆怶ARN銆戝墠缃殑%s%d鍙疯繘绋嬭幏鍙栧埌鐨勫彴璐︾殑鏁伴噺澶т簬閰嶇疆鏂囦欢涓粰鍗曚釜杩涚▼閰嶇疆鐨勫彴璐︽暟閲:%d,杩欎釜杩涚▼灏嗘寜鐓ц幏鍙栧埌鐨勫彴璐︾殑鏁伴噺鏉ュ垱寤哄彴璐︾┖闂达紝杩欎釜杩涚▼涓嶈兘鐩存帴閫氳繃鍙拌处娣诲姞鏉ユ柊澧炲彴璐︼紝鍙兘閫氳繃閲嶅惎杩涚▼鎴栬呭厛鍒犻櫎宸叉湁鍙拌处鍐嶆坊鍔犲彴璐︾殑鏂瑰紡鏉ユ坊鍔犳柊鍙拌处",get_front_msg_from_subdir(), g_front_seg_index, IED_COUNT); + DIY_WARNLOG("process","銆怶ARN銆戝墠缃殑%d鍙疯繘绋嬭幏鍙栧埌鐨勫彴璐︾殑鏁伴噺澶т簬閰嶇疆鏂囦欢涓粰鍗曚釜杩涚▼閰嶇疆鐨勫彴璐︽暟閲:%d,杩欎釜杩涚▼灏嗘寜鐓ц幏鍙栧埌鐨勫彴璐︾殑鏁伴噺鏉ュ垱寤哄彴璐︾┖闂达紝杩欎釜杩涚▼涓嶈兘鐩存帴閫氳繃鍙拌处娣诲姞鏉ユ柊澧炲彴璐︼紝鍙兘閫氳繃閲嶅惎杩涚▼鎴栬呭厛鍒犻櫎宸叉湁鍙拌处鍐嶆坊鍔犲彴璐︾殑鏂瑰紡鏉ユ坊鍔犳柊鍙拌处", g_front_seg_index, IED_COUNT); } else { - DIY_INFOLOG("process","銆怤ORMAL銆戝墠缃殑%s%d鍙疯繘绋嬫牴鎹厤缃枃浠朵腑缁欏崟涓繘绋嬮厤缃殑鍙拌处鏁伴噺:%d鏉ュ垱寤哄彴璐︾┖闂",get_front_msg_from_subdir(), g_front_seg_index, IED_COUNT); + DIY_INFOLOG("process","銆怤ORMAL銆戝墠缃殑%d鍙疯繘绋嬫牴鎹厤缃枃浠朵腑缁欏崟涓繘绋嬮厤缃殑鍙拌处鏁伴噺:%d鏉ュ垱寤哄彴璐︾┖闂", g_front_seg_index, IED_COUNT); } ///////////////////////////////////////////////////////////////////////////////鐢ㄤ緥杩欓噷灏嗗眬閮ㄧ殑map鎷疯礉鍒板叏灞map锛屽悗缁牴鎹崗璁彴璐︿慨鏀 @@ -883,7 +885,7 @@ int parse_model_cfg_web() // 3. 璋冪敤鎺ュ彛 std::map icd_model_map; if (parse_model_web(&icd_model_map, input_jstr)) { - DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋 icd妯″瀷鎺ュ彛寮傚父,灏嗕娇鐢ㄩ粯璁ょ殑icd妯″瀷,璇锋鏌ユ帴鍙i厤缃",get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%d鍙疯繘绋 icd妯″瀷鎺ュ彛寮傚父,灏嗕娇鐢ㄩ粯璁ょ殑icd妯″瀷,璇锋鏌ユ帴鍙i厤缃", g_front_seg_index); // 纭繚閲婃斁 map for (auto& kv : icd_model_map) delete kv.second; return 0; @@ -937,7 +939,7 @@ std::string parse_model_cfg_web_one(const std::string& terminal_type) // 2. 鎷夊彇骞惰В鏋 if (parse_model_web(&icd_model_map, input_jstr) != 0) { std::cerr << "parse_model_web failed for type: " << terminal_type << std::endl; - DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋 icd妯″瀷鎺ュ彛寮傚父,灏嗕娇鐢ㄩ粯璁ょ殑icd妯″瀷,璇锋鏌ユ帴鍙i厤缃",get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%d鍙疯繘绋 icd妯″瀷鎺ュ彛寮傚父,灏嗕娇鐢ㄩ粯璁ょ殑icd妯″瀷,璇锋鏌ユ帴鍙i厤缃", g_front_seg_index); // 娓呯悊锛堝嵆浣 map 涓虹┖锛屼篃瀹夊叏锛 for (auto& kv : icd_model_map) delete kv.second; return ""; diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 5e564f5..9f74f13 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -20,13 +20,13 @@ class Front; /////////////////////////////////////////////////////////////////////////////////////////// -#define STAT_DATA_BASE_NODE_ID 100 +/*#define STAT_DATA_BASE_NODE_ID 100 #define THREE_SECS_DATA_BASE_NODE_ID 200 #define SOE_COMTRADE_BASE_NODE_ID 300 #define HIS_DATA_BASE_NODE_ID 400 #define NEW_HIS_DATA_BASE_NODE_ID 500 #define RECALL_HIS_DATA_BASE_NODE_ID 600 -#define RECALL_ALL_DATA_BASE_NODE_ID 700 +#define RECALL_ALL_DATA_BASE_NODE_ID 700*/ /////////////////////////////////////////////////////////////////////////////////////////// @@ -340,11 +340,14 @@ extern std::list queue_data_list; /////////////////////////////////////////////////////////////////////////////////涓诲嚱鏁扮被澹版槑 -std::string get_front_msg_from_subdir(); +//std::string get_front_msg_from_subdir(); extern std::string FRONT_PATH; +extern int g_front_seg_index; +extern int g_front_seg_num; void* cloudfrontthread(void* arg); +bool parse_param(int argc, char* argv[]); struct ThreadArgs { int argc; diff --git a/LFtid1056/cloudfront/code/log4.cpp b/LFtid1056/cloudfront/code/log4.cpp index cbfc1a0..0d4ed55 100644 --- a/LFtid1056/cloudfront/code/log4.cpp +++ b/LFtid1056/cloudfront/code/log4.cpp @@ -51,7 +51,7 @@ extern std::string subdir; extern std::string G_LOG_TOPIC; ////////////////////////////////////////////////////////杈呭姪鍑芥暟 -std::string get_front_type_from_subdir() { +/*std::string get_front_type_from_subdir() { if (subdir == "cfg_3s_data") return "realTime"; else if (subdir == "cfg_soe_comtrade") @@ -62,7 +62,7 @@ std::string get_front_type_from_subdir() { return "stat"; else return "unknown"; -} +}*/ // 閫掑綊鍒涘缓鐩綍 bool create_directory_recursive(const std::string& path) { @@ -153,7 +153,7 @@ protected: << "\",\"level\":\"" << level_str << "\",\"grade\":\"" << get_level_str(level) << "\",\"logtype\":\"" << (logtype == LOGTYPE_COM ? "com" : "data") - << "\",\"frontType\":\"" << get_front_type_from_subdir() + << "\",\"frontType\":\"" << "cloudfront" << "\",\"log\":\"" << escape_json(msg) << "\"}"; std::string jsonString = oss.str(); diff --git a/LFtid1056/cloudfront/code/log4.h b/LFtid1056/cloudfront/code/log4.h index 89b4d6a..ecab730 100644 --- a/LFtid1056/cloudfront/code/log4.h +++ b/LFtid1056/cloudfront/code/log4.h @@ -54,7 +54,7 @@ extern DebugSwitch g_debug_switch; extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result); -std::string get_front_type_from_subdir(); +//std::string get_front_type_from_subdir(); // 涓嶅甫 Appender 鐨勭増鏈 diff --git a/LFtid1056/cloudfront/code/log4cplus/log4.h b/LFtid1056/cloudfront/code/log4cplus/log4.h index 89b4d6a..ecab730 100644 --- a/LFtid1056/cloudfront/code/log4cplus/log4.h +++ b/LFtid1056/cloudfront/code/log4cplus/log4.h @@ -54,7 +54,7 @@ extern DebugSwitch g_debug_switch; extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result); -std::string get_front_type_from_subdir(); +//std::string get_front_type_from_subdir(); // 涓嶅甫 Appender 鐨勭増鏈 diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index c71546b..b75a46d 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -52,7 +52,7 @@ std::string FRONT_PATH; int INITFLAG = 0; //鍓嶇疆鏍囩疆 -std::string subdir = "cfg_stat_data"; //榛樿绋虫 +std::string subdir = "cloudfrontproc"; //瀛愮洰褰 uint32_t g_node_id = 0; int g_front_seg_index = 0; //榛樿鍗曡繘绋 int g_front_seg_num = 0; //榛樿鍗曡繘绋 @@ -139,7 +139,7 @@ bool parse_param(int argc, char* argv[]) { } //鑾峰彇鍓嶇疆绫诲瀷 -void init_global_function_enable() { +/*void init_global_function_enable() { if (subdir == "cfg_stat_data") { // 鍘嗗彶绋虫 g_node_id = STAT_DATA_BASE_NODE_ID; auto_register_report_enabled = 1; @@ -151,10 +151,10 @@ void init_global_function_enable() { } else if (subdir == "cfg_recallhis_data") { // 琛ユ嫑 g_node_id = RECALL_HIS_DATA_BASE_NODE_ID; } -} +}*/ //鑾峰彇鍔熻兘鍚嶇О -std::string get_front_msg_from_subdir() { +/*std::string get_front_msg_from_subdir() { if (subdir.find("cfg_3s_data") != std::string::npos) return "瀹炴椂鏁版嵁杩涚▼"; else if (subdir.find("cfg_soe_comtrade") != std::string::npos) @@ -165,7 +165,7 @@ std::string get_front_msg_from_subdir() { return "绋虫佺粺璁¤繘绋"; else return "unknown"; -} +}*/ //鑾峰彇鍓嶇疆璺緞 std::string get_parent_directory() { @@ -199,14 +199,14 @@ std::string get_parent_directory() { { //鍒濆鍖杇_node_id - init_global_function_enable(); + //init_global_function_enable(); //閰嶇疆鍒濆鍖 init_config(); //鍚姩杩涚▼鏃ュ織 init_logger_process(); - DIY_WARNLOG("process","銆怶ARN銆戝墠缃殑%s%d鍙疯繘绋 杩涚▼绾ф棩蹇楀垵濮嬪寲瀹屾瘯", get_front_msg_from_subdir(), g_front_seg_index); + DIY_WARNLOG("process","銆怶ARN銆戝墠缃殑%d鍙疯繘绋 杩涚▼绾ф棩蹇楀垵濮嬪寲瀹屾瘯", g_front_seg_index); //璇诲彇鍙拌处 parse_device_cfg_web(); @@ -384,13 +384,13 @@ void Front::mqconsumerThread() std::string nameServer = G_MQCONSUMER_IPPORT; std::vector subscriptions; - if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) { - subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata); - } + //if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) { + subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata); + //} subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate); - if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) { - subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall); - } + //if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) { + subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall); + //} subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset); subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog); diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index b1c23f4..a23440b 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -278,7 +278,7 @@ void rocketmq_producer_send(rocketmq::RocketMQProducer* producer, producer->sendMessage(body, topic, tags, keys); } catch (const std::exception& e) { std::cerr << "[rocketmq_producer_send] 鍙戦佸け璐: " << e.what() << std::endl; - DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋 MQ鍙戦佸け璐", get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%d鍙疯繘绋 MQ鍙戦佸け璐", g_front_seg_index); } } @@ -540,7 +540,7 @@ bool parseJsonMessageSET(const std::string& json_str) { std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl; - DIY_INFOLOG("process", "銆怤ORMAL銆戝墠缃殑%s%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勮繘绋嬫帶鍒舵秷鎭",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); + DIY_INFOLOG("process", "銆怤ORMAL銆戝墠缃殑%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勮繘绋嬫帶鍒舵秷鎭", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); if (code_str == "set_process") { if (!messageBody.contains("processNum")) { @@ -559,13 +559,14 @@ bool parseJsonMessageSET(const std::string& json_str) { // 鏍¢獙鍙傛暟骞舵墽琛 if ((fun == "reset" || fun == "add") && (processNum >= 1 && processNum < 10) && - (frontType == "stat" || frontType == "recall" || frontType == "all")) { + (frontType == "cloudfront" || frontType == "all")) { - if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + if (g_front_seg_index == 1) { execute_bash(fun, processNum, frontType); - DIY_WARNLOG("process", "銆怶ARN銆戝墠缃殑%s%d鍙疯繘绋嬫墽琛屾寚浠:%s,reset琛ㄧず閲嶅惎鎵鏈夎繘绋,add琛ㄧず娣诲姞杩涚▼",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str()); + DIY_WARNLOG("process", "銆怶ARN銆戝墠缃殑%d鍙疯繘绋嬫墽琛屾寚浠:%s,reset琛ㄧず閲嶅惎鎵鏈夎繘绋,add琛ㄧず娣诲姞杩涚▼", g_front_seg_index, fun.c_str()); send_reply_to_queue(guid, "1", "鏀跺埌閲嶇疆杩涚▼鎸囦护,閲嶅惎鎵鏈夎繘绋!"); std::cout << "this msg should only execute once" << std::endl; @@ -577,7 +578,7 @@ bool parseJsonMessageSET(const std::string& json_str) { send_reply_to_queue(guid, "1", "鏀跺埌鍒犻櫎杩涚▼鎸囦护,杩欎釜杩涚▼灏嗕細閲嶅惎 "); - DIY_WARNLOG("process", "銆怶ARN銆戝墠缃殑%s%d鍙疯繘绋嬫墽琛屾寚浠:%s,鍗冲皢閲嶅惎",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str()); + DIY_WARNLOG("process", "銆怶ARN銆戝墠缃殑%d鍙疯繘绋嬫墽琛屾寚浠:%s,鍗冲皢閲嶅惎", g_front_seg_index, fun.c_str()); std::this_thread::sleep_for(std::chrono::seconds(10)); ::_exit(-1039); // 杩涚▼閫鍑 @@ -658,15 +659,15 @@ bool parseJsonMessageLOG(const std::string& json_str) { } // 鍒ゆ柇 frontType 鏄惁鍖归厤 - if (frontType != subdir) { + /*if (frontType != subdir) { std::cout << "msg frontType: " << frontType << " doesn't match self frontType: " << subdir << std::endl; return true; - } + }*/ - DIY_INFOLOG("process", "銆怤ORMAL銆戝墠缃殑%s%d鍙疯繘绋嬪鐞嗘棩蹇椾笂閫佹秷鎭", get_front_msg_from_subdir(), g_front_seg_index); + DIY_INFOLOG("process", "銆怤ORMAL銆戝墠缃殑%d鍙疯繘绋嬪鐞嗘棩蹇椾笂閫佹秷鎭", g_front_seg_index); std::cout << "msg index: " << processNo << " self index: " << g_front_seg_index << std::endl; - std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl; + /*std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;*/ // 鍥炲娑堟伅 send_reply_to_queue(guid, "1", "鏀跺埌瀹炴椂鏃ュ織鎸囦护"); @@ -683,7 +684,7 @@ bool parseJsonMessageLOG(const std::string& json_str) { process_log_command(id, level, grade, logtype); } else { std::cout << "type doesn't match" << std::endl; - DIY_WARNLOG("process", "銆怶ARN銆戝墠缃殑%s%d鍙疯繘绋嬪鐞嗘棩蹇椾笂閫佹秷鎭,鏍煎紡涓嶆纭", get_front_msg_from_subdir(), g_front_seg_index); + DIY_WARNLOG("process", "銆怶ARN銆戝墠缃殑%d鍙疯繘绋嬪鐞嗘棩蹇椾笂閫佹秷鎭,鏍煎紡涓嶆纭", g_front_seg_index); } std::cout << "this msg should only execute once" << std::endl; @@ -737,8 +738,8 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d std::cout << "msg index: " << process_No << " self index: " << g_front_seg_index << std::endl; - DIY_INFOLOG("process", "銆怤ORMAL銆戝墠缃殑%s%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勫彴璐︽洿鏂版秷鎭", - get_front_msg_from_subdir(), g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); + DIY_INFOLOG("process", "銆怤ORMAL銆戝墠缃殑%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勫彴璐︽洿鏂版秷鎭", + g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); send_reply_to_queue(guid, "1", "鏀跺埌鍙拌处鏇存柊鎸囦护"); @@ -865,7 +866,7 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms } else{ std::cerr << "rtdata is NULL." << std::endl; - DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勮ˉ鎷涜Е鍙戞秷鎭け璐,娑堟伅鐨刯son缁撴瀯涓嶆纭",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); + DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勮ˉ鎷涜Е鍙戞秷鎭け璐,娑堟伅鐨刯son缁撴瀯涓嶆纭", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); } @@ -909,7 +910,7 @@ rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& ms // 璋冪敤涓氬姟閫昏緫澶勭悊鍑芥暟 std::string updatefilepath = FRONT_PATH + "/etc/ledgerupdate"; if (!parseJsonMessageUD(body, updatefilepath)) { - DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勫彴璐︽洿鏂版秷鎭け璐,娑堟伅鐨刯son缁撴瀯涓嶆纭",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); + DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勫彴璐︽洿鏂版秷鎭け璐,娑堟伅鐨刯son缁撴瀯涓嶆纭", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -939,7 +940,7 @@ rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg) // 璋冪敤涓氬姟澶勭悊閫昏緫 if (!parseJsonMessageSET(body)) { - DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勮繘绋嬫帶鍒舵秷鎭け璐,娑堟伅鐨刯son缁撴瀯涓嶆纭",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); + DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勮繘绋嬫帶鍒舵秷鎭け璐,娑堟伅鐨刯son缁撴瀯涓嶆纭", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -969,7 +970,7 @@ rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg) // 鎵ц鏃ュ織涓婇佸鐞 if (!parseJsonMessageLOG(body)) { - DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勬棩蹇椾笂閫佹秷鎭け璐,娑堟伅鐨刯son缁撴瀯涓嶆纭",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str()); + DIY_ERRORLOG("process", "銆怑RROR銆戝墠缃殑%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勬棩蹇椾笂閫佹秷鎭け璐,娑堟伅鐨刯son缁撴瀯涓嶆纭", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -1012,7 +1013,7 @@ rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& ms } else { std::cerr << "recall data is NULL." << std::endl; - DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%s%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勮ˉ鎷涜Е鍙戞秷鎭け璐,娑堟伅鐨刯son缁撴瀯涓嶆纭",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); + DIY_ERRORLOG("process","銆怑RROR銆戝墠缃殑%d鍙疯繘绋嬪鐞唗opic:%s_%s鐨勮ˉ鎷涜Е鍙戞秷鎭け璐,娑堟伅鐨刯son缁撴瀯涓嶆纭", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -1317,10 +1318,10 @@ void connect_status_to_queue(const std::string& id, const std::string& datetime, data.strTopic = G_CONNECT_TOPIC; data.strText = jsonObject.dump(); // 杞崲涓哄瓧绗︿覆 - if (g_node_id == STAT_DATA_BASE_NODE_ID) { - std::lock_guard lock(queue_data_list_mutex); - queue_data_list.push_back(data); - } + //if (g_node_id == STAT_DATA_BASE_NODE_ID) { + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(data); + //} } catch (const std::exception& e) { std::cerr << "connect_status_to_queue exception: " << e.what() << std::endl; @@ -1337,7 +1338,7 @@ void send_reply_to_queue(const std::string& guid, const std::string& step, const obj["step"] = step; obj["result"] = result; obj["processNo"] = g_front_seg_index; - obj["frontType"] = get_front_type_from_subdir(); + obj["frontType"] = "cloudfront"; obj["nodeId"] = FRONT_INST; // 鏋勯 queue 娑堟伅 @@ -1360,7 +1361,7 @@ void send_heartbeat_to_queue(const std::string& status) { try{ nlohmann::json obj; obj["nodeId"] = FRONT_INST; - obj["frontType"] = get_front_type_from_subdir(); + obj["frontType"] = "cloudfront"; obj["processNo"] = g_front_seg_index; obj["status"] = status; @@ -1432,8 +1433,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { if (type == 0) { std::cout << "use ledger send msg" << std::endl; - - for (size_t i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < terminal_devlist.size(); ++i) { + //鏍规嵁鍙拌处妯″紡涓嬫瘡涓繘绋嬮兘浼氬彂閫 + for (size_t i = 0; total_messages > 0 && i < terminal_devlist.size(); ++i) { const auto& dev = terminal_devlist[i]; if (shouldSkipTerminal(dev.terminal_id)) { @@ -1478,8 +1479,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { } } else { std::cout << "use monitor + number send msg" << std::endl; - - for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < total_messages; ++i) { + //鏍规嵁铏氭瀯鐩戞祴鐐规ā寮忎笅鍙湁杩涚▼1鍙戦 + for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 ) && i < total_messages; ++i) { std::string monitor_id = "testmonitor" + std::to_string(i); data.mp_id = monitor_id; data.monitor_no = i; diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 3171a60..bf81b2f 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -9,6 +9,7 @@ #include "dealMsg.h" #include "cloudfront/code/interface.h" +#include using namespace std; #if 0 @@ -242,8 +243,8 @@ void restart_thread(int index) { } else if (index == 2) { // 接口,mq - char* argv[] = { (char*)new_index ,(char*)"-dcfg_stat_data", (char*)"-s1_1" }; - ThreadArgs* args = new ThreadArgs{3, argv}; + char* argv[] = { (char*)new_index };//这里需要构造进程号参数传入 + ThreadArgs* args = new ThreadArgs{1, argv}; if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) { pthread_mutex_lock(&global_lock); printf("Failed to restart message processor thread %d\n", index); @@ -265,7 +266,12 @@ int is_thread_alive(pthread_t tid) { } /* 主函数 */ -int main() { +int main(int argc ,char** argv) {//多进程添加参数 + if(!parse_param(argc,argv)){ + std::cerr << "process param error,exit" << std::endl; + return 1; + } + srand(time(NULL)); // 初始化随机数种子 // 初始化线程数组 @@ -296,8 +302,8 @@ int main() { } else if (i == 2){ //接口和mq - char* argv[] = { (char*)index,(char*)"-dcfg_stat_data", (char*)"-s1_1" }; - ThreadArgs* args = new ThreadArgs{3, argv}; + char* argv[] = { (char*)index };//这里需要构造进程号参数传入 + ThreadArgs* args = new ThreadArgs{1, argv}; if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) { printf("Failed to create message processor thread %d\n", i); delete args; // 如果线程没创建成功就手动释放