From e997c88d82af142aed897bdd238f82742cc6a6c1 Mon Sep 17 00:00:00 2001 From: lnk Date: Tue, 23 Sep 2025 21:00:24 +0800 Subject: [PATCH] fix deadlock --- LFtid1056/cloudfront/code/cfg_parser.cpp | 153 +++++++++++++---------- LFtid1056/cloudfront/code/interface.cpp | 8 +- LFtid1056/cloudfront/code/interface.h | 10 +- LFtid1056/cloudfront/code/rocketmq.cpp | 50 ++------ LFtid1056/cloudfront/config/front.cfg | 46 +++---- 5 files changed, 125 insertions(+), 142 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 6fdb8a0..5cdd353 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -3036,6 +3036,13 @@ std::string extract_filename1(const std::string& path) { return (pos != std::string::npos) ? path.substr(pos + 1) : path; } +// ★新增:dirname,返回“目录/”(保留末尾斜杠;若没有目录则返回空串) +static inline std::string dirname_with_slash(const std::string& path) { + size_t pos = path.find_last_of("/\\"); + if (pos == std::string::npos) return std::string{}; + return path.substr(0, pos + 1); +} + //发送匹配的所有录波文件 bool SendAllQvvrFiles(qvvr_file& qfile, std::string& out_wavepath) { std::vector wavepaths; @@ -3043,7 +3050,7 @@ bool SendAllQvvrFiles(qvvr_file& qfile, std::string& out_wavepath) { bool send_success = true; for (const auto& file_localpath : qfile.file_download) { - std::string file_cloudpath = "comtrade/" + file_localpath; + std::string file_cloudpath = "comtrade/" + dirname_with_slash(file_localpath); std::string wavepath_result; // 发送本地文件到远端,返回 wavepath @@ -3100,7 +3107,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st << " | terminal_id=" << terminal_id << std::endl; //台账加锁 - std::lock_guard lock(ledgermtx); + std::unique_lock lock(ledgermtx); // 去除 mac 路径前缀,仅保留文件名 std::string filename = sanitize(extract_filename1(filename_with_mac)); @@ -3211,6 +3218,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st std::cout << std::endl; std::cout << "[update_qvvr_file_download] Downloaded files (file_download): "; for (const auto& fn : s_down) std::cout << fn << " "; + std::cout << std::endl; // 检查 file_download 是否与 file_name 完全一致(集合相同) if (s_name == s_down) { @@ -3226,10 +3234,28 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st if (compare_qvvr_and_file(fpath, monitor.qvvrevent.qvvrdata,matched)) { qfile.is_pair = true; // 文件与事件匹配成功 + // ★新增:上传前拷贝“将要上传的文件列表”,避免锁外用容器引用 + std::vector files_to_send(qfile.file_download.begin(), + qfile.file_download.end()); + + // ★新增:构造一个临时 qvvr_file,仅用于上传(不改动原结构) + qvvr_file tmp_send; + tmp_send.file_download.assign(files_to_send.begin(), files_to_send.end()); + + // 发送所有文件(已下载完成) std::string wavepath; + + // ★在解锁前,备份“签名”,用于回锁后定位同一个 qfile + std::set sig_names(qfile.file_name.begin(), qfile.file_name.end()); + std::set sig_downs(qfile.file_download.begin(), qfile.file_download.end()); + + // ★修改:把上传与上送 JSON 放到“解锁区间” + lock.unlock(); // ★新增:提前解锁 + if (SendAllQvvrFiles(qfile, wavepath)) { //文件发送成功后更新事件 + transfer_json_qvvr_data(terminal_id, logical_seq, matched.QVVR_Amg, @@ -3238,7 +3264,10 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st matched.QVVR_type, matched.phase, wavepath); - + + // ★新增:上传成功后再加锁,准备修改台账 + lock.lock(); + // 删除上传成功的文件 for (const auto& uploaded_file : qfile.file_download) { if (std::remove(uploaded_file.c_str()) != 0) { @@ -3248,8 +3277,20 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st } } - // 清除已发送的暂态文件 - monitor.qvvrevent.qvvrfile.erase(monitor.qvvrevent.qvvrfile.begin() + i); + // ★替换原来的 i n(x.file_name.begin(), x.file_name.end()); + std::set d(x.file_download.begin(), x.file_download.end()); + return n==sig_names && d==sig_downs; + }); + + if (it_qf != monitor.qvvrevent.qvvrfile.end()) { + monitor.qvvrevent.qvvrfile.erase(it_qf); // ✔ 删到同一条 + } else { + std::cerr << "[Cleanup] qvvrfile changed; target group not found, skip erase\n"; + } //清除暂态事件 auto it = std::find_if( @@ -3264,6 +3305,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st } } else { + lock.lock(); // ★新增:失败时补回锁 std::cerr << "[update_qvvr_file_download] Failed to send qvvr files for logical_seq=" << logical_seq << std::endl; } } @@ -3277,7 +3319,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st else{ std::cout << "qvvr file still imcomplete!!!" << std::endl; } - + lock.unlock(); return true; // 当前文件处理成功 } } @@ -3286,6 +3328,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac_in, const st } } + lock.unlock(); return false; // 未匹配到终端ID或逻辑序号对应的监测点 } @@ -3300,43 +3343,28 @@ std::string normalize_mac(const std::string &mac) { return res; } -//找到dev的mac -std::string get_mac_by_devid(const std::string &devid) { - std::lock_guard lock(ledgermtx); - for (auto &dev : terminal_devlist) { - if (dev.terminal_id == devid) { - return normalize_mac(dev.addr_str); // 规范化后返回 - } - } - return {}; // 没找到返回空串 -} + ////////////////////////////////////////////////////////////////////////////////////////目录信息发送接口函数 -bool send_file_list(const std::string &dev_id, const std::vector &FileList) { - // 找到对应 terminal_dev - std::lock_guard lock(ledgermtx); +bool send_file_list(terminal_dev* dev, const std::vector& FileList) { - auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(), - [&](const terminal_dev &dev) { return dev.terminal_id == dev_id; }); - if (it == terminal_devlist.end()) { - std::cerr << "[send_file_list] device not found: " << dev_id << std::endl; + if (!dev) { + std::cerr << "[send_file_list_locked] dev=nullptr\n"; return false; } - terminal_dev &dev = *it; - // 判断 isbusy==1 且 busytype==READING_FILEMENU - if (dev.isbusy != 1 || dev.busytype != static_cast(DeviceState::READING_FILEMENU)) { + if (dev->isbusy != 1 || dev->busytype != static_cast(DeviceState::READING_FILEMENU)) { std::cerr << "[send_file_list] device not in READING_FILEMENU state." << std::endl; return false; } // 构造 JSON 报文 nlohmann::json j; - j["guid"] = dev.guid; + j["guid"] = dev->guid; j["FrontIP"] = FRONT_IP; // 这里填你的前置机 IP j["Node"] = g_front_seg_index; // 节点号 - j["Dev_mac"] = normalize_mac(dev.addr_str); // addr_str 存的是 MAC + j["Dev_mac"] = normalize_mac(dev->addr_str); // addr_str 存的是 MAC // 构造 DirInfo 数组 nlohmann::json dirArray = nlohmann::json::array(); @@ -3374,12 +3402,12 @@ bool send_file_list(const std::string &dev_id, const std::vector & std::cout << "[send_reply_to_cloud] queued: " << j.dump() << std::endl; //发送后清除guid和标志 - if (dev.isbusy > 0) { - dev.isbusy--; + if (dev->isbusy > 0) { + dev->isbusy--; } - if(dev.isbusy == 0){ - dev.guid.clear(); - dev.busytype = 0; + if(dev->isbusy == 0){ + dev->guid.clear(); + dev->busytype = 0; } return true; @@ -3432,7 +3460,7 @@ void check_device_busy_timeout() << dev.busytimecount << "s)" << std::endl; //发送超时响应 - send_reply_to_cloud(static_cast(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype)); + send_reply_to_cloud(static_cast(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac); // 超时清空状态 dev.guid.clear(); // 清空进行中的 guid @@ -3450,7 +3478,7 @@ void check_device_busy_timeout() << " 超时(" << dev.busytimecount << "s)" << std::endl; //发送超时响应 - send_reply_to_cloud(static_cast(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype)); + send_reply_to_cloud(static_cast(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac); // 超时清空状态 dev.guid.clear(); @@ -3862,7 +3890,7 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector lock(ledgermtx); // 加锁保证线程安全 - - for (auto& dev : terminal_devlist) { - if (dev.terminal_id == id) { - dev.guid.clear(); // 清空 guid - dev.busytype = 0; // 业务类型归零 - dev.isbusy = 0; // 清空业务标志 - dev.busytimecount = 0; // 计时归零 - std::cout << "[clear_terminal_runtime_state] Cleared runtime state for terminal_id=" - << id << std::endl; - break; // 找到后跳出 - } - } -} - //////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑 //发送补招响应给web void send_reply_to_kafka_recall(const std::string& guid, const std::string& step,int code, const std::string& result,const std::string& terminalId,const std::string& lineIndex,const std::string& recallStartDate,const std::string& recallEndDate){ @@ -4734,11 +4744,11 @@ void on_device_response_minimal(int response_code, if (filemenu_cache_take(id, names)) { //发送目录 - send_file_list(id,names); + send_file_list(dev,names); } else { // 失败:响应web并复位为空闲 - send_reply_to_cloud(static_cast(ResponseCode::BAD_REQUEST), id, static_cast(DeviceState::READING_FILEMENU)); + send_reply_to_cloud(static_cast(ResponseCode::BAD_REQUEST), id, static_cast(DeviceState::READING_FILEMENU),dev->guid,dev->mac); std::cout << "[RESP][FILEMENU->FILEMENU][WARN] dev=" << id << " names missing in cache" << std::endl; } @@ -4751,7 +4761,7 @@ void on_device_response_minimal(int response_code, std::cout << "[RESP][FILEMENU->FILEMENU][OK] dev=" << id << std::endl; } else { // 失败:响应web并复位为空闲 - send_reply_to_cloud(response_code, id, static_cast(DeviceState::READING_FILEMENU)); + send_reply_to_cloud(response_code, id, static_cast(DeviceState::READING_FILEMENU),dev->guid,dev->mac); dev->guid.clear(); dev->isbusy = 0; @@ -4842,7 +4852,7 @@ void on_device_response_minimal(int response_code, // ====== 分支 A:当前业务就是“读取文件数据” ====== if (ok) { // 成功:复位 - send_reply_to_cloud(static_cast(ResponseCode::OK), id, static_cast(DeviceState::READING_FILEDATA)); + send_reply_to_cloud(static_cast(ResponseCode::OK), id, static_cast(DeviceState::READING_FILEDATA),dev->guid,dev->mac); dev->guid.clear(); dev->isbusy = 0; @@ -4851,7 +4861,7 @@ void on_device_response_minimal(int response_code, std::cout << "[RESP][FILEDATA->FILEDATA][OK] dev=" << id << std::endl; } else { // 失败:响应web并复位 - send_reply_to_cloud(response_code, id, static_cast(DeviceState::READING_FILEDATA)); + send_reply_to_cloud(response_code, id, static_cast(DeviceState::READING_FILEDATA),dev->guid,dev->mac); dev->guid.clear(); dev->isbusy = 0; @@ -4926,15 +4936,26 @@ void on_device_response_minimal(int response_code, // ================= 其它状态统一处理 ================= default: { - //直接根据输入响应mq - send_reply_to_cloud(response_code, id, device_state_int); - - //其他的错误和成功都会结束业务 - clear_terminal_runtime_state(id); - + std::lock_guard lk(ledgermtx); + terminal_dev* dev = nullptr; + for (auto& d : terminal_devlist) { + if (d.terminal_id == id) { dev = &d; break; } + } + if (dev) { + //直接根据输入响应mq + send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac); + //其他的错误和成功都会结束业务 + dev->guid.clear(); // 清空 guid + dev->busytype = 0; // 业务类型归零 + dev->isbusy = 0; // 清空业务标志 + dev->busytimecount = 0; // 计时归零 + std::cout << "[clear_terminal_runtime_state] Cleared runtime state for terminal_id=" + << id << std::endl; + } + break; } - } + } // end switch } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////记录暂态事件到本地 diff --git a/LFtid1056/cloudfront/code/interface.cpp b/LFtid1056/cloudfront/code/interface.cpp index 3cda132..288339e 100644 --- a/LFtid1056/cloudfront/code/interface.cpp +++ b/LFtid1056/cloudfront/code/interface.cpp @@ -181,14 +181,14 @@ void handleUploadResponse(const std::string& response, std::string& wavepath) { std::cout << "File URL: " << url << std::endl; // 找到最后一个 '.' - size_t pos = fileName.find_last_of('.'); + size_t pos = name.find_last_of('.'); std::string nameWithoutExt; if (pos != std::string::npos) { // 截取去掉后缀的部分 - nameWithoutExt = fileName.substr(0, pos); + nameWithoutExt = name.substr(0, pos); } else { // 如果没有后缀,直接使用原文件名 - nameWithoutExt = fileName; + nameWithoutExt = name; } // 拷贝到 wavepath @@ -1341,7 +1341,7 @@ bool get_monitor_id_by_dev_and_seq(const std::string& terminal_id, unsigned short logical_seq, std::string& out_monitor_id) { - std::lock_guard lk(ledgermtx); // 若你的工程里有全局 ledgermtx/terminal_devlist + std::lock_guard lk(ledgermtx); for (const auto& dev : terminal_devlist) { if (dev.terminal_id != terminal_id) continue; diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 357e40f..515df60 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -611,11 +611,10 @@ bool assign_qvvr_file_list(const std::string& id, ushort nCpuNo, const std::vect bool update_qvvr_file_download(const std::string& filename_with_mac, const std::string& terminal_id); //上送文件列表接口 -bool send_file_list(const std::string &dev_id, const std::vector &FileList); +bool send_file_list(terminal_dev* dev, const std::vector &FileList); //提取mac std::string normalize_mac(const std::string& mac); -std::string get_mac_by_devid(const std::string &devid); //暂态文件超时检测 void check_and_backup_qvvr_files(); @@ -624,10 +623,7 @@ void check_and_backup_qvvr_files(); void check_device_busy_timeout(); //业务上报 -void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type); - -//查guid -std::string find_guid_index_from_dev_id(const std::string& dev_id); +void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type, const std::string& guid, const std::string& mac); //内部定值响应 bool send_internal_value_reply(const std::string &dev_id, const std::vector &control_words); @@ -650,8 +646,6 @@ void SendFileWeb(const std::string& strUrl, const std::string& localpath, const //状态翻转 void connect_status_update(const std::string& id, int status); -//业务停止 -void clear_terminal_runtime_state(const std::string& id); //业务响应 void on_device_response_minimal(int response_code, diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index e00f370..4c2d16d 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -341,40 +341,6 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer) rocketmq_producer_send(producer,senddata,topic,tag,key); } -/////////////////////////////////////////////////////////////////////////////////////////////////查找台账下标 -// 根据终端 ID 查找 terminal_devlist 中的索引,找不到返回 -1 -int find_dev_index_from_dev_id(const std::string& dev_id) { - std::lock_guard lock(ledgermtx); - for (size_t i = 0; i < terminal_devlist.size(); ++i) { - if (terminal_devlist[i].terminal_id == dev_id) { - return static_cast(i); - } - } - return -1; // 未找到 -} - -int find_mp_index_from_mp_id(const std::string& mp_id) { - std::lock_guard lock(ledgermtx); - for (const auto& dev : terminal_devlist) { - for (size_t j = 0; j < dev.line.size(); ++j) { - if (dev.line[j].monitor_id == mp_id) { - return static_cast(j); // 返回 line[] 的下标 - } - } - } - return -1; // 未找到 -} - -std::string find_guid_index_from_dev_id(const std::string& dev_id) { - std::lock_guard lock(ledgermtx); - for (size_t i = 0; i < terminal_devlist.size(); ++i) { - if (terminal_devlist[i].terminal_id == dev_id) { - return terminal_devlist[i].guid; - } - } - return ""; // 未找到 -} - /////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理 bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit){ @@ -1489,7 +1455,7 @@ void rocketmq_test_rt(Front* front)//用来测试实时数据 queue_data_t data; data.monitor_no = 123; - data.strTopic = std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT; + data.strTopic = G_MQCONSUMER_TOPIC_RT; std::ifstream file("rt.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -1513,7 +1479,7 @@ void rocketmq_test_ud(Front* front)//用来测试台账更新 queue_data_t data; data.monitor_no = 123; - data.strTopic = std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD; + data.strTopic = G_MQCONSUMER_TOPIC_UD; std::ifstream file("ud.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -1537,7 +1503,7 @@ void rocketmq_test_set(Front* front)//用来测试进程控制脚本 queue_data_t data; data.monitor_no = 123; - data.strTopic = std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET; + data.strTopic = G_MQCONSUMER_TOPIC_SET; std::ifstream file("set.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -1561,7 +1527,7 @@ void rocketmq_test_rc(Front* front)//用来测试补招 queue_data_t data; data.monitor_no = 123; - data.strTopic = std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC; + data.strTopic = G_MQCONSUMER_TOPIC_RC; std::ifstream file("rc.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -1871,9 +1837,9 @@ bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann: } //心跳和其他响应 -void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type) { +void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type, const std::string& guid, const std::string& mac) { try { - std::string guid = find_guid_index_from_dev_id(dev_id); + /*std::string guid = find_guid_index_from_dev_id(dev_id);*/ if(guid == "") { std::cerr << "dev: " << dev_id << " guid not found" << std::endl; @@ -1887,8 +1853,8 @@ void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type) { obj["Node"] = g_front_seg_index; // Dev_mac:从台账取 addr_str 并规范化 - std::string mac = get_mac_by_devid(dev_id); - obj["Dev_mac"] = mac; + //std::string mac = get_mac_by_devid(dev_id); + obj["Dev_mac"] = normalize_mac(mac); // ---- 构造 Detail ---- nlohmann::json detail; diff --git a/LFtid1056/cloudfront/config/front.cfg b/LFtid1056/cloudfront/config/front.cfg index 8980f68..efcb06a 100644 --- a/LFtid1056/cloudfront/config/front.cfg +++ b/LFtid1056/cloudfront/config/front.cfg @@ -2,18 +2,21 @@ BrokerList= RTDataTopic=Real_Time_Data_Topic -HisTopic=LN_Topic -PSTTopic=LN_Topic -PLTTopic=LN_Topic +HisTopic=njcnAppAutoDataTopic +PSTTopic=njcnAppAutoDataTopic +PLTTopic=njcnAppAutoDataTopic AlmTopic=AlmTopic SngTopic=SngTopic -QUEUE_TAG=Test_Tag -QUEUE_KEY=Test_Keys +QUEUE_TAG=stat +QUEUE_KEY=stat + +RT_TAG=rt +RT_KEY=rt [Flag] -FrontInst=22de160455a30cc2154890e5502d6433 -FrontIP=192.168.1.67 +FrontInst=914b94563ca7f272c90ee8580ed6adc6 +FrontIP=192.168.1.138 [Ledger] TerminalStatus="[0]" @@ -22,35 +25,34 @@ IcdFlag=0 IedCount=300 [Http] -WebDevice=http://192.168.1.67:10202/nodeDevice/nodeDeviceList -WebIcd=http://192.168.1.67:10202/icd/icdPathList -WebEvent=http://192.168.1.67:10203/event/addEventDetail -WebFileupload=http://192.168.1.67:10207/file/upload -WebFiledownload=http://192.168.1.67:10207/file/download +WebDevice=http://192.168.1.103:10220/icd/getLedgerInfo +WebEvent=http://192.168.1.103:10222/event/addCldEvent +WebFileupload=http://192.168.1.103:10207/file/upload +WebFiledownload= [RocketMq] producer=Group_producer -Ipport=192.168.1.24:9876 -AccessKey=rmquser -SecretKey=001njcnmq +Ipport=192.168.1.103:9876 +AccessKey=rmqroot +SecretKey=001@#njcnmq -Topic_Test=LN_Topic +Topic_Test=lnk_Topic Tag_Test=Test_Tag Key_Test=Test_Keys Testflag=1 -Testnum=400 -Testtype=1 +Testnum=0 +Testtype=0 TestPort=11000 -TestList=8ad28e2e36dfbd19906f9e2a4894b375 +TestList= consumer=Group_consumer -ConsumerIpport=192.168.1.24:9876 +ConsumerIpport=192.168.1.103:9876 ConsumerTopicRT=ask_real_data_topic ConsumerTagRT=Test_Tag ConsumerKeyRT=Test_Keys -ConsumerAccessKey=rmquser -ConsumerSecretKey=001njcnmq +ConsumerAccessKey=rmqroot +ConsumerSecretKey=001@#njcnmq ConsumerChannel= ConsumerTopicUD=control_Topic ConsumerTagUD=Test_Tag