modify recall

This commit is contained in:
lnk
2025-10-24 08:50:54 +08:00
parent a755c6faab
commit fdd6a30fe2
5 changed files with 126 additions and 36 deletions

View File

@@ -3390,7 +3390,7 @@ bool send_file_list(terminal_dev* dev, const std::vector<tag_dir_info>& FileList
queue_data_list.push_back(std::move(connect_info));
}
// 调试打印
std::cout << "[send_reply_to_cloud] queued: " << j.dump() << std::endl;
std::cout << "[send_file_list] queued: " << j.dump() << std::endl;
//发送后清除guid和标志
if (dev->isbusy > 0) {
@@ -3405,29 +3405,43 @@ bool send_file_list(terminal_dev* dev, const std::vector<tag_dir_info>& FileList
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////检查云前置终端的mq业务超时
int get_type_by_state(int state) {
std::string get_type_by_state(int state) {
switch (static_cast<DeviceState>(state)) {
case DeviceState::READING_STATS:
return "读取统计数据"; //读数据
case DeviceState::READING_STATS_TIME:
return "读取统计时间";
case DeviceState::READING_REALSTAT:
return "读取实时数据";
case DeviceState::READING_FIXEDVALUE:
return "读取定值";
case DeviceState::READING_FIXEDVALUEDES:
return "读取定值描述";
case DeviceState::SET_FIXEDVALUE:
return "设置定值";
case DeviceState::READING_INTERFIXEDVALUE:
return "读取内部定值";
case DeviceState::READING_INTERFIXEDVALUEDES:
return "读取内部定值描述";
case DeviceState::READING_CONTROLWORD:
return "读取控制字";
case DeviceState::SET_INTERFIXEDVALUE:
return 0x2106; //读数据
return "设置内部定值";
//return 0x2106; //读数据
case DeviceState::READING_FILEMENU:
return 0x2131; //读目录
return "读取文件目录";
//return 0x2131; //读目录
case DeviceState::READING_EVENTFILE:
return "读取事件文件目录";
//return 0x2133; //读事件文件目录
case DeviceState::READING_FILEDATA:
return 0x2132; //读文件
return "读取文件数据";
//return 0x2132; //读文件
default:
return 0; // 没有对应的type
return "未知业务"; // 没有对应的type
}
}
@@ -3451,7 +3465,9 @@ void check_device_busy_timeout()
<< dev.busytimecount << "s)" << std::endl;
//发送超时响应
send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac);
//send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac);
send_reply_to_queue(dev.guid, static_cast<int>(ResponseCode::TIMEOUT),
"终端 id: " + dev.terminal_id + "进行业务:" + get_type_by_state(dev.busytype) +"超时,停止该业务处理");
// 超时清空状态
dev.guid.clear(); // 清空进行中的 guid
@@ -3469,7 +3485,9 @@ void check_device_busy_timeout()
<< " 超时(" << dev.busytimecount << "s)" << std::endl;
//发送超时响应
send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac);
//send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac);
send_reply_to_queue(dev.guid, static_cast<int>(ResponseCode::TIMEOUT),
"终端 id: " + dev.terminal_id + "进行业务:" + get_type_by_state(dev.busytype) +"超时,停止该业务处理");
// 超时清空状态
dev.guid.clear();
@@ -4025,7 +4043,7 @@ void send_reply_to_kafka_recall(const std::string& guid, int dataType,int code,
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = jsonString;
connect_info.tag = Topic_Reply_Tag;
connect_info.tag = "RECALL";
connect_info.key = Topic_Reply_Key;
// 加入发送队列(带互斥锁保护)
@@ -4064,10 +4082,24 @@ void check_recall_event() {
+ " 补招时间范围:" + front.StartTime
+ " ~ " + front.EndTime
+ " 补招执行完成";
send_reply_to_kafka_recall("12345",1,static_cast<int>(ResponseCode::OK),msg,dev.terminal_id,lm.logical_device_seq,front.StartTime,front.EndTime);
send_reply_to_kafka_recall(dev.guid,1,static_cast<int>(ResponseCode::OK),msg,dev.terminal_id,lm.monitor_id,front.StartTime,front.EndTime);
lm.recall_list.pop_front(); // 弹掉首条
} else if (front.recall_status == static_cast<int>(RecallStatus::FAILED)) {
}else if (front.recall_status == static_cast<int>(RecallStatus::EMPTY)) {
std::cout << "[check_recall_event] EMPTY dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " " << front.StartTime << " ~ " << front.EndTime << std::endl;
//调用reply接口通知web端该时间段补招失败
std::string msg = std::string("监测点:") + lm.monitor_name
+ " 补招时间范围:" + front.StartTime
+ " ~ " + front.EndTime
+ " 补招无事件日志";
send_reply_to_kafka_recall(dev.guid,1,static_cast<int>(ResponseCode::NOT_FOUND),msg,dev.terminal_id,lm.monitor_id,front.StartTime,front.EndTime);
lm.recall_list.pop_front(); // 弹掉首条
}
else if (front.recall_status == static_cast<int>(RecallStatus::FAILED)) {
std::cout << "[check_recall_event] FAILED dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " " << front.StartTime << " ~ " << front.EndTime << std::endl;
@@ -4077,7 +4109,7 @@ void check_recall_event() {
+ " 补招时间范围:" + front.StartTime
+ " ~ " + front.EndTime
+ " 补招执行失败";
send_reply_to_kafka_recall("12345",1,static_cast<int>(ResponseCode::BAD_REQUEST),msg,dev.terminal_id,lm.logical_device_seq,front.StartTime,front.EndTime);
send_reply_to_kafka_recall(dev.guid,1,static_cast<int>(ResponseCode::BAD_REQUEST),msg,dev.terminal_id,lm.monitor_id,front.StartTime,front.EndTime);
lm.recall_list.pop_front(); // 弹掉首条
} else {
@@ -4172,7 +4204,7 @@ void check_recall_event() {
t.dev_id, tm1, tm2, 2, mp);//2是暂态事件
std::cout << "[check_recall_event] SEND dev=" << t.dev_id
<< " monitor=" << mp
<< " monitor=" << static_cast<int>(mp)
<< " start=" << t.start_time
<< " end=" << t.end_time
<< " action(2)" << std::endl;
@@ -4714,26 +4746,37 @@ void on_device_response_minimal(int response_code,
// 找到“首条 RUNNING”的补招条目按 rc==200 成功/失败标记
bool updated = false;
// 1) 找到 logical_device_seq 与 cid 匹配的 monitor
// 1) 找到 RecallStatus::RUNNING 的 monitor(优先)
// 若没有,则回退到按 cid -> logical_device_seq 匹配
ledger_monitor* matched_monitor = nullptr;
// 将 cid 转成十进制字符串用于直接比较
const std::string cid_str = std::to_string(static_cast<int>(cid));
// [MOD] 优先:遍历查找首条补招项处于 RUNNING 的监测点
for (auto& lm : dev->line) {
// 直接字符串相等
if (lm.logical_device_seq == cid_str) {
matched_monitor = &lm;
break;
if (!lm.recall_list.empty()) {
const RecallMonitor& head = lm.recall_list.front();
if (head.recall_status == static_cast<int>(RecallStatus::RUNNING)) {
matched_monitor = &lm;
break;
}
}
}
if (!matched_monitor) { //没有匹配的监测点
// [MOD] 回退方案:未找到 RUNNING 的监测点时,按原逻辑用 cid 匹配 logical_device_seq
if (!matched_monitor) {
const std::string cid_str = std::to_string(static_cast<int>(cid));
for (auto& lm : dev->line) {
if (lm.logical_device_seq == cid_str) {
matched_monitor = &lm;
break;
}
}
}
if (!matched_monitor) { // 没有匹配的监测点
std::cout << "[RESP][EVENTLOG][WARN] dev=" << id
<< " cannot find monitor by cid=" << static_cast<int>(cid)
<< " (no logical_device_seq match)" << std::endl;
}
else {
<< " cannot find monitor (no RUNNING item, no cid match), cid="
<< static_cast<int>(cid) << std::endl;
} else {
// 2) 仅更新该监测点 recall_list 的首条,且要求处于 RUNNING
if (!matched_monitor->recall_list.empty()) {
@@ -4744,13 +4787,24 @@ void on_device_response_minimal(int response_code,
std::cout << "[RESP][EVENTLOG][OK] dev=" << id
<< " mon=" << matched_monitor->monitor_id
<< " " << front.StartTime << "~" << front.EndTime
<< " recall_status=" << front.recall_status
<< std::endl;
} else { //补招失败
} else if (response_code == 404) { //补招无数据
front.recall_status = static_cast<int>(RecallStatus::EMPTY);
std::cout << "[RESP][EVENTLOG][WARN] dev=" << id
<< " mon=" << matched_monitor->monitor_id
<< " " << front.StartTime << "~" << front.EndTime
<< " rc=" << response_code << " recall_status=" << front.recall_status << std::endl; //错误响应码
//记录日志
DIY_ERRORLOG_CODE(matched_monitor->monitor_id.c_str(),2,static_cast<int>(LogCode::LOG_CODE_RECALL),"【ERROR】监测点:%s 补招数据失败 - 失败时间点:%lld 至 %lld",matched_monitor->monitor_id.c_str(),front.StartTime,front.EndTime);
}
else { //补招失败
front.recall_status = static_cast<int>(RecallStatus::FAILED);
std::cout << "[RESP][EVENTLOG][FAIL] dev=" << id
<< " mon=" << matched_monitor->monitor_id
<< " " << front.StartTime << "~" << front.EndTime
<< " rc=" << response_code << std::endl; //错误响应码
<< " rc=" << response_code << " recall_status=" << front.recall_status<< std::endl; //错误响应码
//记录日志
DIY_ERRORLOG_CODE(matched_monitor->monitor_id.c_str(),2,static_cast<int>(LogCode::LOG_CODE_RECALL),"【ERROR】监测点:%s 补招数据失败 - 失败时间点:%lld 至 %lld",matched_monitor->monitor_id.c_str(),front.StartTime,front.EndTime);
@@ -4806,7 +4860,9 @@ void on_device_response_minimal(int response_code,
} else {
// 失败响应web并复位为空闲
send_reply_to_cloud(static_cast<int>(ResponseCode::BAD_REQUEST), id, static_cast<int>(DeviceState::READING_FILEMENU),dev->guid,dev->mac);
//send_reply_to_cloud(static_cast<int>(ResponseCode::BAD_REQUEST), id, static_cast<int>(DeviceState::READING_FILEMENU),dev->guid,dev->mac);
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理");
std::cout << "[RESP][FILEMENU->FILEMENU][WARN] dev=" << id
<< " names missing in cache" << std::endl;
}
@@ -4819,7 +4875,9 @@ void on_device_response_minimal(int response_code,
std::cout << "[RESP][FILEMENU->FILEMENU][OK] dev=" << id << std::endl;
} else {
// 失败响应web并复位为空闲
send_reply_to_cloud(response_code, id, static_cast<int>(DeviceState::READING_FILEMENU),dev->guid,dev->mac);
//send_reply_to_cloud(response_code, id, static_cast<int>(DeviceState::READING_FILEMENU),dev->guid,dev->mac);
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理");
dev->guid.clear();
dev->isbusy = 0;
@@ -4911,7 +4969,9 @@ void on_device_response_minimal(int response_code,
// ====== 分支 A当前业务就是“读取文件数据” ======
if (ok) {
// 成功:复位
send_reply_to_cloud(static_cast<int>(ResponseCode::OK), id, static_cast<int>(DeviceState::READING_FILEDATA),dev->guid,dev->mac);
//send_reply_to_cloud(static_cast<int>(ResponseCode::OK), id, static_cast<int>(DeviceState::READING_FILEDATA),dev->guid,dev->mac);
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"成功,停止该业务处理");
dev->guid.clear();
dev->isbusy = 0;
@@ -4920,7 +4980,9 @@ void on_device_response_minimal(int response_code,
std::cout << "[RESP][FILEDATA->FILEDATA][OK] dev=" << id << std::endl;
} else {
// 失败响应web并复位
send_reply_to_cloud(response_code, id, static_cast<int>(DeviceState::READING_FILEDATA),dev->guid,dev->mac);
//send_reply_to_cloud(response_code, id, static_cast<int>(DeviceState::READING_FILEDATA),dev->guid,dev->mac);
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理");
dev->guid.clear();
dev->isbusy = 0;
@@ -5002,7 +5064,9 @@ void on_device_response_minimal(int response_code,
}
if (dev) {
//直接根据输入响应mq
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
//send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_queue(dev->guid, response_code,
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) + "," + ResponseCodeToString(response_code) + "停止该业务处理");
//其他的错误和成功都会结束业务
dev->guid.clear(); // 清空 guid
dev->busytype = 0; // 业务类型归零

View File

@@ -44,7 +44,7 @@ public:
class RecallMonitor
{
public:
int recall_status; //补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败
int recall_status; //补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败 4-无数据
std::string StartTime; //数据补招起始时间
std::string EndTime; //数据补招结束时间
std::string STEADY; //补招历史统计数据标识 0-不补招1-补招
@@ -128,7 +128,8 @@ enum class RecallStatus {
NOT_STARTED = 0, // 未补招
RUNNING = 1, // 补招中
DONE = 2, // 补招完成
FAILED = 3 // 补招失败
FAILED = 3, // 补招失败
EMPTY = 4 // 无补招数据
};
// 本轮要下发的一条任务(每个终端最多一条)
@@ -896,6 +897,24 @@ enum class ResponseCode : int {
TIMEOUT = 406, // 请求超出了等待时间
INTERNAL_ERROR = 500 // 其他错误
};
inline std::string ResponseCodeToString(int code) {
switch (code) {
case 200: return "请求成功";
case 201: return "请求被接受,开始处理";
case 202: return "请求处理中";
case 400: return "请求失败,参数错误";
case 401: return "未认证或认证错误";
case 402: return "请求被拒绝:正在处理同类命令";
case 403: return "请求被拒绝";
case 404: return "资源不存在";
case 405: return "当前忙,无法响应";
case 406: return "请求超时";
case 500: return "内部错误";
default: return "未知响应码";
}
}
static inline bool is_ok(int rc) { return rc == static_cast<int>(ResponseCode::OK); }
static bool parse_datetime_tm(const std::string& s, std::tm& out) {

View File

@@ -1593,7 +1593,7 @@ void send_batch_reply_to_queue(const std::string& guid,
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = root.dump();
connect_info.tag = Topic_Reply_Tag;
connect_info.tag = "LEDGER";
connect_info.key = Topic_Reply_Key;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);

View File

@@ -622,6 +622,7 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
case 1: return "RUNNING(1)";
case 2: return "DONE(2)";
case 3: return "FAILED(3)";
case 4: return "EMPTY(4)";
default: return "UNKNOWN";
}
};