fix deadlock

This commit is contained in:
lnk
2025-09-23 21:00:24 +08:00
parent b976795573
commit e997c88d82
5 changed files with 125 additions and 142 deletions

View File

@@ -3036,6 +3036,13 @@ std::string extract_filename1(const std::string& path) {
return (pos != std::string::npos) ? path.substr(pos + 1) : path;
}
// ★新增dirname返回“目录/”(保留末尾斜杠;若没有目录则返回空串)
static inline std::string dirname_with_slash(const std::string& path) {
size_t pos = path.find_last_of("/\\");
if (pos == std::string::npos) return std::string{};
return path.substr(0, pos + 1);
}
//发送匹配的所有录波文件
bool SendAllQvvrFiles(qvvr_file& qfile, std::string& out_wavepath) {
std::vector<std::string> wavepaths;
@@ -3043,7 +3050,7 @@ bool SendAllQvvrFiles(qvvr_file& qfile, std::string& out_wavepath) {
bool send_success = true;
for (const auto& file_localpath : qfile.file_download) {
std::string file_cloudpath = "comtrade/" + file_localpath;
std::string file_cloudpath = "comtrade/" + dirname_with_slash(file_localpath);
std::string wavepath_result;
// 发送本地文件到远端,返回 wavepath
@@ -3100,7 +3107,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st
<< " | terminal_id=" << terminal_id << std::endl;
//台账加锁
std::lock_guard<std::mutex> lock(ledgermtx);
std::unique_lock<std::mutex> lock(ledgermtx);
// 去除 mac 路径前缀,仅保留文件名
std::string filename = sanitize(extract_filename1(filename_with_mac));
@@ -3211,6 +3218,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st
std::cout << std::endl;
std::cout << "[update_qvvr_file_download] Downloaded files (file_download): ";
for (const auto& fn : s_down) std::cout << fn << " ";
std::cout << std::endl;
// 检查 file_download 是否与 file_name 完全一致(集合相同)
if (s_name == s_down) {
@@ -3226,10 +3234,28 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st
if (compare_qvvr_and_file(fpath, monitor.qvvrevent.qvvrdata,matched)) {
qfile.is_pair = true; // 文件与事件匹配成功
// ★新增:上传前拷贝“将要上传的文件列表”,避免锁外用容器引用
std::vector<std::string> files_to_send(qfile.file_download.begin(),
qfile.file_download.end());
// ★新增:构造一个临时 qvvr_file仅用于上传不改动原结构
qvvr_file tmp_send;
tmp_send.file_download.assign(files_to_send.begin(), files_to_send.end());
// 发送所有文件(已下载完成)
std::string wavepath;
// ★在解锁前,备份“签名”,用于回锁后定位同一个 qfile
std::set<std::string> sig_names(qfile.file_name.begin(), qfile.file_name.end());
std::set<std::string> sig_downs(qfile.file_download.begin(), qfile.file_download.end());
// ★修改:把上传与上送 JSON 放到“解锁区间”
lock.unlock(); // ★新增:提前解锁
if (SendAllQvvrFiles(qfile, wavepath)) {
//文件发送成功后更新事件
transfer_json_qvvr_data(terminal_id,
logical_seq,
matched.QVVR_Amg,
@@ -3238,7 +3264,10 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st
matched.QVVR_type,
matched.phase,
wavepath);
// ★新增:上传成功后再加锁,准备修改台账
lock.lock();
// 删除上传成功的文件
for (const auto& uploaded_file : qfile.file_download) {
if (std::remove(uploaded_file.c_str()) != 0) {
@@ -3248,8 +3277,20 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st
}
}
// 清除已发送的暂态文件
monitor.qvvrevent.qvvrfile.erase(monitor.qvvrevent.qvvrfile.begin() + i);
// ★替换原来的 i<size 判断为:按签名查找当前容器里的那一条
auto it_qf = std::find_if(monitor.qvvrevent.qvvrfile.begin(),
monitor.qvvrevent.qvvrfile.end(),
[&](const qvvr_file& x){
std::set<std::string> n(x.file_name.begin(), x.file_name.end());
std::set<std::string> d(x.file_download.begin(), x.file_download.end());
return n==sig_names && d==sig_downs;
});
if (it_qf != monitor.qvvrevent.qvvrfile.end()) {
monitor.qvvrevent.qvvrfile.erase(it_qf); // ✔ 删到同一条
} else {
std::cerr << "[Cleanup] qvvrfile changed; target group not found, skip erase\n";
}
//清除暂态事件
auto it = std::find_if(
@@ -3264,6 +3305,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st
}
}
else {
lock.lock(); // ★新增:失败时补回锁
std::cerr << "[update_qvvr_file_download] Failed to send qvvr files for logical_seq=" << logical_seq << std::endl;
}
}
@@ -3277,7 +3319,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st
else{
std::cout << "qvvr file still imcomplete!!!" << std::endl;
}
lock.unlock();
return true; // 当前文件处理成功
}
}
@@ -3286,6 +3328,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st
}
}
lock.unlock();
return false; // 未匹配到终端ID或逻辑序号对应的监测点
}
@@ -3300,43 +3343,28 @@ std::string normalize_mac(const std::string &mac) {
return res;
}
//找到dev的mac
std::string get_mac_by_devid(const std::string &devid) {
std::lock_guard<std::mutex> lock(ledgermtx);
for (auto &dev : terminal_devlist) {
if (dev.terminal_id == devid) {
return normalize_mac(dev.addr_str); // 规范化后返回
}
}
return {}; // 没找到返回空串
}
////////////////////////////////////////////////////////////////////////////////////////目录信息发送接口函数
bool send_file_list(const std::string &dev_id, const std::vector<tag_dir_info> &FileList) {
// 找到对应 terminal_dev
std::lock_guard<std::mutex> lock(ledgermtx);
bool send_file_list(terminal_dev* dev, const std::vector<tag_dir_info>& FileList) {
auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(),
[&](const terminal_dev &dev) { return dev.terminal_id == dev_id; });
if (it == terminal_devlist.end()) {
std::cerr << "[send_file_list] device not found: " << dev_id << std::endl;
if (!dev) {
std::cerr << "[send_file_list_locked] dev=nullptr\n";
return false;
}
terminal_dev &dev = *it;
// 判断 isbusy==1 且 busytype==READING_FILEMENU
if (dev.isbusy != 1 || dev.busytype != static_cast<int>(DeviceState::READING_FILEMENU)) {
if (dev->isbusy != 1 || dev->busytype != static_cast<int>(DeviceState::READING_FILEMENU)) {
std::cerr << "[send_file_list] device not in READING_FILEMENU state." << std::endl;
return false;
}
// 构造 JSON 报文
nlohmann::json j;
j["guid"] = dev.guid;
j["guid"] = dev->guid;
j["FrontIP"] = FRONT_IP; // 这里填你的前置机 IP
j["Node"] = g_front_seg_index; // 节点号
j["Dev_mac"] = normalize_mac(dev.addr_str); // addr_str 存的是 MAC
j["Dev_mac"] = normalize_mac(dev->addr_str); // addr_str 存的是 MAC
// 构造 DirInfo 数组
nlohmann::json dirArray = nlohmann::json::array();
@@ -3374,12 +3402,12 @@ bool send_file_list(const std::string &dev_id, const std::vector<tag_dir_info> &
std::cout << "[send_reply_to_cloud] queued: " << j.dump() << std::endl;
//发送后清除guid和标志
if (dev.isbusy > 0) {
dev.isbusy--;
if (dev->isbusy > 0) {
dev->isbusy--;
}
if(dev.isbusy == 0){
dev.guid.clear();
dev.busytype = 0;
if(dev->isbusy == 0){
dev->guid.clear();
dev->busytype = 0;
}
return true;
@@ -3432,7 +3460,7 @@ void check_device_busy_timeout()
<< dev.busytimecount << "s)" << std::endl;
//发送超时响应
send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype));
send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac);
// 超时清空状态
dev.guid.clear(); // 清空进行中的 guid
@@ -3450,7 +3478,7 @@ void check_device_busy_timeout()
<< " 超时(" << dev.busytimecount << "s)" << std::endl;
//发送超时响应
send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype));
send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac);
// 超时清空状态
dev.guid.clear();
@@ -3862,7 +3890,7 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector<DZ_k
int nStep = 0; // [新增] 每个 NameFixValue 递增
int kSort = 1; // [新增] 排序号,从 1 开始
// 保护dz_internal_info_list 是引用成员,确保不会因并发被改动(当前已在 ledgermtx 下)
// 保护dz_internal_info_list 是引用成员,确保不会因并发被改动
//for (const auto& nf : dev.dz_internal_info_list) { // [新增]
for (size_t idxNF = 0; idxNF < n_use; ++idxNF) { // [修改] 使用 idxNF 控制索引
const auto& nf = dev.dz_internal_info_list[idxNF];
@@ -3982,24 +4010,6 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector<DZ_k
return true;
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////清空运行状态
// 清理指定 terminal_id 的运行时状态
void clear_terminal_runtime_state(const std::string& id) {
std::lock_guard<std::mutex> lock(ledgermtx); // 加锁保证线程安全
for (auto& dev : terminal_devlist) {
if (dev.terminal_id == id) {
dev.guid.clear(); // 清空 guid
dev.busytype = 0; // 业务类型归零
dev.isbusy = 0; // 清空业务标志
dev.busytimecount = 0; // 计时归零
std::cout << "[clear_terminal_runtime_state] Cleared runtime state for terminal_id="
<< id << std::endl;
break; // 找到后跳出
}
}
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑
//发送补招响应给web
void send_reply_to_kafka_recall(const std::string& guid, const std::string& step,int code, const std::string& result,const std::string& terminalId,const std::string& lineIndex,const std::string& recallStartDate,const std::string& recallEndDate){
@@ -4734,11 +4744,11 @@ void on_device_response_minimal(int response_code,
if (filemenu_cache_take(id, names)) {
//发送目录
send_file_list(id,names);
send_file_list(dev,names);
} else {
// 失败响应web并复位为空闲
send_reply_to_cloud(static_cast<int>(ResponseCode::BAD_REQUEST), id, static_cast<int>(DeviceState::READING_FILEMENU));
send_reply_to_cloud(static_cast<int>(ResponseCode::BAD_REQUEST), id, static_cast<int>(DeviceState::READING_FILEMENU),dev->guid,dev->mac);
std::cout << "[RESP][FILEMENU->FILEMENU][WARN] dev=" << id
<< " names missing in cache" << std::endl;
}
@@ -4751,7 +4761,7 @@ void on_device_response_minimal(int response_code,
std::cout << "[RESP][FILEMENU->FILEMENU][OK] dev=" << id << std::endl;
} else {
// 失败响应web并复位为空闲
send_reply_to_cloud(response_code, id, static_cast<int>(DeviceState::READING_FILEMENU));
send_reply_to_cloud(response_code, id, static_cast<int>(DeviceState::READING_FILEMENU),dev->guid,dev->mac);
dev->guid.clear();
dev->isbusy = 0;
@@ -4842,7 +4852,7 @@ void on_device_response_minimal(int response_code,
// ====== 分支 A当前业务就是“读取文件数据” ======
if (ok) {
// 成功:复位
send_reply_to_cloud(static_cast<int>(ResponseCode::OK), id, static_cast<int>(DeviceState::READING_FILEDATA));
send_reply_to_cloud(static_cast<int>(ResponseCode::OK), id, static_cast<int>(DeviceState::READING_FILEDATA),dev->guid,dev->mac);
dev->guid.clear();
dev->isbusy = 0;
@@ -4851,7 +4861,7 @@ void on_device_response_minimal(int response_code,
std::cout << "[RESP][FILEDATA->FILEDATA][OK] dev=" << id << std::endl;
} else {
// 失败响应web并复位
send_reply_to_cloud(response_code, id, static_cast<int>(DeviceState::READING_FILEDATA));
send_reply_to_cloud(response_code, id, static_cast<int>(DeviceState::READING_FILEDATA),dev->guid,dev->mac);
dev->guid.clear();
dev->isbusy = 0;
@@ -4926,15 +4936,26 @@ void on_device_response_minimal(int response_code,
// ================= 其它状态统一处理 =================
default: {
//直接根据输入响应mq
send_reply_to_cloud(response_code, id, device_state_int);
//其他的错误和成功都会结束业务
clear_terminal_runtime_state(id);
std::lock_guard<std::mutex> lk(ledgermtx);
terminal_dev* dev = nullptr;
for (auto& d : terminal_devlist) {
if (d.terminal_id == id) { dev = &d; break; }
}
if (dev) {
//直接根据输入响应mq
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
//其他的错误和成功都会结束业务
dev->guid.clear(); // 清空 guid
dev->busytype = 0; // 业务类型归零
dev->isbusy = 0; // 清空业务标志
dev->busytimecount = 0; // 计时归零
std::cout << "[clear_terminal_runtime_state] Cleared runtime state for terminal_id="
<< id << std::endl;
}
break;
}
}
} // end switch
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////记录暂态事件到本地