diff --git a/.vscode/settings.json b/.vscode/settings.json index bb879da..d74d88c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -55,5 +55,78 @@ "C_Cpp_Runner.useLeakSanitizer": false, "C_Cpp_Runner.showCompilationTime": false, "C_Cpp_Runner.useLinkTimeOptimization": false, - "C_Cpp_Runner.msvcSecureNoWarnings": false + "C_Cpp_Runner.msvcSecureNoWarnings": false, + "files.associations": { + "any": "cpp", + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "cctype": "cpp", + "charconv": "cpp", + "chrono": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "codecvt": "cpp", + "compare": "cpp", + "concepts": "cpp", + "condition_variable": "cpp", + "csignal": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "forward_list": "cpp", + "list": "cpp", + "map": "cpp", + "set": "cpp", + "string": "cpp", + "unordered_map": "cpp", + "vector": "cpp", + "exception": "cpp", + "algorithm": "cpp", + "functional": "cpp", + "iterator": "cpp", + "memory": "cpp", + "memory_resource": "cpp", + "numeric": "cpp", + "optional": "cpp", + "random": "cpp", + "ratio": "cpp", + "string_view": "cpp", + "system_error": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "utility": "cpp", + "format": "cpp", + "fstream": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "limits": "cpp", + "mutex": "cpp", + "new": "cpp", + "numbers": "cpp", + "ostream": "cpp", + "ranges": "cpp", + "semaphore": "cpp", + "span": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "stop_token": "cpp", + "streambuf": "cpp", + "text_encoding": "cpp", + "thread": "cpp", + "cinttypes": "cpp", + "typeinfo": "cpp", + "valarray": "cpp", + "variant": "cpp" + } } \ No newline at end of file diff --git a/LFtid1056/client2.h b/LFtid1056/client2.h index 921dfb2..37cf800 100644 --- a/LFtid1056/client2.h +++ b/LFtid1056/client2.h @@ -1,3 +1,6 @@ +#ifndef CLIENT_H +#define CLIENT_H + #include #include #include @@ -219,4 +222,6 @@ void try_reconnect(uv_timer_t* timer); void on_connect(uv_connect_t* req, int status); void on_close(uv_handle_t* handle); void init_clients(uv_loop_t* loop, const std::vector& devices); -void stop_all_clients(); \ No newline at end of file +void stop_all_clients(); + +#endif \ No newline at end of file diff --git a/LFtid1056/cloudfront/boot/start_fe.sh b/LFtid1056/cloudfront/boot/start_fe.sh index bf19973..3d3f1f5 100644 --- a/LFtid1056/cloudfront/boot/start_fe.sh +++ b/LFtid1056/cloudfront/boot/start_fe.sh @@ -11,7 +11,7 @@ QTDIR=/qt-4.8.4 export QTDIR -FEP_ENV=/FeProject +FEP_ENV=/home/pq/zwproject/LFtid1056 export FEP_ENV PATH=$FEP_ENV/bin:$QTDIR/bin:$PATH diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 13af018..94475e6 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -31,6 +31,8 @@ #include "tinyxml2.h" #include "rocketmq.h" + + ///////////////////////////////////////////////////////////////////////////////////////////////// using namespace std; @@ -58,8 +60,6 @@ extern std::list queue_data_list; //queue发送数据链表 extern int three_secs_enabled; -extern std::vector terminal_devlist; - extern std::map xmlinfo_list;//保存所有型号对应的icd映射文件解析数据 extern XmlConfig xmlcfg;//星形接线xml节点解析的数据-默认映射文件解析数据 extern std::list topicList; //队列发送主题链表 @@ -527,7 +527,7 @@ void init_config() { } //测试进程端口 - if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集 + /*if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集 TEST_PORT = TEST_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index; else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召 TEST_PORT = TEST_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index; @@ -537,8 +537,8 @@ void init_config() { } else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波 TEST_PORT = TEST_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index; - } - + }*/ + TEST_PORT = TEST_PORT + g_front_seg_index; } ////////////////////////////////////////////////////////////////////////////////////////////获取当前时间 @@ -817,7 +817,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, const std::string& id) { DIR* dir = opendir(cfg_dir.c_str()); if (!dir) { - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法解析补招文件,补招文件路径FRONT_PATH + /etc/recall/不存在", get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法解析补招文件,补招文件路径FRONT_PATH + /etc/recall/不存在", g_front_seg_index); return false; } @@ -829,7 +829,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, const std::string& id) { std::string filepath = cfg_dir + "/" + filename; tinyxml2::XMLDocument doc; if (doc.LoadFile(filepath.c_str()) != tinyxml2::XML_SUCCESS) { - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法解析补招文件%s,补招内容无效", get_front_msg_from_subdir(), g_front_seg_index, filepath.c_str()); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法解析补招文件%s,补招内容无效", g_front_seg_index, filepath.c_str()); continue; } @@ -871,20 +871,18 @@ void process_recall_config(recall_xml_t* recall_xml) //根据监测点id来获取补招数据,补招时调用这个 void Check_Recall_Config(const std::string& id) { - if (g_node_id == HIS_DATA_BASE_NODE_ID || + /*if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || - g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) { + g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) {*/ - recall_xml_t recall_xml; - std::memset(&recall_xml, 0, sizeof(recall_xml_t)); - - // 解析补招文件 - parse_recall_xml(&recall_xml, id); - - // 将补招数据赋值到全局变量 - process_recall_config(&recall_xml); - } + recall_xml_t recall_xml; + std::memset(&recall_xml, 0, sizeof(recall_xml_t)); + // 解析补招文件 + parse_recall_xml(&recall_xml, id); + // 将补招数据赋值到全局变量 + process_recall_config(&recall_xml); + //} } //补招成功后删除补招文件,补招后调用这个 @@ -925,7 +923,7 @@ void DeletcRecallXml() { DIR* dir = opendir(cfg_dir.c_str()); if (!dir) { std::cerr << "folder does not exist!" << std::endl; - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 删除旧的补招文件失败,补招文件路径FRONT_PATH + /etc/recall/不存在",get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 删除旧的补招文件失败,补招文件路径FRONT_PATH + /etc/recall/不存在", g_front_seg_index); return; } @@ -944,7 +942,7 @@ void DeletcRecallXml() { if (stat(fullpath.c_str(), &file_stat) == 0) { if (file_stat.st_mtime < cutoff) { if (remove(fullpath.c_str()) == 0) { - DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程 删除超过两天的补招文件",get_front_msg_from_subdir(), g_front_seg_index); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程 删除超过两天的补招文件", g_front_seg_index); } else { std::cerr << "Failed to remove file: " << fullpath << std::endl; } @@ -966,7 +964,7 @@ void CreateRecallXml() { g_StatisticLackList_list_mutex.lock(); if (!g_StatisticLackList.empty()) { - DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程 开始写入补招文件", get_front_msg_from_subdir(), g_front_seg_index); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程 开始写入补招文件", g_front_seg_index); std::map> id_map; for (const auto& jr : g_StatisticLackList) { @@ -1006,7 +1004,7 @@ void CreateRecallXml() { tinyxml2::XMLError save_result = doc.SaveFile(path.str().c_str()); if (save_result != tinyxml2::XML_SUCCESS) { - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法将补招文件写入路径: %s",get_front_msg_from_subdir(), g_front_seg_index, path.str().c_str()); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法将补招文件写入路径: %s", g_front_seg_index, path.str().c_str()); continue; } } @@ -1019,10 +1017,10 @@ void CreateRecallXml() { //生成待补招xml文件 void create_recall_xml() { - if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)) { - DeletcRecallXml(); - CreateRecallXml(); - } + //if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)) { + DeletcRecallXml(); + CreateRecallXml(); + //} } // 工具函数:将时间字符串转为 time_t(秒级) @@ -1244,7 +1242,7 @@ void printTerminalDevMap(const std::map& terminal_dev std::cout << "Key: " << key << ", Terminal ID: " << dev.terminal_id - << ", Terminal Code: " << dev.terminal_code + << ", Terminal Code: " << dev.terminal_name << ", Organization Name: "<< dev.org_name << ", Maintenance Name: " << dev.maint_name << ", Station Name: " << dev.station_name @@ -1258,6 +1256,9 @@ void printTerminalDevMap(const std::map& terminal_dev << ", Address: " << dev.addr_str << ", Port: " << dev.port << ", Timestamp: " << dev.timestamp + + << ", mac: " << dev.mac + << std::endl; // 打印监测点信息 @@ -1265,13 +1266,19 @@ void printTerminalDevMap(const std::map& terminal_dev const auto& m = dev.line[i]; std::cout << " Monitor [" << i << "] " << "ID: " << m.monitor_id - << ", Code: " << m.terminal_code + << ", Code: " << m.terminal_id << ", Name: " << m.monitor_name << ", Seq: " << m.logical_device_seq << ", Voltage: "<< m.voltage_level << ", Connect: "<< m.terminal_connect << ", Timestamp:"<< m.timestamp << ", Status: " << m.status + + << ", CT1: " << m.CT1 + << ", CT2: " << m.CT2 + << ", PT1: " << m.PT1 + << ", PT2: " << m.PT2 + << std::endl; } } @@ -1357,7 +1364,7 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml, }; work_terminal.terminal_id = get_value("id"); - work_terminal.terminal_code = get_value("terminalCode"); + work_terminal.terminal_name = get_value("terminalCode"); work_terminal.org_name = get_value("orgName"); work_terminal.maint_name = get_value("maintName"); work_terminal.station_name = get_value("stationName"); @@ -1371,6 +1378,8 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml, work_terminal.port = get_value("port"); work_terminal.timestamp = get_value("updateTime"); + work_terminal.mac = get_value("mac"); + for (tinyxml2::XMLElement* monitor = root->FirstChildElement("monitorData"); monitor; monitor = monitor->NextSiblingElement("monitorData")) { @@ -1381,9 +1390,18 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml, mon.terminal_connect = monitor->FirstChildElement("ptType") ? monitor->FirstChildElement("ptType")->GetText() : "N/A"; mon.logical_device_seq = monitor->FirstChildElement("lineNo") ? monitor->FirstChildElement("lineNo")->GetText() : "N/A"; mon.timestamp = monitor->FirstChildElement("timestamp") ? monitor->FirstChildElement("timestamp")->GetText() : "N/A"; - mon.terminal_code = monitor->FirstChildElement("terminal_code") ? monitor->FirstChildElement("terminal_code")->GetText() : "N/A"; + mon.terminal_id = monitor->FirstChildElement("terminal_id") ? monitor->FirstChildElement("terminal_name")->GetText() : "N/A"; mon.status = monitor->FirstChildElement("status") ? monitor->FirstChildElement("status")->GetText() : "N/A"; + mon.CT1 = monitor->FirstChildElement("CT1") && monitor->FirstChildElement("CT1")->GetText() + ? atof(monitor->FirstChildElement("CT1")->GetText()) : 0.0; + mon.CT2 = monitor->FirstChildElement("CT2") && monitor->FirstChildElement("CT2")->GetText() + ? atof(monitor->FirstChildElement("CT2")->GetText()) : 0.0; + mon.PT1 = monitor->FirstChildElement("PT1") && monitor->FirstChildElement("PT1")->GetText() + ? atof(monitor->FirstChildElement("PT1")->GetText()) : 0.0; + mon.PT2 = monitor->FirstChildElement("PT2") && monitor->FirstChildElement("PT2")->GetText() + ? atof(monitor->FirstChildElement("PT2")->GetText()) : 0.0; + work_terminal.line.push_back(mon); } @@ -1536,9 +1554,9 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d target_dev.terminal_id = update.terminal_id; std::cout << "terminal_id: " << target_dev.terminal_id << std::endl; } - if (!update.terminal_code.empty()) { - target_dev.terminal_code = update.terminal_code; - std::cout << "terminal_code: " << target_dev.terminal_code << std::endl; + if (!update.terminal_name.empty()) { + target_dev.terminal_name = update.terminal_name; + std::cout << "terminal_name: " << target_dev.terminal_name << std::endl; } if (!update.tmnl_factory.empty()) { target_dev.tmnl_factory = update.tmnl_factory; @@ -1574,6 +1592,11 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d std::cout << "port: " << target_dev.port << std::endl; } + if (!update.mac.empty()) { + target_dev.mac = update.mac; + std::cout << "mac: " << target_dev.mac << std::endl; + } + if (!update.timestamp.empty()) { struct tm timeinfo = {}; if (sscanf(update.timestamp.c_str(), "%4d-%2d-%2d %2d:%2d:%2d", @@ -1609,9 +1632,14 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d m.voltage_level = mon.voltage_level; m.terminal_connect = mon.terminal_connect; m.status = mon.status; - m.terminal_code = mon.terminal_code; + m.terminal_id = mon.terminal_id; m.timestamp = mon.timestamp; + m.CT1 = mon.CT1; + m.CT2 = mon.CT2; + m.PT1 = mon.PT1; + m.PT2 = mon.PT2; + if (m.terminal_connect != "0") { isdelta_flag = 1; std::cout << "monitor_id " << m.monitor_id << " uses delta wiring." << std::endl; @@ -2104,13 +2132,18 @@ void print_monitor(const ledger_monitor& mon) { auto safe = [](const std::string& s) { return s.empty() ? "N/A" : s; }; std::cout << "Monitor ID: " << safe(mon.monitor_id) << "\n"; - std::cout << "Terminal Code: " << safe(mon.terminal_code) << "\n"; + std::cout << "Terminal ID: " << safe(mon.terminal_id) << "\n"; std::cout << "Monitor Name: " << safe(mon.monitor_name) << "\n"; std::cout << "Logical Device Sequence: " << safe(mon.logical_device_seq) << "\n"; std::cout << "Voltage Level: " << safe(mon.voltage_level) << "\n"; std::cout << "Terminal Connect: " << safe(mon.terminal_connect) << "\n"; std::cout << "Timestamp: " << safe(mon.timestamp) << "\n"; std::cout << "Status: " << safe(mon.status) << "\n"; + + std::cout << "CT1: " << mon.CT1 << "\n"; + std::cout << "CT2: " << mon.CT2 << "\n"; + std::cout << "PT1: " << mon.PT1 << "\n"; + std::cout << "PT2: " << mon.PT2 << "\n"; } void print_terminal(const terminal_dev& tmnl) { @@ -2118,7 +2151,7 @@ void print_terminal(const terminal_dev& tmnl) { std::cout << "GUID: " << safe(tmnl.guid) << "\n"; std::cout << "Terminal ID: " << safe(tmnl.terminal_id) << "\n"; - std::cout << "Terminal Code: " << safe(tmnl.terminal_code)<< "\n"; + std::cout << "Terminal Code: " << safe(tmnl.terminal_name)<< "\n"; std::cout << "Organization Name: "<< safe(tmnl.org_name) << "\n"; std::cout << "Maintenance Name: " << safe(tmnl.maint_name) << "\n"; std::cout << "Station Name: " << safe(tmnl.station_name) << "\n"; @@ -2131,6 +2164,8 @@ void print_terminal(const terminal_dev& tmnl) { std::cout << "Port: " << safe(tmnl.port) << "\n"; std::cout << "Timestamp: " << safe(tmnl.timestamp) << "\n"; + std::cout << "mac: " << safe(tmnl.mac) << "\n"; + for (size_t i = 0; i < 10 && !tmnl.line[i].monitor_id.empty(); ++i) { std::cout << " Monitor " << (i + 1) << ":\n"; print_monitor(tmnl.line[i]); @@ -2168,7 +2203,7 @@ void print_trigger_update_xml(const trigger_update_xml_t& trigger_update) { } } -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////解析映射文件 +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////解析模板文件 //解析映射文件 bool ParseXMLConfig2(int xml_flag, XmlConfig *cfg, std::list *ctopiclist, const std::string& path) @@ -2615,3 +2650,114 @@ void Set_xml_nodeinfo() } +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////数据转换函数 +// DataArrayItem to_json +void to_json(nlohmann::json& j, const DataArrayItem& d) { + j = nlohmann::json{ + {"DataAttr", d.DataAttr}, + {"DataTimeSec", d.DataTimeSec}, + {"DataTimeUSec", d.DataTimeUSec}, + {"DataTag", d.DataTag}, + {"Data", d.Data} + }; +} + +// MsgObj to_json +void to_json(nlohmann::json& j, const MsgObj& m) { + j = nlohmann::json{ + {"Cldid", m.Cldid}, + {"DataType", m.DataType}, + {"DataAttr", m.DataAttr}, + {"DsNameIdx", m.DsNameIdx}, + {"DataArray", m.DataArray} + }; +} + +// FullObj to_json +void to_json(nlohmann::json& j, const FullObj& f) { + j = nlohmann::json{ + {"Mid", f.Mid}, + {"Did", f.Did}, + {"Pri", f.Pri}, + {"Type", f.Type}, + {"Msg", f.Msg} + }; +} +std::string generate_json( + int Mid, //需应答的报文订阅者收到后需以此ID应答,无需应答填入“-1” + int Did, //设备唯一标识Ldid,填入0代表Ndid。 + int Pri, //报文处理的优先级 + int Type, //消息类型 + int Cldid, //逻辑子设备ID,0-逻辑设备本身,无填-1 + int DataType, //数据类型,0-表示以数据集方式上送 + int DataAttr, //数据属性:无“0”、实时“1”、统计“2”等。 + int DsNameIdx, //数据集序号(以数据集方式上送),无填-1 + const std::vector& dataArray //数据数组。 +) { + FullObj fobj; + fobj.Mid = Mid; + fobj.Did = Did; + fobj.Pri = Pri; + fobj.Type = Type; + fobj.Msg.Cldid = Cldid; + fobj.Msg.DataType = DataType; + fobj.Msg.DataAttr = DataAttr; + fobj.Msg.DsNameIdx = DsNameIdx; + fobj.Msg.DataArray = dataArray; + nlohmann::json j = fobj; + return j.dump(); // 输出标准 json 字符串 +} + +void upload_data_test(){ + std::vector arr; + arr.push_back({1, 1725477660, 0, 1, "xxxx"}); //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95” + //数据时标,相对1970年的秒,无效填入“-1” + //数据时标,微秒钟,无效填入“-1” + //数据标识,1-标识数据异常 + //数据序列(数据集上送时将二进制数据流转换成Base64字符串,其他数据为object) + arr.push_back({2, 1691741340, 0, 1, "yyyy"}); + + std::string js = generate_json( + -1, 2, 1, 4866, 1, 0, 2, 1, arr + ); + std::cout << js << std::endl; + + queue_data_t data; + data.monitor_no = 1; + data.strTopic = TOPIC_ALARM; + data.strText = js; + data.mp_id = "test"; + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(data); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////台账赋值给通信 + +std::vector GenerateDeviceInfoFromLedger(const std::vector& terminal_devlist) { + std::vector devices; + + for (const auto& terminal : terminal_devlist) { + DeviceInfo device; + device.device_id = terminal.terminal_id; + device.name = terminal.terminal_name; + device.model = terminal.dev_series; + device.mac = terminal.mac; + + for (const auto& monitor : terminal.line) { + PointInfo point; + point.point_id = monitor.monitor_id; + point.name = monitor.monitor_name; + point.device_id = terminal.terminal_id; + point.PT1 = monitor.PT1; + point.PT2 = monitor.PT2; + point.CT1 = monitor.CT1; + point.CT2 = monitor.CT2; + + device.points.push_back(point); + } + + devices.push_back(device); + } + + return devices; +} \ No newline at end of file diff --git a/LFtid1056/cloudfront/code/interface.cpp b/LFtid1056/cloudfront/code/interface.cpp index d67b65d..edac700 100644 --- a/LFtid1056/cloudfront/code/interface.cpp +++ b/LFtid1056/cloudfront/code/interface.cpp @@ -133,7 +133,6 @@ void SendJsonAPI_web(const std::string& strUrl, //接口路径 } } - ////////////////////////////////////////////////////////////////////////////////////////////////////////上传文件接口 //处理文件上传响应 @@ -535,7 +534,7 @@ int terminal_ledger_web(std::map& terminal_dev_map, { if (inputstring.empty()) { std::cerr << "Error: inputstring is empty\n"; - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程调用web台账接口的入参为空", get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程调用web台账接口的入参为空", g_front_seg_index); return 1; } @@ -609,7 +608,7 @@ int terminal_ledger_web(std::map& terminal_dev_map, }; dev.terminal_id = safe_str(item, "id"); dev.addr_str = safe_str(item, "ip"); - dev.terminal_code = safe_str(item, "name"); + dev.terminal_name = safe_str(item, "name"); dev.org_name = safe_str(item, "org_name"); dev.maint_name = safe_str(item, "maint_name"); dev.station_name = safe_str(item, "stationName"); @@ -623,18 +622,26 @@ int terminal_ledger_web(std::map& terminal_dev_map, dev.processNo = safe_str(item, "processNo"); dev.maxProcessNum = safe_str(item, "maxProcessNum"); + dev.mac = safe_str(item, "mac");//添加mac + if (item.contains("monitorData") && item["monitorData"].is_array()) { for (auto& mon : item["monitorData"]) { if (dev.line.size() >= 10) break; ledger_monitor m; m.monitor_id = safe_str(mon, "id"); - m.terminal_code = safe_str(mon, "terminal_code"); + m.terminal_id = safe_str(mon, "terminal_id"); m.monitor_name = safe_str(mon, "name"); m.logical_device_seq = safe_str(mon, "lineNo"); m.voltage_level = safe_str(mon, "voltageLevel"); m.terminal_connect = safe_str(mon, "ptType"); m.timestamp = safe_str(mon, "updateTime"); m.status = safe_str(mon, "status"); + + m.CT1 = mon.value("CT1", 0.0); + m.CT2 = mon.value("CT2", 0.0); + m.PT1 = mon.value("PT1", 0.0); + m.PT2 = mon.value("PT2", 0.0); + dev.line.push_back(m); } } @@ -665,7 +672,8 @@ int terminal_ledger_web(std::map& terminal_dev_map, } // 5. 主进程保存台账 - if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + if (g_front_seg_index == 1) { save_ledger_json(responseStr); } @@ -684,7 +692,7 @@ int parse_device_cfg_web() input_jstr += "}"; std::cout << "input_jstr: " << input_jstr << std::endl; - DIY_DEBUGLOG("process","【DEBUG】前置的%s%d号进程调用web接口获取台账使用的请求输入为:%s",get_front_msg_from_subdir(), g_front_seg_index, input_jstr.c_str()); + DIY_DEBUGLOG("process","【DEBUG】前置的%d号进程调用web接口获取台账使用的请求输入为:%s", g_front_seg_index, input_jstr.c_str()); // 2. 调用接口 std::map terminal_dev_map; @@ -695,8 +703,9 @@ int parse_device_cfg_web() // 3. 调试打印 printTerminalDevMap(terminal_dev_map); - // 4. 看门狗配置校验(仅主进程稳态) - if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + // 4. 看门狗配置校验(仅主进程) + //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + if (g_front_seg_index == 1) { int max_index = get_max_stat_data_index(FRONT_PATH + "/etc/runtime.cf"); std::cout << "max_index = " << max_index << std::endl; @@ -723,13 +732,13 @@ int parse_device_cfg_web() // 5. 台账数量与配置比对 int count_cfg = static_cast(terminal_dev_map.size()); std::cout << "terminal_ledger_num: " << count_cfg << std::endl; - DIY_DEBUGLOG("process", "【DEBUG】前置的%s%d号进程调用获取到的台账的数量为:%d",get_front_msg_from_subdir(), g_front_seg_index, count_cfg); + DIY_DEBUGLOG("process", "【DEBUG】前置的%d号进程调用获取到的台账的数量为:%d", g_front_seg_index, count_cfg); if (IED_COUNT < count_cfg) { std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!" << std::endl; - DIY_WARNLOG("process","【WARN】前置的%s%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账",get_front_msg_from_subdir(), g_front_seg_index, IED_COUNT); + DIY_WARNLOG("process","【WARN】前置的%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账", g_front_seg_index, IED_COUNT); } else { - DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间",get_front_msg_from_subdir(), g_front_seg_index, IED_COUNT); + DIY_INFOLOG("process","【NORMAL】前置的%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间", g_front_seg_index, IED_COUNT); } ///////////////////////////////////////////////////////////////////////////////用例这里将局部的map拷贝到全局map,后续根据协议台账修改 @@ -876,7 +885,7 @@ int parse_model_cfg_web() // 3. 调用接口 std::map icd_model_map; if (parse_model_web(&icd_model_map, input_jstr)) { - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置",get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置", g_front_seg_index); // 确保释放 map for (auto& kv : icd_model_map) delete kv.second; return 0; @@ -930,7 +939,7 @@ std::string parse_model_cfg_web_one(const std::string& terminal_type) // 2. 拉取并解析 if (parse_model_web(&icd_model_map, input_jstr) != 0) { std::cerr << "parse_model_web failed for type: " << terminal_type << std::endl; - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置",get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置", g_front_seg_index); // 清理(即使 map 为空,也安全) for (auto& kv : icd_model_map) delete kv.second; return ""; diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index a48a632..9f74f13 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -6,6 +6,13 @@ #include #include #include +#include + +/////////////////////////////////////////////////////////////////////////////////////////// + +#include "nlohmann/json.hpp" + +#include "../../client2.h" /////////////////////////////////////////////////////////////////////////////////////////// @@ -13,13 +20,13 @@ class Front; /////////////////////////////////////////////////////////////////////////////////////////// -#define STAT_DATA_BASE_NODE_ID 100 +/*#define STAT_DATA_BASE_NODE_ID 100 #define THREE_SECS_DATA_BASE_NODE_ID 200 #define SOE_COMTRADE_BASE_NODE_ID 300 #define HIS_DATA_BASE_NODE_ID 400 #define NEW_HIS_DATA_BASE_NODE_ID 500 #define RECALL_HIS_DATA_BASE_NODE_ID 600 -#define RECALL_ALL_DATA_BASE_NODE_ID 700 +#define RECALL_ALL_DATA_BASE_NODE_ID 700*/ /////////////////////////////////////////////////////////////////////////////////////////// @@ -47,23 +54,28 @@ class ledger_monitor { public: std::string monitor_id; //监测点id - std::string terminal_code; //监测点 + std::string terminal_id; //监测点 std::string monitor_name; //监测点名 std::string logical_device_seq; //监测点序号 std::string voltage_level; //监测点电压等级 std::string terminal_connect; //监测点接线方式 std::string timestamp; //更新时间 std::string status; //监测点状态 + + double PT1; // 电压变比1 + double PT2; // 电压变比2 + double CT1; // 电流变比1 + double CT2; // 电流变比2 }; //终端台账 class terminal_dev { public: - std::string guid; + std::string guid; //台账更新回复用 std::string terminal_id; - std::string terminal_code; + std::string terminal_name; std::string org_name; std::string maint_name; std::string station_name; @@ -78,6 +90,8 @@ public: std::string processNo; std::string maxProcessNum; + std::string mac; // 装置MAC地址 + std::vector line; }; @@ -296,6 +310,9 @@ int parse_model_cfg_web(); void qvvr_test(); void Fileupload_test(); +extern std::vector terminal_devlist; +extern std::mutex ledgermtx; + //////////////////////////////////////////////////////////////////////////////////cfg_parse的函数声明 void init_config(); @@ -314,17 +331,23 @@ bool is_blank(const std::string& str); void print_terminal(const terminal_dev& tmnl); void printTerminalDevMap(const std::map& terminal_dev_map); +void upload_data_test(); + ////////////////////////////////////////////////////////////////////////////////mq - +extern std::mutex queue_data_list_mutex; +extern std::list queue_data_list; /////////////////////////////////////////////////////////////////////////////////主函数类声明 -std::string get_front_msg_from_subdir(); +//std::string get_front_msg_from_subdir(); extern std::string FRONT_PATH; +extern int g_front_seg_index; +extern int g_front_seg_num; void* cloudfrontthread(void* arg); +bool parse_param(int argc, char* argv[]); struct ThreadArgs { int argc; @@ -353,6 +376,45 @@ typedef struct { pthread_mutex_t lock; // 线程专用互斥锁 } thread_info_t; +///////////////////////////////////////////////////////////////////////////////////////上送数据的json格式 +// 单条 DataArray 数据 +struct DataArrayItem { + int DataAttr; + int DataTimeSec; + int DataTimeUSec; + int DataTag; + std::string Data; +}; + +// Msg 对象 +struct MsgObj { + int Cldid; + int DataType; + int DataAttr; + int DsNameIdx; + std::vector DataArray; +}; + +// 整体 +struct FullObj { + int Mid; + int Did; + int Pri; + int Type; + MsgObj Msg; +}; + +// nlohmann序列化接口 +void to_json(nlohmann::json& j, const DataArrayItem& d); +void to_json(nlohmann::json& j, const MsgObj& m); +void to_json(nlohmann::json& j, const FullObj& f); + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +std::vector GenerateDeviceInfoFromLedger(const std::vector& terminal_devlist); + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #endif diff --git a/LFtid1056/cloudfront/code/log4.cpp b/LFtid1056/cloudfront/code/log4.cpp index bffb006..0d4ed55 100644 --- a/LFtid1056/cloudfront/code/log4.cpp +++ b/LFtid1056/cloudfront/code/log4.cpp @@ -47,17 +47,11 @@ extern int g_front_seg_index; extern std::string FRONT_INST; extern std::string subdir; -//mq -extern std::mutex queue_data_list_mutex; //queue发送数据锁 -extern std::list queue_data_list; //queue发送数据链表 - //日志主题 extern std::string G_LOG_TOPIC; -extern std::vector terminal_devlist; - ////////////////////////////////////////////////////////辅助函数 -std::string get_front_type_from_subdir() { +/*std::string get_front_type_from_subdir() { if (subdir == "cfg_3s_data") return "realTime"; else if (subdir == "cfg_soe_comtrade") @@ -68,7 +62,7 @@ std::string get_front_type_from_subdir() { return "stat"; else return "unknown"; -} +}*/ // 递归创建目录 bool create_directory_recursive(const std::string& path) { @@ -159,7 +153,7 @@ protected: << "\",\"level\":\"" << level_str << "\",\"grade\":\"" << get_level_str(level) << "\",\"logtype\":\"" << (logtype == LOGTYPE_COM ? "com" : "data") - << "\",\"frontType\":\"" << get_front_type_from_subdir() + << "\",\"frontType\":\"" << "cloudfront" << "\",\"log\":\"" << escape_json(msg) << "\"}"; std::string jsonString = oss.str(); diff --git a/LFtid1056/cloudfront/code/log4.h b/LFtid1056/cloudfront/code/log4.h index 89b4d6a..ecab730 100644 --- a/LFtid1056/cloudfront/code/log4.h +++ b/LFtid1056/cloudfront/code/log4.h @@ -54,7 +54,7 @@ extern DebugSwitch g_debug_switch; extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result); -std::string get_front_type_from_subdir(); +//std::string get_front_type_from_subdir(); // 不带 Appender 的版本 diff --git a/LFtid1056/cloudfront/code/log4cplus/log4.h b/LFtid1056/cloudfront/code/log4cplus/log4.h index 89b4d6a..ecab730 100644 --- a/LFtid1056/cloudfront/code/log4cplus/log4.h +++ b/LFtid1056/cloudfront/code/log4cplus/log4.h @@ -54,7 +54,7 @@ extern DebugSwitch g_debug_switch; extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result); -std::string get_front_type_from_subdir(); +//std::string get_front_type_from_subdir(); // 不带 Appender 的版本 diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index 0590696..b75a46d 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -52,7 +52,7 @@ std::string FRONT_PATH; int INITFLAG = 0; //前置标置 -std::string subdir = "cfg_stat_data"; //默认稳态 +std::string subdir = "cloudfrontproc"; //子目录 uint32_t g_node_id = 0; int g_front_seg_index = 0; //默认单进程 int g_front_seg_num = 0; //默认单进程 @@ -78,10 +78,6 @@ extern int TEST_PORT; //测试端口号 extern std::string FRONT_INST; -extern std::mutex queue_data_list_mutex; -extern std::list queue_data_list; - - /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数 template @@ -143,7 +139,7 @@ bool parse_param(int argc, char* argv[]) { } //获取前置类型 -void init_global_function_enable() { +/*void init_global_function_enable() { if (subdir == "cfg_stat_data") { // 历史稳态 g_node_id = STAT_DATA_BASE_NODE_ID; auto_register_report_enabled = 1; @@ -155,10 +151,10 @@ void init_global_function_enable() { } else if (subdir == "cfg_recallhis_data") { // 补招 g_node_id = RECALL_HIS_DATA_BASE_NODE_ID; } -} +}*/ //获取功能名称 -std::string get_front_msg_from_subdir() { +/*std::string get_front_msg_from_subdir() { if (subdir.find("cfg_3s_data") != std::string::npos) return "实时数据进程"; else if (subdir.find("cfg_soe_comtrade") != std::string::npos) @@ -169,7 +165,7 @@ std::string get_front_msg_from_subdir() { return "稳态统计进程"; else return "unknown"; -} +}*/ //获取前置路径 std::string get_parent_directory() { @@ -203,14 +199,14 @@ std::string get_parent_directory() { { //初始化g_node_id - init_global_function_enable(); + //init_global_function_enable(); //配置初始化 init_config(); //启动进程日志 init_logger_process(); - DIY_WARNLOG("process","【WARN】前置的%s%d号进程 进程级日志初始化完毕", get_front_msg_from_subdir(), g_front_seg_index); + DIY_WARNLOG("process","【WARN】前置的%d号进程 进程级日志初始化完毕", g_front_seg_index); //读取台账 parse_device_cfg_web(); @@ -218,11 +214,11 @@ std::string get_parent_directory() { //初始化日志 init_loggers(); - //读取模型,下载文件 + //读取模型,下载模板文件 parse_model_cfg_web(); - //解析文件 - Set_xml_nodeinfo(); + //解析模板文件 + //Set_xml_nodeinfo(); StartFrontThread(); //开启主线程 @@ -388,13 +384,13 @@ void Front::mqconsumerThread() std::string nameServer = G_MQCONSUMER_IPPORT; std::vector subscriptions; - if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) { - subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata); - } + //if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) { + subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata); + //} subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate); - if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) { - subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall); - } + //if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) { + subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall); + //} subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset); subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog); diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index 16da6b8..a23440b 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -58,10 +58,6 @@ static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者 /////////////////////////////////////////////////////////////////////////////////////////////////////////// -//台账 -extern std::mutex ledgermtx; -extern std::vector terminal_devlist; - //前置进程 extern unsigned int g_node_id; extern int g_front_seg_index; @@ -282,7 +278,7 @@ void rocketmq_producer_send(rocketmq::RocketMQProducer* producer, producer->sendMessage(body, topic, tags, keys); } catch (const std::exception& e) { std::cerr << "[rocketmq_producer_send] 发送失败: " << e.what() << std::endl; - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 MQ发送失败", get_front_msg_from_subdir(), g_front_seg_index); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 MQ发送失败", g_front_seg_index); } } @@ -544,7 +540,7 @@ bool parseJsonMessageSET(const std::string& json_str) { std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl; - DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理topic:%s_%s的进程控制消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的进程控制消息", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); if (code_str == "set_process") { if (!messageBody.contains("processNum")) { @@ -563,13 +559,14 @@ bool parseJsonMessageSET(const std::string& json_str) { // 校验参数并执行 if ((fun == "reset" || fun == "add") && (processNum >= 1 && processNum < 10) && - (frontType == "stat" || frontType == "recall" || frontType == "all")) { + (frontType == "cloudfront" || frontType == "all")) { - if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + //if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { + if (g_front_seg_index == 1) { execute_bash(fun, processNum, frontType); - DIY_WARNLOG("process", "【WARN】前置的%s%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str()); + DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程", g_front_seg_index, fun.c_str()); send_reply_to_queue(guid, "1", "收到重置进程指令,重启所有进程!"); std::cout << "this msg should only execute once" << std::endl; @@ -581,7 +578,7 @@ bool parseJsonMessageSET(const std::string& json_str) { send_reply_to_queue(guid, "1", "收到删除进程指令,这个进程将会重启 "); - DIY_WARNLOG("process", "【WARN】前置的%s%d号进程执行指令:%s,即将重启",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str()); + DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启", g_front_seg_index, fun.c_str()); std::this_thread::sleep_for(std::chrono::seconds(10)); ::_exit(-1039); // 进程退出 @@ -662,15 +659,15 @@ bool parseJsonMessageLOG(const std::string& json_str) { } // 判断 frontType 是否匹配 - if (frontType != subdir) { + /*if (frontType != subdir) { std::cout << "msg frontType: " << frontType << " doesn't match self frontType: " << subdir << std::endl; return true; - } + }*/ - DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理日志上送消息", get_front_msg_from_subdir(), g_front_seg_index); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理日志上送消息", g_front_seg_index); std::cout << "msg index: " << processNo << " self index: " << g_front_seg_index << std::endl; - std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl; + /*std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;*/ // 回复消息 send_reply_to_queue(guid, "1", "收到实时日志指令"); @@ -687,7 +684,7 @@ bool parseJsonMessageLOG(const std::string& json_str) { process_log_command(id, level, grade, logtype); } else { std::cout << "type doesn't match" << std::endl; - DIY_WARNLOG("process", "【WARN】前置的%s%d号进程处理日志上送消息,格式不正确", get_front_msg_from_subdir(), g_front_seg_index); + DIY_WARNLOG("process", "【WARN】前置的%d号进程处理日志上送消息,格式不正确", g_front_seg_index); } std::cout << "this msg should only execute once" << std::endl; @@ -741,8 +738,8 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d std::cout << "msg index: " << process_No << " self index: " << g_front_seg_index << std::endl; - DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理topic:%s_%s的台账更新消息", - get_front_msg_from_subdir(), g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); + DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的台账更新消息", + g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); send_reply_to_queue(guid, "1", "收到台账更新指令"); @@ -754,7 +751,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d terminal_dev json_data; json_data.terminal_id = item.value("id", ""); - json_data.terminal_code = item.value("name", ""); + json_data.terminal_name = item.value("name", ""); json_data.org_name = item.value("org_name", ""); json_data.maint_name = item.value("maint_name", ""); json_data.station_name = item.value("stationName", ""); @@ -783,7 +780,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d m.logical_device_seq = monitor_item.value("lineNo", ""); m.terminal_connect = monitor_item.value("ptType", ""); m.timestamp = json_data.timestamp; - m.terminal_code = json_data.terminal_code; + m.terminal_id = json_data.terminal_id; } } @@ -869,7 +866,7 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms } else{ std::cerr << "rtdata is NULL." << std::endl; - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); } @@ -913,7 +910,7 @@ rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& ms // 调用业务逻辑处理函数 std::string updatefilepath = FRONT_PATH + "/etc/ledgerupdate"; if (!parseJsonMessageUD(body, updatefilepath)) { - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -943,7 +940,7 @@ rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg) // 调用业务处理逻辑 if (!parseJsonMessageSET(body)) { - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -973,7 +970,7 @@ rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg) // 执行日志上送处理 if (!parseJsonMessageLOG(body)) { - DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str()); + DIY_ERRORLOG("process", "【ERROR】前置的%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -1016,7 +1013,7 @@ rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& ms } else { std::cerr << "recall data is NULL." << std::endl; - DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); } return rocketmq::CONSUME_SUCCESS; @@ -1180,7 +1177,7 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json xmlStream << "" << json_data.station_name << "" << std::endl; add_indent(xmlStream, indentLevel); - xmlStream << "" << json_data.terminal_code << "" << std::endl; + xmlStream << "" << json_data.terminal_name << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.timestamp << "" << std::endl; // Assuming `timestamp` @@ -1201,6 +1198,9 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.dev_key << "" << std::endl; + add_indent(xmlStream, indentLevel); + xmlStream << "" << json_data.mac << "" << std::endl; + // monitorData 部分 for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) { const ledger_monitor& monitor = json_data.line[i]; @@ -1228,9 +1228,21 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json xmlStream << "" << monitor.timestamp << "" << std::endl; add_indent(xmlStream, indentLevel); - xmlStream << "" << monitor.terminal_code << "" << std::endl; + xmlStream << "" << monitor.terminal_id << "" << std::endl; add_indent(xmlStream, indentLevel); + xmlStream << "" << monitor.CT1 << "" << std::endl; + + add_indent(xmlStream, indentLevel); + xmlStream << "" << monitor.CT2 << "" << std::endl; + + add_indent(xmlStream, indentLevel); + xmlStream << "" << monitor.PT1 << "" << std::endl; + + add_indent(xmlStream, indentLevel); + xmlStream << "" << monitor.PT2 << "" << std::endl; + + add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.status << "" << std::endl; indentLevel--; @@ -1306,10 +1318,10 @@ void connect_status_to_queue(const std::string& id, const std::string& datetime, data.strTopic = G_CONNECT_TOPIC; data.strText = jsonObject.dump(); // 转换为字符串 - if (g_node_id == STAT_DATA_BASE_NODE_ID) { - std::lock_guard lock(queue_data_list_mutex); - queue_data_list.push_back(data); - } + //if (g_node_id == STAT_DATA_BASE_NODE_ID) { + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(data); + //} } catch (const std::exception& e) { std::cerr << "connect_status_to_queue exception: " << e.what() << std::endl; @@ -1326,7 +1338,7 @@ void send_reply_to_queue(const std::string& guid, const std::string& step, const obj["step"] = step; obj["result"] = result; obj["processNo"] = g_front_seg_index; - obj["frontType"] = get_front_type_from_subdir(); + obj["frontType"] = "cloudfront"; obj["nodeId"] = FRONT_INST; // 构造 queue 消息 @@ -1349,7 +1361,7 @@ void send_heartbeat_to_queue(const std::string& status) { try{ nlohmann::json obj; obj["nodeId"] = FRONT_INST; - obj["frontType"] = get_front_type_from_subdir(); + obj["frontType"] = "cloudfront"; obj["processNo"] = g_front_seg_index; obj["status"] = status; @@ -1376,9 +1388,8 @@ bool shouldSkipTerminal(const std::string& terminal_id) { return false; } -void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) { - - if(!INITFLAG){ +void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { + if (!INITFLAG) { std::cout << "前置未初始化完成\n"; return; } @@ -1422,8 +1433,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) { if (type == 0) { std::cout << "use ledger send msg" << std::endl; - - for (size_t i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < terminal_devlist.size(); ++i) { + //根据台账模式下每个进程都会发送 + for (size_t i = 0; total_messages > 0 && i < terminal_devlist.size(); ++i) { const auto& dev = terminal_devlist[i]; if (shouldSkipTerminal(dev.terminal_id)) { @@ -1436,85 +1447,71 @@ void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) { data.mp_id = dev.line[j].monitor_id; data.monitor_no = static_cast(i + j); - std::string modified_time = std::to_string(current_time_ms); + std::string modified_time = std::to_string(current_time_ms / 1000); std::string modified_strText = base_strText; - - // 替换 Monitor - size_t monitor_pos = modified_strText.find("\"Monitor\""); - if (monitor_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", monitor_pos); - size_t quote_pos = modified_strText.find("\"", colon_pos); - size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id); - } - } - - // 替换 TIME - size_t time_pos = modified_strText.find("\"TIME\""); - if (time_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", time_pos); - size_t quote_pos = colon_pos; - size_t end_quote_pos = modified_strText.find(",", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time); + try { + auto j = nlohmann::json::parse(modified_strText); + j["Did"] = i; + if (j.contains("Msg") && j["Msg"].is_object()) { + j["Msg"]["Cldid"] = j; + if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) { + for (auto& item : j["Msg"]["DataArray"]) { + if (item.is_object()) { + item["DataTimeSec"] = std::stoll(modified_time); + } + } + } } + modified_strText = j.dump(); + } catch (...) { + // 保持原始文本 } data.strText = modified_strText; - //my_rocketmq_send(data,front->m_producer); std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_no << " and TIME " << modified_time << std::endl; - } } } else { std::cout << "use monitor + number send msg" << std::endl; - - for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < total_messages; ++i) { + //根据虚构监测点模式下只有进程1发送 + for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 ) && i < total_messages; ++i) { std::string monitor_id = "testmonitor" + std::to_string(i); - data.mp_id = monitor_id; data.monitor_no = i; - std::string modified_time = std::to_string(current_time_ms); + std::string modified_time = std::to_string(current_time_ms / 1000); std::string modified_strText = base_strText; - // 替换 Monitor - size_t monitor_pos = modified_strText.find("\"Monitor\""); - if (monitor_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", monitor_pos); - size_t quote_pos = modified_strText.find("\"", colon_pos); - size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id); - } - } - - // 替换 TIME - size_t time_pos = modified_strText.find("\"TIME\""); - if (time_pos != std::string::npos) { - size_t colon_pos = modified_strText.find(":", time_pos); - size_t quote_pos = colon_pos; - size_t end_quote_pos = modified_strText.find(",", quote_pos + 1); - if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { - modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time); + try { + auto j = nlohmann::json::parse(modified_strText); + j["Did"] = 0; + if (j.contains("Msg") && j["Msg"].is_object()) { + j["Msg"]["Cldid"] = data.mp_id; + if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) { + for (auto& item : j["Msg"]["DataArray"]) { + if (item.is_object()) { + item["DataTimeSec"] = std::stoll(modified_time); + } + } + } } + modified_strText = j.dump(); + } catch (...) { + // 保持原始文本 } data.strText = modified_strText; - //my_rocketmq_send(data,front->m_producer); std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_no << " and TIME " << modified_time << std::endl; - } } diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index 78f769d..709147b 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -48,11 +48,6 @@ bool showinshellflag =false; extern std::list errorList, warnList, normalList; extern std::mutex errorListMutex, warnListMutex, normalListMutex; -extern std::vector terminal_devlist; -extern std::mutex ledgermtx; - -extern std::list queue_data_list; - extern int IED_COUNT; extern int INITFLAG; extern int g_front_seg_index; @@ -281,6 +276,7 @@ extern bool normalOutputEnabled; if (G_TEST_NUM != 0) { std::cout << "[PeriodicTask] Executing rocketmq_test_300()\n"; rocketmq_test_300(G_TEST_NUM, g_front_seg_index, G_TEST_TYPE,m_front); + //upload_data_test(); } } @@ -412,7 +408,7 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { std::ostringstream os; os << "\r\x1B[K------------------------------------\n"; os << "\r\x1B[K|-- terminal_id: " << dev.terminal_id << "\n"; - os << "\r\x1B[K|-- terminal_code: " << dev.terminal_code << "\n"; + os << "\r\x1B[K|-- terminal_name: " << dev.terminal_name << "\n"; os << "\r\x1B[K|-- dev_ip: " << dev.addr_str << "\n"; os << "\r\x1B[K|-- dev_port: " << dev.port << "\n"; os << "\r\x1B[K|-- dev_type: " << dev.dev_type << "\n"; @@ -427,6 +423,8 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { os << "\r\x1B[K|-- tmnl_status: " << dev.tmnl_status << "\n"; os << "\r\x1B[K|-- timestamp: " << dev.timestamp << "\n"; + os << "\r\x1B[K|-- mac: " << dev.mac << "\n"; + for (size_t i = 0; i < dev.line.size(); ++i) { const auto& ld = dev.line[i]; if (ld.monitor_id.empty()) continue; @@ -434,11 +432,17 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { os << "\r\x1B[K |-- monitor_id: " << ld.monitor_id << "\n"; os << "\r\x1B[K |-- monitor_name: " << ld.monitor_name << "\n"; os << "\r\x1B[K |-- logical_device_seq: " << ld.logical_device_seq << "\n"; - os << "\r\x1B[K |-- terminal_code: " << ld.terminal_code << "\n"; + os << "\r\x1B[K |-- terminal_id: " << ld.terminal_id << "\n"; os << "\r\x1B[K |-- voltage_level: " << ld.voltage_level << "\n"; os << "\r\x1B[K |-- terminal_connect: " << ld.terminal_connect << "\n"; os << "\r\x1B[K |-- status: " << ld.status << "\n"; os << "\r\x1B[K |-- timestamp: " << ld.timestamp << "\n"; + + os << "\r\x1B[K |-- CT1: " << ld.CT1 << "\n"; + os << "\r\x1B[K |-- CT2: " << ld.CT2 << "\n"; + os << "\r\x1B[K |-- PT1: " << ld.PT1 << "\n"; + os << "\r\x1B[K |-- PT2: " << ld.PT2 << "\n"; + } os << "\r\x1B[K------------------------------------\n"; diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 8f44d38..58a2461 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -7,6 +7,7 @@ #include "client2.h" #include "cloudfront/code/interface.h" +#include using namespace std; #if 0 @@ -121,6 +122,8 @@ void* client_manager_thread(void* arg) { // 100װ std::vector test_devices = generate_test_devices(100); + //std::vector devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk + // ͻ start_client_connect(devices); @@ -209,8 +212,8 @@ void restart_thread(int index) { } else if (false) { // ӿڣmq - char* argv[] = { (char*)new_index ,(char*)"-dcfg_stat_data", (char*)"-s1_1" }; - ThreadArgs* args = new ThreadArgs{3, argv}; + char* argv[] = { (char*)new_index };//Ҫ̺Ų + ThreadArgs* args = new ThreadArgs{1, argv}; if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) { pthread_mutex_lock(&global_lock); printf("Failed to restart message processor thread %d\n", index); @@ -232,7 +235,12 @@ int is_thread_alive(pthread_t tid) { } /* */ -int main() { +int main(int argc ,char** argv) {//Ӳ + if(!parse_param(argc,argv)){ + std::cerr << "process param error,exit" << std::endl; + return 1; + } + srand(time(NULL)); // ʼ // ʼ߳ @@ -261,6 +269,16 @@ int main() { free(index); } } + else if (i == 2){ + //ӿںmq + char* argv[] = { (char*)index };//Ҫ̺Ų + ThreadArgs* args = new ThreadArgs{1, argv}; + if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) { + printf("Failed to create message processor thread %d\n", i); + delete args; // ߳ûɹֶͷ + free(index); + } + } else { // ߳ // ΪգʵӦп߳