From fdd6a30fe23a8576ca377e9bbd63b51cf1833d03 Mon Sep 17 00:00:00 2001 From: lnk Date: Fri, 24 Oct 2025 08:50:54 +0800 Subject: [PATCH] modify recall --- LFtid1056/cloudfront/code/cfg_parser.cpp | 130 +++++++++++++++++------ LFtid1056/cloudfront/code/interface.h | 23 +++- LFtid1056/cloudfront/code/rocketmq.cpp | 2 +- LFtid1056/cloudfront/code/worker.cpp | 1 + LFtid1056/dealMsg.cpp | 6 ++ 5 files changed, 126 insertions(+), 36 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index e5af763..24f6322 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -3390,7 +3390,7 @@ bool send_file_list(terminal_dev* dev, const std::vector& FileList queue_data_list.push_back(std::move(connect_info)); } // 调试打印 - std::cout << "[send_reply_to_cloud] queued: " << j.dump() << std::endl; + std::cout << "[send_file_list] queued: " << j.dump() << std::endl; //发送后清除guid和标志 if (dev->isbusy > 0) { @@ -3405,29 +3405,43 @@ bool send_file_list(terminal_dev* dev, const std::vector& FileList } /////////////////////////////////////////////////////////////////////////////////////////////////////////////检查云前置终端的mq业务超时 -int get_type_by_state(int state) { +std::string get_type_by_state(int state) { switch (static_cast(state)) { case DeviceState::READING_STATS: + return "读取统计数据"; //读数据 case DeviceState::READING_STATS_TIME: + return "读取统计时间"; case DeviceState::READING_REALSTAT: + return "读取实时数据"; case DeviceState::READING_FIXEDVALUE: + return "读取定值"; case DeviceState::READING_FIXEDVALUEDES: + return "读取定值描述"; case DeviceState::SET_FIXEDVALUE: + return "设置定值"; case DeviceState::READING_INTERFIXEDVALUE: + return "读取内部定值"; case DeviceState::READING_INTERFIXEDVALUEDES: + return "读取内部定值描述"; case DeviceState::READING_CONTROLWORD: + return "读取控制字"; case DeviceState::SET_INTERFIXEDVALUE: - return 0x2106; //读数据 + return "设置内部定值"; + //return 0x2106; //读数据 case DeviceState::READING_FILEMENU: - return 0x2131; //读目录 + return "读取文件目录"; + //return 0x2131; //读目录 case DeviceState::READING_EVENTFILE: + return "读取事件文件目录"; + //return 0x2133; //读事件文件目录 case DeviceState::READING_FILEDATA: - return 0x2132; //读文件 + return "读取文件数据"; + //return 0x2132; //读文件 default: - return 0; // 没有对应的type + return "未知业务"; // 没有对应的type } } @@ -3451,7 +3465,9 @@ void check_device_busy_timeout() << dev.busytimecount << "s)" << std::endl; //发送超时响应 - send_reply_to_cloud(static_cast(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac); + //send_reply_to_cloud(static_cast(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac); + send_reply_to_queue(dev.guid, static_cast(ResponseCode::TIMEOUT), + "终端 id: " + dev.terminal_id + "进行业务:" + get_type_by_state(dev.busytype) +"超时,停止该业务处理"); // 超时清空状态 dev.guid.clear(); // 清空进行中的 guid @@ -3469,7 +3485,9 @@ void check_device_busy_timeout() << " 超时(" << dev.busytimecount << "s)" << std::endl; //发送超时响应 - send_reply_to_cloud(static_cast(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac); + //send_reply_to_cloud(static_cast(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac); + send_reply_to_queue(dev.guid, static_cast(ResponseCode::TIMEOUT), + "终端 id: " + dev.terminal_id + "进行业务:" + get_type_by_state(dev.busytype) +"超时,停止该业务处理"); // 超时清空状态 dev.guid.clear(); @@ -4025,7 +4043,7 @@ void send_reply_to_kafka_recall(const std::string& guid, int dataType,int code, queue_data_t connect_info; connect_info.strTopic = Topic_Reply_Topic; connect_info.strText = jsonString; - connect_info.tag = Topic_Reply_Tag; + connect_info.tag = "RECALL"; connect_info.key = Topic_Reply_Key; // 加入发送队列(带互斥锁保护) @@ -4064,10 +4082,24 @@ void check_recall_event() { + " 补招时间范围:" + front.StartTime + " ~ " + front.EndTime + " 补招执行完成"; - send_reply_to_kafka_recall("12345",1,static_cast(ResponseCode::OK),msg,dev.terminal_id,lm.logical_device_seq,front.StartTime,front.EndTime); + send_reply_to_kafka_recall(dev.guid,1,static_cast(ResponseCode::OK),msg,dev.terminal_id,lm.monitor_id,front.StartTime,front.EndTime); lm.recall_list.pop_front(); // 弹掉首条 - } else if (front.recall_status == static_cast(RecallStatus::FAILED)) { + }else if (front.recall_status == static_cast(RecallStatus::EMPTY)) { + std::cout << "[check_recall_event] EMPTY dev=" << dev.terminal_id + << " monitor=" << lm.monitor_id + << " " << front.StartTime << " ~ " << front.EndTime << std::endl; + + //调用reply接口通知web端该时间段补招失败 + std::string msg = std::string("监测点:") + lm.monitor_name + + " 补招时间范围:" + front.StartTime + + " ~ " + front.EndTime + + " 补招无事件日志"; + send_reply_to_kafka_recall(dev.guid,1,static_cast(ResponseCode::NOT_FOUND),msg,dev.terminal_id,lm.monitor_id,front.StartTime,front.EndTime); + + lm.recall_list.pop_front(); // 弹掉首条 + } + else if (front.recall_status == static_cast(RecallStatus::FAILED)) { std::cout << "[check_recall_event] FAILED dev=" << dev.terminal_id << " monitor=" << lm.monitor_id << " " << front.StartTime << " ~ " << front.EndTime << std::endl; @@ -4077,7 +4109,7 @@ void check_recall_event() { + " 补招时间范围:" + front.StartTime + " ~ " + front.EndTime + " 补招执行失败"; - send_reply_to_kafka_recall("12345",1,static_cast(ResponseCode::BAD_REQUEST),msg,dev.terminal_id,lm.logical_device_seq,front.StartTime,front.EndTime); + send_reply_to_kafka_recall(dev.guid,1,static_cast(ResponseCode::BAD_REQUEST),msg,dev.terminal_id,lm.monitor_id,front.StartTime,front.EndTime); lm.recall_list.pop_front(); // 弹掉首条 } else { @@ -4172,7 +4204,7 @@ void check_recall_event() { t.dev_id, tm1, tm2, 2, mp);//2是暂态事件 std::cout << "[check_recall_event] SEND dev=" << t.dev_id - << " monitor=" << mp + << " monitor=" << static_cast(mp) << " start=" << t.start_time << " end=" << t.end_time << " action(2)" << std::endl; @@ -4714,26 +4746,37 @@ void on_device_response_minimal(int response_code, // 找到“首条 RUNNING”的补招条目,按 rc==200 成功/失败标记 bool updated = false; - // 1) 找到 logical_device_seq 与 cid 匹配的 monitor + // 1) 找到 RecallStatus::RUNNING 的 monitor(优先) + // 若没有,则回退到按 cid -> logical_device_seq 匹配 ledger_monitor* matched_monitor = nullptr; - // 将 cid 转成十进制字符串用于直接比较 - const std::string cid_str = std::to_string(static_cast(cid)); - + // [MOD] 优先:遍历查找首条补招项处于 RUNNING 的监测点 for (auto& lm : dev->line) { - // 直接字符串相等 - if (lm.logical_device_seq == cid_str) { - matched_monitor = &lm; - break; + if (!lm.recall_list.empty()) { + const RecallMonitor& head = lm.recall_list.front(); + if (head.recall_status == static_cast(RecallStatus::RUNNING)) { + matched_monitor = &lm; + break; + } } } - if (!matched_monitor) { //没有匹配的监测点 + // [MOD] 回退方案:未找到 RUNNING 的监测点时,按原逻辑用 cid 匹配 logical_device_seq + if (!matched_monitor) { + const std::string cid_str = std::to_string(static_cast(cid)); + for (auto& lm : dev->line) { + if (lm.logical_device_seq == cid_str) { + matched_monitor = &lm; + break; + } + } + } + + if (!matched_monitor) { // 没有匹配的监测点 std::cout << "[RESP][EVENTLOG][WARN] dev=" << id - << " cannot find monitor by cid=" << static_cast(cid) - << " (no logical_device_seq match)" << std::endl; - } - else { + << " cannot find monitor (no RUNNING item, no cid match), cid=" + << static_cast(cid) << std::endl; + } else { // 2) 仅更新该监测点 recall_list 的首条,且要求处于 RUNNING if (!matched_monitor->recall_list.empty()) { @@ -4744,13 +4787,24 @@ void on_device_response_minimal(int response_code, std::cout << "[RESP][EVENTLOG][OK] dev=" << id << " mon=" << matched_monitor->monitor_id << " " << front.StartTime << "~" << front.EndTime + << " recall_status=" << front.recall_status << std::endl; - } else { //补招失败 + } else if (response_code == 404) { //补招无数据 + front.recall_status = static_cast(RecallStatus::EMPTY); + std::cout << "[RESP][EVENTLOG][WARN] dev=" << id + << " mon=" << matched_monitor->monitor_id + << " " << front.StartTime << "~" << front.EndTime + << " rc=" << response_code << " recall_status=" << front.recall_status << std::endl; //错误响应码 + + //记录日志 + DIY_ERRORLOG_CODE(matched_monitor->monitor_id.c_str(),2,static_cast(LogCode::LOG_CODE_RECALL),"【ERROR】监测点:%s 补招数据失败 - 失败时间点:%lld 至 %lld",matched_monitor->monitor_id.c_str(),front.StartTime,front.EndTime); + } + else { //补招失败 front.recall_status = static_cast(RecallStatus::FAILED); std::cout << "[RESP][EVENTLOG][FAIL] dev=" << id << " mon=" << matched_monitor->monitor_id << " " << front.StartTime << "~" << front.EndTime - << " rc=" << response_code << std::endl; //错误响应码 + << " rc=" << response_code << " recall_status=" << front.recall_status<< std::endl; //错误响应码 //记录日志 DIY_ERRORLOG_CODE(matched_monitor->monitor_id.c_str(),2,static_cast(LogCode::LOG_CODE_RECALL),"【ERROR】监测点:%s 补招数据失败 - 失败时间点:%lld 至 %lld",matched_monitor->monitor_id.c_str(),front.StartTime,front.EndTime); @@ -4806,7 +4860,9 @@ void on_device_response_minimal(int response_code, } else { // 失败:响应web并复位为空闲 - send_reply_to_cloud(static_cast(ResponseCode::BAD_REQUEST), id, static_cast(DeviceState::READING_FILEMENU),dev->guid,dev->mac); + //send_reply_to_cloud(static_cast(ResponseCode::BAD_REQUEST), id, static_cast(DeviceState::READING_FILEMENU),dev->guid,dev->mac); + send_reply_to_queue(dev->guid, static_cast(ResponseCode::BAD_REQUEST), + "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理"); std::cout << "[RESP][FILEMENU->FILEMENU][WARN] dev=" << id << " names missing in cache" << std::endl; } @@ -4819,7 +4875,9 @@ void on_device_response_minimal(int response_code, std::cout << "[RESP][FILEMENU->FILEMENU][OK] dev=" << id << std::endl; } else { // 失败:响应web并复位为空闲 - send_reply_to_cloud(response_code, id, static_cast(DeviceState::READING_FILEMENU),dev->guid,dev->mac); + //send_reply_to_cloud(response_code, id, static_cast(DeviceState::READING_FILEMENU),dev->guid,dev->mac); + send_reply_to_queue(dev->guid, static_cast(ResponseCode::BAD_REQUEST), + "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理"); dev->guid.clear(); dev->isbusy = 0; @@ -4911,7 +4969,9 @@ void on_device_response_minimal(int response_code, // ====== 分支 A:当前业务就是“读取文件数据” ====== if (ok) { // 成功:复位 - send_reply_to_cloud(static_cast(ResponseCode::OK), id, static_cast(DeviceState::READING_FILEDATA),dev->guid,dev->mac); + //send_reply_to_cloud(static_cast(ResponseCode::OK), id, static_cast(DeviceState::READING_FILEDATA),dev->guid,dev->mac); + send_reply_to_queue(dev->guid, static_cast(ResponseCode::OK), + "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"成功,停止该业务处理"); dev->guid.clear(); dev->isbusy = 0; @@ -4920,7 +4980,9 @@ void on_device_response_minimal(int response_code, std::cout << "[RESP][FILEDATA->FILEDATA][OK] dev=" << id << std::endl; } else { // 失败:响应web并复位 - send_reply_to_cloud(response_code, id, static_cast(DeviceState::READING_FILEDATA),dev->guid,dev->mac); + //send_reply_to_cloud(response_code, id, static_cast(DeviceState::READING_FILEDATA),dev->guid,dev->mac); + send_reply_to_queue(dev->guid, static_cast(ResponseCode::BAD_REQUEST), + "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理"); dev->guid.clear(); dev->isbusy = 0; @@ -5002,7 +5064,9 @@ void on_device_response_minimal(int response_code, } if (dev) { //直接根据输入响应mq - send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac); + //send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac); + send_reply_to_queue(dev->guid, response_code, + "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) + "," + ResponseCodeToString(response_code) + "停止该业务处理"); //其他的错误和成功都会结束业务 dev->guid.clear(); // 清空 guid dev->busytype = 0; // 业务类型归零 diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 0eb34f9..9aacb74 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -44,7 +44,7 @@ public: class RecallMonitor { public: - int recall_status; //补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败 + int recall_status; //补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败 4-无数据 std::string StartTime; //数据补招起始时间 std::string EndTime; //数据补招结束时间 std::string STEADY; //补招历史统计数据标识 0-不补招;1-补招 @@ -128,7 +128,8 @@ enum class RecallStatus { NOT_STARTED = 0, // 未补招 RUNNING = 1, // 补招中 DONE = 2, // 补招完成 - FAILED = 3 // 补招失败 + FAILED = 3, // 补招失败 + EMPTY = 4 // 无补招数据 }; // 本轮要下发的一条任务(每个终端最多一条) @@ -896,6 +897,24 @@ enum class ResponseCode : int { TIMEOUT = 406, // 请求超出了等待时间 INTERNAL_ERROR = 500 // 其他错误 }; + +inline std::string ResponseCodeToString(int code) { + switch (code) { + case 200: return "请求成功"; + case 201: return "请求被接受,开始处理"; + case 202: return "请求处理中"; + case 400: return "请求失败,参数错误"; + case 401: return "未认证或认证错误"; + case 402: return "请求被拒绝:正在处理同类命令"; + case 403: return "请求被拒绝"; + case 404: return "资源不存在"; + case 405: return "当前忙,无法响应"; + case 406: return "请求超时"; + case 500: return "内部错误"; + default: return "未知响应码"; + } +} + static inline bool is_ok(int rc) { return rc == static_cast(ResponseCode::OK); } static bool parse_datetime_tm(const std::string& s, std::tm& out) { diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index 25383c8..be5ff3e 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -1593,7 +1593,7 @@ void send_batch_reply_to_queue(const std::string& guid, queue_data_t connect_info; connect_info.strTopic = Topic_Reply_Topic; connect_info.strText = root.dump(); - connect_info.tag = Topic_Reply_Tag; + connect_info.tag = "LEDGER"; connect_info.key = Topic_Reply_Key; std::lock_guard lock(queue_data_list_mutex); diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index 7c9b610..88152af 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -622,6 +622,7 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { case 1: return "RUNNING(1)"; case 2: return "DONE(2)"; case 3: return "FAILED(3)"; + case 4: return "EMPTY(4)"; default: return "UNKNOWN"; } }; diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index a0bcd93..33b5628 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -2134,12 +2134,18 @@ void process_received_message(string mac, string id,const char* data, size_t len if (udata[10] == static_cast(0x0c)) { //0x0c错误码代表询问暂态数据不存在,通知前台当前时间段无暂态 std::cout << "not find event " << mac << std::endl; + //lnk20251023 + on_device_response_minimal(static_cast(ResponseCode::NOT_FOUND), id, 0, static_cast(DeviceState::READING_EVENTLOG)); } else if (udata[10] == static_cast(0x06)) { //0x0c错误码代表补招方法参数错误,通知前台参数异常 + //lnk20251023 + on_device_response_minimal(static_cast(ResponseCode::FORBIDDEN), id, 0, static_cast(DeviceState::READING_EVENTLOG)); } else { //其余错误码代表异常情况 + //lnk20251023 + on_device_response_minimal(static_cast(ResponseCode::BAD_REQUEST), id, 0, static_cast(DeviceState::CUSTOM_ACTION)); } // 装置否定 // 补招装置日志失败,调整为空闲状态,处理下一项工作。