diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index b7520c7..7bd87cc 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -1156,7 +1156,7 @@ int recall_json_handle_from_mq(const std::string& body) return 10004; } std::string guid = messageBody["guid"].get(); - send_reply_to_queue(guid, static_cast(ResponseCode::OK), "收到补招指令"); + //send_reply_to_queue(guid, static_cast(ResponseCode::OK), "收到补招指令"); // 提取 data 数组 if (!messageBody.contains("data") || !messageBody["data"].is_array()) { @@ -2233,14 +2233,14 @@ void execute_bash(string fun,int process_num,string type) char p_num_str[20]; // 使用 sprintf 转换 std::sprintf(p_num_str, "%d", process_num); - const char* script = std::string(FRONT_PATH + "/bin/set_process.sh").c_str();//使用setsid防止端口占用 + std::string script = FRONT_PATH + "/bin/set_process.sh";//使用setsid防止端口占用 const char* param1 = fun.c_str(); const char* param2 = p_num_str; const char* param3 = type.c_str(); // 构造完整的命令 char command[256]; - snprintf(command, sizeof(command), "%s %s %s %s &", script, param1, param2, param3); + snprintf(command, sizeof(command), "%s %s %s %s &", script.c_str(), param1, param2, param3); std::cout << "command:" << command <& replies); //内部定值响应 bool send_internal_value_reply(const std::string &dev_id, const std::vector &control_words); diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index cefafe7..695036e 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -673,12 +673,14 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的台账更新消息", g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); - send_reply_to_queue(guid, static_cast(ResponseCode::ACCEPTED), "收到台账更新指令"); + //send_reply_to_queue(guid, static_cast(ResponseCode::ACCEPTED), "收到台账更新指令"); + std::vector reply_list; if (code_str == "add_terminal" || code_str == "ledger_modify") { std::cout << "add or modify ledger" << std::endl; if (messageBody.contains("data") && messageBody["data"].is_array()) { + for (const auto& item : messageBody["data"]) { terminal_dev json_data; @@ -733,7 +735,11 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d std::string file_name = output_dir + "/" + nodeid + "_" + std::to_string(g_front_seg_index) + "_" + json_data.terminal_id + "_" + code_str + ".xml"; writeToFile(file_name, xmlContent); }*/ - if(code_str == "add_terminal"){ + if(code_str == "add_terminal"){ + + DeviceReply one; + one.deviceId = json_data.terminal_id; + std::lock_guard lock(ledgermtx); // ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中 const std::string& tid = json_data.terminal_id; @@ -749,12 +755,15 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d DeviceInfo device = make_device_from_terminal(json_data); ClientManager::instance().add_device(device); - send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), - "终端 id: " + json_data.terminal_id + " 台账添加成功"); + /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), + "终端 id: " + json_data.terminal_id + " 台账添加成功");*/ + + one.code = static_cast(ResponseCode::OK); + one.result = "台账添加成功"; } else{ - send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), - "终端 id: " + json_data.terminal_id + " 已存在该装置,修改这个装置的台账"); + /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), + "终端 id: " + json_data.terminal_id + " 已存在该装置,修改这个装置的台账");*/ if(erase_one_terminals_by_id(json_data.terminal_id) == 1){ init_loggers_bydevid(json_data.terminal_id); @@ -764,16 +773,25 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d DeviceInfo device = make_device_from_terminal(json_data); ClientManager::instance().add_device(device); - send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), - "终端 id: " + json_data.terminal_id + " 台账修改成功"); + /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), + "终端 id: " + json_data.terminal_id + " 台账修改成功");*/ + one.code = static_cast(ResponseCode::OK); + one.result = "台账修改成功"; } else{ - send_reply_to_queue(json_data.guid, static_cast(ResponseCode::BAD_REQUEST), - "终端 id: " + json_data.terminal_id + " 台账修改失败"); + /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::BAD_REQUEST), + "终端 id: " + json_data.terminal_id + " 台账修改失败");*/ + one.code = static_cast(ResponseCode::BAD_REQUEST); + one.result = "台账修改失败"; } } + reply_list.push_back(std::move(one)); } else if(code_str == "ledger_modify"){ + + DeviceReply one; + one.deviceId = json_data.terminal_id; + std::lock_guard lock(ledgermtx); // ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中 @@ -782,8 +800,8 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d [&](const terminal_dev& d){ return d.terminal_id == tid; }); if (it == terminal_devlist.end()) { - send_reply_to_queue(json_data.guid, static_cast(ResponseCode::NOT_FOUND), - "终端 id: " + tid + " 无法修改台账,未找到指定装置,改为添加这个装置"); + /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::NOT_FOUND), + "终端 id: " + tid + " 无法修改台账,未找到指定装置,改为添加这个装置");*/ DIY_WARNLOG("process", "【WARN】无法修改台账,未找到指定装置: %s ,改为添加这个装置", tid.c_str()); @@ -794,8 +812,12 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d DeviceInfo device = make_device_from_terminal(json_data); ClientManager::instance().add_device(device); - send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), - "终端 id: " + json_data.terminal_id + " 台账添加成功"); + /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), + "终端 id: " + json_data.terminal_id + " 台账添加成功");*/ + one.code = static_cast(ResponseCode::OK); + one.result = "台账添加成功"; + reply_list.push_back(std::move(one)); + continue;//修改下一个 } if(erase_one_terminals_by_id(json_data.terminal_id) == 1){ @@ -806,13 +828,18 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d DeviceInfo device = make_device_from_terminal(json_data); ClientManager::instance().add_device(device); - send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), - "终端 id: " + json_data.terminal_id + " 台账修改成功"); + /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), + "终端 id: " + json_data.terminal_id + " 台账修改成功");*/ + one.code = static_cast(ResponseCode::OK); + one.result = "台账修改成功"; } else{ - send_reply_to_queue(json_data.guid, static_cast(ResponseCode::BAD_REQUEST), - "终端 id: " + json_data.terminal_id + " 台账修改失败"); + /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::BAD_REQUEST), + "终端 id: " + json_data.terminal_id + " 台账修改失败");*/ + one.code = static_cast(ResponseCode::BAD_REQUEST); + one.result = "台账修改失败"; } + reply_list.push_back(std::move(one)); } } @@ -820,6 +847,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d } else if (code_str == "delete_terminal") { std::cout << "delete ledger" << std::endl; if (messageBody.contains("data") && messageBody["data"].is_array()) { + for (const auto& item : messageBody["data"]) { terminal_dev json_data{}; auto id = item.value("id", ""); @@ -833,23 +861,34 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d writeToFile(file_name, xmlContent); }*/ + DeviceReply one; + one.deviceId = json_data.terminal_id; + //直接加锁删除 std::lock_guard lock(ledgermtx); if(erase_one_terminals_by_id(json_data.terminal_id) == 1){ ClientManager::instance().remove_device(json_data.terminal_id); - send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), - "终端 id: " + json_data.terminal_id + " 台账删除成功"); + /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), + "终端 id: " + json_data.terminal_id + " 台账删除成功");*/ + one.code = static_cast(ResponseCode::OK); + one.result = "台账删除成功"; } else{ - send_reply_to_queue(json_data.guid, static_cast(ResponseCode::BAD_REQUEST), - "终端 id: " + json_data.terminal_id + " 台账删除失败"); + /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::BAD_REQUEST), + "终端 id: " + json_data.terminal_id + " 台账删除失败");*/ + one.code = static_cast(ResponseCode::BAD_REQUEST); + one.result = "台账删除失败"; } + + reply_list.push_back(std::move(one)); } } } else { std::cout << "code_str error" << std::endl; } + send_batch_reply_to_queue(guid, reply_list); + return true; } @@ -1385,6 +1424,37 @@ void send_reply_to_queue(const std::string& guid, const int code, const std::str } } +//台账更新批量回复 +void send_batch_reply_to_queue(const std::string& guid, + const std::vector& replies) { + try { + nlohmann::json root; + root["guid"] = guid; + + nlohmann::json arr = nlohmann::json::array(); + + for (const auto& r : replies) { + nlohmann::json item; + item["deviceId"] = r.deviceId; + item["code"] = r.code; + item["result"] = r.result; + arr.push_back(std::move(item)); + } + root["data"] = std::move(arr); + + queue_data_t connect_info; + connect_info.strTopic = Topic_Reply_Topic; + connect_info.strText = root.dump(); + connect_info.tag = Topic_Reply_Tag; + connect_info.key = Topic_Reply_Key; + + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(std::move(connect_info)); + } catch (const std::exception& e) { + std::cerr << "send_batch_reply_to_queue exception: " << e.what() << std::endl; + } +} + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////心跳消息 void send_heartbeat_to_queue(const std::string& status) { diff --git a/LFtid1056_bak.rar b/LFtid1056_bak.rar new file mode 100644 index 0000000..ef743e7 Binary files /dev/null and b/LFtid1056_bak.rar differ