Merge branch '测试2' of http://192.168.1.22:3000/zhangwen/front_linux into 测试2

This commit is contained in:
2025-12-02 10:53:52 +08:00
2 changed files with 192 additions and 34 deletions

Binary file not shown.

View File

@@ -1333,8 +1333,8 @@ int recall_json_handle_from_mq(const std::string& body)
}
if (dt == 2) { //一个测点一个guid对应多个文件
// ▲直下文件timeList -> fun1/fun2 -> enqueue_direct_download
if (!rec.contains("timeList") || !rec["timeList"].is_array()) continue;
// ▲直下文件timeInterval -> fun1/fun2 -> enqueue_direct_download
if (!rec.contains("timeInterval") || !rec["timeInterval"].is_array()) continue;
for (const auto& monId : monitors) {
// fun1提取 monitor 数字
@@ -1345,7 +1345,7 @@ int recall_json_handle_from_mq(const std::string& body)
init_recall_record_file(guid, terminalId, monId, "", "");
//根据时间戳单独补招事件
// ★新增dt==2 同步生成“按小时”的事件补招到 recall_list与 dt==1 逻辑一致)——开始
if(0)// ★新增dt==2 同步生成“按小时”的事件补招到 recall_list与 dt==1 逻辑一致)——开始
{
std::lock_guard<std::mutex> lock2(ledgermtx); // 复用与 dt==1 相同的加锁粒度
// 找终端
@@ -1361,7 +1361,7 @@ int recall_json_handle_from_mq(const std::string& body)
if (!lm) { std::cout << "monitorId未在terminal内找到(直下事件回灌): " << monId << " @ " << terminalId << std::endl; continue; }
// 将 timeList 的每个时间点映射为 [该整点, 下一整点) 的一小时窗口
for (const auto& t : rec["timeList"]) {
for (const auto& t : rec["timeInterval"]) {
if (!t.is_string()) continue;
std::string tstr = t.get<std::string>();
@@ -1387,7 +1387,7 @@ int recall_json_handle_from_mq(const std::string& body)
// ★新增dt==2 同步生成“按小时”的事件补招到 recall_list与 dt==1 逻辑一致)——结束
//根据时间戳单独补招事件
for (const auto& t : rec["timeList"]) {
for (const auto& t : rec["timeInterval"]) {
if (!t.is_string()) continue;
std::string ts_compact = compact_ts_for_filename(t.get<std::string>());
@@ -3561,7 +3561,7 @@ std::string get_type_by_state(int state) {
return "补招事件日志";
case static_cast<int>(DeviceState::READING_STATSFILE):
return "补招稳态数据文件";
return "补招文件";
case static_cast<int>(DeviceState::CUSTOM_ACTION):
return "自定义动作";
@@ -3585,7 +3585,7 @@ void check_device_busy_timeout()
if (dev.busytype == static_cast<int>(DeviceState::READING_FILEDATA) || dev.busytype == static_cast<int>(DeviceState::READING_STATSFILE)) //下载文件业务
{
if (dev.busytimecount > 60)
if (dev.busytimecount > 600)
{
std::cout << "[Timeout] Device " << dev.terminal_id
<< " busytype=READING_FILEDATA 超时("
@@ -3607,7 +3607,7 @@ void check_device_busy_timeout()
//发送超时响应
//send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac);
send_reply_to_queue(dev.guid, static_cast<int>(ResponseCode::TIMEOUT),
"终端 id: " + dev.terminal_id + "进行业务:" + get_type_by_state(dev.busytype) +"超时,停止该业务处理");
"终端 id: " + dev.terminal_id + "进行业务:" + get_type_by_state(dev.busytype) +"超时600秒,停止该业务处理");
// 超时清空状态
//若还有未处理条目,则仅复位计时,不清设备状态,交给 check_recall_event() 弹掉 FAILED 并继续下一条
@@ -3633,7 +3633,7 @@ void check_device_busy_timeout()
}
else //其他业务包括补招日志都是20s一条一问一答的时间
{
if (dev.busytimecount > 20)
if (dev.busytimecount > 30)
{
std::cout << "[Timeout] Device " << dev.terminal_id
<< " busytype=" << dev.busytype
@@ -3655,7 +3655,7 @@ void check_device_busy_timeout()
//发送超时响应
//send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac);
send_reply_to_queue(dev.guid, static_cast<int>(ResponseCode::TIMEOUT),
"终端 id: " + dev.terminal_id + "进行业务:" + get_type_by_state(dev.busytype) +"超时,停止该业务处理");
"终端 id: " + dev.terminal_id + "进行业务:" + get_type_by_state(dev.busytype) +"超时30秒,停止该业务处理");
// 超时清空状态
//若还有未处理条目,则仅复位计时,不清设备状态,交给 check_recall_event() 弹掉 FAILED 并继续下一条
@@ -4284,19 +4284,19 @@ void check_recall_event() {
bool allow_check =
dev.busytype == static_cast<int>(DeviceState::IDLE) ||
dev.busytype == static_cast<int>(DeviceState::READING_EVENTLOG);
// 如果你允许“事件抢占文件”,并且当前没有文件 RUNNING则也允许评估事件
if (!allow_check && dev.busytype == static_cast<int>(DeviceState::READING_STATSFILE)) {
bool has_file_running = false;
for (const auto& lm2 : dev.line) {
if (!lm2.recall_list_static.empty() &&
lm2.recall_list_static.front().recall_status == static_cast<int>(RecallStatus::RUNNING)) {
lm2.recall_list_static.front().recall_status != static_cast<int>(RecallStatus::NOT_STARTED)) {
has_file_running = true; break;
}
}
if (!has_file_running) allow_check = true; // 没有文件正在跑,允许事件优先
}
if (!allow_check) {
std::cout << "[check_recall_event] skip dev=" << dev.terminal_id
<< " busytype=" << dev.busytype << std::endl;
@@ -4684,9 +4684,43 @@ static bool make_target_key_from_filename(const std::string& fname, std::string&
out_key.append(monitor).append("_").append(ymd).append("_").append(hms);
return true;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件匹配事件
//////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件新增优化20251105
// ★新增设备级目录缓存dev_id -> { dir -> files }
static std::mutex g_device_dir_cache_mtx;
static std::map<std::string, std::map<std::string, std::vector<tag_dir_info>>> g_device_dir_cache;
// ★新增:尝试从缓存获取目录文件列表;命中返回 true即便是空列表
static bool dircache_try_get(const std::string& dev_id,
const std::string& dir,
std::vector<tag_dir_info>& out)
{
std::lock_guard<std::mutex> lk(g_device_dir_cache_mtx);
auto it_dev = g_device_dir_cache.find(dev_id);
if (it_dev == g_device_dir_cache.end()) return false;
auto it_dir = it_dev->second.find(dir);
if (it_dir == it_dev->second.end()) return false;
out = it_dir->second; // 可能为空,但表示“已问过/已缓存”
return true;
}
// ★新增:把目录文件列表写入缓存(空列表也写入,代表“已询问且为空”)
static void dircache_store(const std::string& dev_id,
const std::string& dir,
const std::vector<tag_dir_info>& files)
{
std::lock_guard<std::mutex> lk(g_device_dir_cache_mtx);
g_device_dir_cache[dev_id][dir] = files; // 覆盖式记录
}
// ★新增:清空某个装置的目录缓存(当该装置所有补招文件记录都处理完毕时调用)
static void dircache_clear_device(const std::string& dev_id)
{
std::lock_guard<std::mutex> lk(g_device_dir_cache_mtx);
g_device_dir_cache.erase(dev_id);
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////补招文件逻辑
// ====== ★修改check_recall_stat —— 加入“两步法”状态机 ======
void check_recall_file() {
@@ -4698,6 +4732,7 @@ void check_recall_file() {
unsigned short logical_seq = 0;
std::vector<std::string> files_to_send; // 完整路径含MAC目录
qvvr_data matched{}; // 与 .cfg 匹配到的事件(如有)
qvvr_data mismatched{}; // 未匹配到的事件(如有)
bool has_matched = false;
// 用于回锁后在台账中定位并删掉对应 qvvr_file 组
std::set<std::string> sig_names; // 仅文件名集合
@@ -4883,7 +4918,9 @@ void check_recall_file() {
}
}
if (had_items_before && popped_any && lm.recall_list_static.empty()) {//处理后,当前测点补招记录为空
if (had_items_before && popped_any &&
(lm.recall_list_static.empty() ||
(!lm.recall_list_static.empty() && lm.recall_list_static.front().recall_guid != dev.guid))) {//处理后,当前测点补招记录为空
//通知补招全部完成
std::cout << "[check_recall_file] finish recall monitor=" << lm.monitor_id << std::endl;
//读取记录文件获取响应参数
@@ -4921,6 +4958,9 @@ void check_recall_file() {
dev.busytype = static_cast<int>(DeviceState::IDLE);
dev.isbusy = 0;
dev.busytimecount = 0;
dircache_clear_device(dev.terminal_id); // ★新增:该装置的目录缓存生命周期到此结束,清空
continue;//这个装置处理下一个装置
}
@@ -4958,11 +4998,26 @@ void check_recall_file() {
// 立即发起第一个目录请求
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) {
front.cur_dir = front.dir_candidates[front.cur_dir_index];//从第一个目录开始
// ★★ 只发一个目录请求,并等待外部线程回写结果与文件名列表
ClientManager::instance().add_file_menu_action_to_device(dev.terminal_id, front.cur_dir);
std::cout << "[check_recall_stat] LIST req dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir << std::endl;
// ★新增:先查设备级目录缓存
std::vector<tag_dir_info> cached;
if (dircache_try_get(dev.terminal_id, front.cur_dir, cached)) {
// 命中缓存:直接当作“目录 OK”不下发请求
front.dir_files[front.cur_dir] = std::move(cached);
front.list_result = ActionResult::OK;
std::cout << "[dircache] HIT dev=" << dev.terminal_id
<< " dir=" << front.cur_dir
<< " entries=" << front.dir_files[front.cur_dir].size() << std::endl;
break;
} else {
// ★★ 只发一个目录请求,并等待外部线程回写结果与文件名列表
ClientManager::instance().add_file_menu_action_to_device(dev.terminal_id, front.cur_dir);
std::cout << "[check_recall_stat] LIST req dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir << std::endl;
break;
}
} else {
// 无目录可查
front.recall_status = static_cast<int>(RecallStatus::FAILED);
@@ -5005,10 +5060,24 @@ void check_recall_file() {
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) {
front.cur_dir = front.dir_candidates[front.cur_dir_index];
ClientManager::instance().add_file_menu_action_to_device(dev.terminal_id, front.cur_dir);
std::cout << "[check_recall_stat] LIST retry dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir << std::endl;
// ★新增:先查缓存
std::vector<tag_dir_info> cached;
if (dircache_try_get(dev.terminal_id, front.cur_dir, cached)) {
front.dir_files[front.cur_dir] = std::move(cached);
front.list_result = ActionResult::OK;
std::cout << "[dircache] HIT dev=" << dev.terminal_id
<< " dir=" << front.cur_dir
<< " entries=" << front.dir_files[front.cur_dir].size() << std::endl;
break;
} else {
ClientManager::instance().add_file_menu_action_to_device(dev.terminal_id, front.cur_dir);
std::cout << "[check_recall_stat] LIST retry dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir << std::endl;
break;
}
} else {
// 所有目录都失败
front.recall_status = static_cast<int>(RecallStatus::FAILED);
@@ -5125,10 +5194,24 @@ void check_recall_file() {
front.list_result = ActionResult::PENDING;
if (front.cur_dir_index < static_cast<int>(front.dir_candidates.size())) {
front.cur_dir = front.dir_candidates[front.cur_dir_index];
ClientManager::instance().add_file_menu_action_to_device(dev.terminal_id, front.cur_dir);
std::cout << "[check_recall_stat] LIST next dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir << std::endl;
// ★新增:先查缓存
std::vector<tag_dir_info> cached;
if (dircache_try_get(dev.terminal_id, front.cur_dir, cached)) {
front.dir_files[front.cur_dir] = std::move(cached);
front.list_result = ActionResult::OK;
std::cout << "[dircache] HIT dev=" << dev.terminal_id
<< " dir=" << front.cur_dir
<< " entries=" << front.dir_files[front.cur_dir].size() << std::endl;
break;
} else {
ClientManager::instance().add_file_menu_action_to_device(dev.terminal_id, front.cur_dir);
std::cout << "[check_recall_stat] LIST next dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir << std::endl;
break;
}
} else {
// 所有目录都“无匹配文件”
front.recall_status = static_cast<int>(RecallStatus::FAILED);
@@ -5236,6 +5319,7 @@ void check_recall_file() {
// 寻找第一个 .cfg 做匹配
qvvr_data matched{};
qvvr_data mismatched{};
bool has_matched = false;
for (const auto& p : it_qf->file_download) {
auto s = sanitize(p);
@@ -5248,6 +5332,22 @@ void check_recall_file() {
it_qf->is_pair = true;
has_matched = true;
}
else {
it_qf->is_pair = false;
long long start_t;
long long trigger_t;
if (extract_timestamp_from_cfg_file(p, start_t, trigger_t)) {
mismatched.QVVR_time = trigger_t;
} else {
mismatched.QVVR_time = 0; // 未能提取时间
}
// 其他字段置空/置零
mismatched.QVVR_Amg = 0.0;
mismatched.QVVR_PerTime = 0.0;
mismatched.QVVR_type = 0;
mismatched.phase = 0;
}
break; // 只看一个 .cfg
}
}
@@ -5263,6 +5363,7 @@ void check_recall_file() {
pu.files_to_send.assign(it_qf->file_download.begin(), it_qf->file_download.end()); // ★修复list -> vector
pu.has_matched = has_matched;
pu.matched = matched;
pu.mismatched = mismatched;
pu.sig_names = want_names;
pu.sig_downs = std::set<std::string>(pu.files_to_send.begin(), pu.files_to_send.end());
pending_uploads.push_back(std::move(pu));
@@ -5369,12 +5470,29 @@ void check_recall_file() {
front.cur_dir_index = 0; //正在尝试的目录下标
front.cur_dir = front.dir_candidates[0]; //第一个目录
front.list_result = ActionResult::PENDING; //目录状态:等待返回
ClientManager::instance().add_file_menu_action_to_device(dev.terminal_id, front.cur_dir);//调用目录请求接口
std::cout << "[check_recall_stat] LIST start dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir
<< " start=" << front.StartTime
<< " end=" << front.EndTime << std::endl;
// ★新增:先查缓存
std::vector<tag_dir_info> cached;
if (dircache_try_get(dev.terminal_id, front.cur_dir, cached)) {
front.dir_files[front.cur_dir] = std::move(cached);
front.list_result = ActionResult::OK;
std::cout << "[dircache] HIT dev=" << dev.terminal_id
<< " dir=" << front.cur_dir
<< " entries=" << front.dir_files[front.cur_dir].size() << std::endl;
break; // ★新增:命中缓存后,本轮就此结束(每终端本轮仅推进一条)
} else {
ClientManager::instance().add_file_menu_action_to_device(dev.terminal_id, front.cur_dir);//调用目录请求接口
std::cout << "[check_recall_stat] LIST start dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir
<< " start=" << front.StartTime
<< " end=" << front.EndTime << std::endl;
break; // ★新增:已下发请求,本轮结束,等回执/下一轮
}
} else {
front.recall_status = static_cast<int>(RecallStatus::FAILED); //目录列表空,失败
@@ -5424,6 +5542,22 @@ void check_recall_file() {
pu.matched.phase,
wavepath);
}
else{
//打印提示:没有从装置读到匹配的事件,使用文件中的数据进行更新
std::cout << "[Upload] No matched QVVR event from device for terminal="
<< pu.terminal_id << " seq=" << pu.logical_seq
<< ", using data extracted from .cfg file for JSON update.\n";
transfer_json_qvvr_data(pu.terminal_id,
pu.logical_seq,
//从文件中获取
pu.mismatched.QVVR_Amg,
pu.mismatched.QVVR_PerTime,
pu.mismatched.QVVR_time,
pu.mismatched.QVVR_type,
pu.mismatched.phase,
wavepath);
}
// 3) 删除本地文件
for (const auto& f : pu.files_to_send) {
@@ -5771,6 +5905,9 @@ void on_device_response_minimal(int response_code,
if(filemenu_cache_take(id, names)) { // 从缓存取目录列表
running_front->dir_files[running_front->cur_dir] = std::move(names);
running_front->list_result = ActionResult::OK;
dircache_store(id, running_front->cur_dir, running_front->dir_files[running_front->cur_dir]); // ★新增:缓存(非空或空都按实际存)
std::cout << "[RESP][FILEMENU->(EVENT/STATS)FILE][OK] dev=" << id
<< " monitor=" << running_monitor->monitor_id
<< " dir=" << running_front->cur_dir
@@ -5779,6 +5916,9 @@ void on_device_response_minimal(int response_code,
} else {
running_front->dir_files[running_front->cur_dir] = {};
running_front->list_result = ActionResult::OK;
dircache_store(id, running_front->cur_dir, running_front->dir_files[running_front->cur_dir]); // ★新增:缓存空列表
std::cout << "[RESP][FILEMENU->(EVENT/STATS)FILE][WARN] dev=" << id
<< " monitor=" << running_monitor->monitor_id
<< " dir=" << running_front->cur_dir
@@ -6611,6 +6751,24 @@ bool get_recall_record_fields_by_guid_monitor(const std::string& guid,
return false;
}
//将 msg 内的换行符替换为逗号,避免 JSON 解析出错
{
for (auto& ch : msg) {
if (ch == '\n' || ch == '\r') {
ch = ','; // 统一替换为逗号
}
}
// 连续逗号去重(例如 "\n\n" -> ",," -> ",")
while (msg.find(",,") != std::string::npos) {
msg.replace(msg.find(",,"), 2, ",");
}
// 去除首尾多余的逗号
if (!msg.empty() && msg.front() == ',') msg.erase(msg.begin());
if (!msg.empty() && msg.back() == ',') msg.pop_back();
}
std::cout << "[recall_file] 读取成功: " << filepath << std::endl;
return true;
}