fix some error and add some fun

This commit is contained in:
lnk
2025-10-11 09:08:43 +08:00
parent 35231baae7
commit c494225b38
5 changed files with 106 additions and 27 deletions

View File

@@ -673,12 +673,14 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的台账更新消息",
g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到台账更新指令");
//send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到台账更新指令");
std::vector<DeviceReply> reply_list;
if (code_str == "add_terminal" || code_str == "ledger_modify") {
std::cout << "add or modify ledger" << std::endl;
if (messageBody.contains("data") && messageBody["data"].is_array()) {
for (const auto& item : messageBody["data"]) {
terminal_dev json_data;
@@ -733,7 +735,11 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
std::string file_name = output_dir + "/" + nodeid + "_" + std::to_string(g_front_seg_index) + "_" + json_data.terminal_id + "_" + code_str + ".xml";
writeToFile(file_name, xmlContent);
}*/
if(code_str == "add_terminal"){
if(code_str == "add_terminal"){
DeviceReply one;
one.deviceId = json_data.terminal_id;
std::lock_guard<std::mutex> lock(ledgermtx);
// ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中
const std::string& tid = json_data.terminal_id;
@@ -749,12 +755,15 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账添加成功");
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账添加成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账添加成功";
}
else{
send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 已存在该装置,修改这个装置的台账");
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 已存在该装置,修改这个装置的台账");*/
if(erase_one_terminals_by_id(json_data.terminal_id) == 1){
init_loggers_bydevid(json_data.terminal_id);
@@ -764,16 +773,25 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账修改成功");
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账修改成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账修改成功";
}
else{
send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账修改失败");
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账修改失败");*/
one.code = static_cast<int>(ResponseCode::BAD_REQUEST);
one.result = "台账修改失败";
}
}
reply_list.push_back(std::move(one));
}
else if(code_str == "ledger_modify"){
DeviceReply one;
one.deviceId = json_data.terminal_id;
std::lock_guard<std::mutex> lock(ledgermtx);
// ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中
@@ -782,8 +800,8 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
[&](const terminal_dev& d){ return d.terminal_id == tid; });
if (it == terminal_devlist.end()) {
send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::NOT_FOUND),
"终端 id: " + tid + " 无法修改台账,未找到指定装置,改为添加这个装置");
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::NOT_FOUND),
"终端 id: " + tid + " 无法修改台账,未找到指定装置,改为添加这个装置");*/
DIY_WARNLOG("process", "【WARN】无法修改台账未找到指定装置: %s ,改为添加这个装置", tid.c_str());
@@ -794,8 +812,12 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账添加成功");
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账添加成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账添加成功";
reply_list.push_back(std::move(one));
continue;//修改下一个
}
if(erase_one_terminals_by_id(json_data.terminal_id) == 1){
@@ -806,13 +828,18 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账修改成功");
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账修改成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账修改成功";
}
else{
send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账修改失败");
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账修改失败");*/
one.code = static_cast<int>(ResponseCode::BAD_REQUEST);
one.result = "台账修改失败";
}
reply_list.push_back(std::move(one));
}
}
@@ -820,6 +847,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
} else if (code_str == "delete_terminal") {
std::cout << "delete ledger" << std::endl;
if (messageBody.contains("data") && messageBody["data"].is_array()) {
for (const auto& item : messageBody["data"]) {
terminal_dev json_data{};
auto id = item.value("id", "");
@@ -833,23 +861,34 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
writeToFile(file_name, xmlContent);
}*/
DeviceReply one;
one.deviceId = json_data.terminal_id;
//直接加锁删除
std::lock_guard<std::mutex> lock(ledgermtx);
if(erase_one_terminals_by_id(json_data.terminal_id) == 1){
ClientManager::instance().remove_device(json_data.terminal_id);
send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账删除成功");
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账删除成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账删除成功";
}
else{
send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账删除失败");
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账删除失败");*/
one.code = static_cast<int>(ResponseCode::BAD_REQUEST);
one.result = "台账删除失败";
}
reply_list.push_back(std::move(one));
}
}
} else {
std::cout << "code_str error" << std::endl;
}
send_batch_reply_to_queue(guid, reply_list);
return true;
}
@@ -1385,6 +1424,37 @@ void send_reply_to_queue(const std::string& guid, const int code, const std::str
}
}
//台账更新批量回复
void send_batch_reply_to_queue(const std::string& guid,
const std::vector<DeviceReply>& replies) {
try {
nlohmann::json root;
root["guid"] = guid;
nlohmann::json arr = nlohmann::json::array();
for (const auto& r : replies) {
nlohmann::json item;
item["deviceId"] = r.deviceId;
item["code"] = r.code;
item["result"] = r.result;
arr.push_back(std::move(item));
}
root["data"] = std::move(arr);
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = root.dump();
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
} catch (const std::exception& e) {
std::cerr << "send_batch_reply_to_queue exception: " << e.what() << std::endl;
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////心跳消息
void send_heartbeat_to_queue(const std::string& status) {