add interface

This commit is contained in:
lnk
2026-04-01 15:30:52 +08:00
parent 15cbbd1c24
commit b11105f91c
5 changed files with 485 additions and 169 deletions

View File

@@ -81,6 +81,16 @@ std::map<std::pair<std::string,std::string>, std::string> g_recall_file_index;
std::mutex g_last_ts_mtx;
std::unordered_map<std::string, time_t> g_last_ts_by_devid;
//文件下载缓存
struct FileDownloadReplyInfo
{
std::string local_name; // 本地名,对应 JSON 的 Name
std::string remote_name; // 远端名,对应 JSON 的 RemoteName
};
static std::mutex g_filedownload_cache_mtx;
static std::map<std::string, FileDownloadReplyInfo> g_filedownload_cache;
//目录信息缓存
static std::mutex g_filemenu_cache_mtx;
std::map<std::string, std::vector<tag_dir_info>> g_filemenu_cache;
@@ -3568,7 +3578,7 @@ void check_device_busy_timeout()
}
//发送超时响应
//send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac);
send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,dev.busytype,dev.guid,dev.mac);
send_reply_to_queue(dev.guid, static_cast<int>(ResponseCode::TIMEOUT),
"终端 id: " + dev.terminal_id + "进行业务:" + get_type_by_state(dev.busytype) +"超时600秒停止该业务处理");
@@ -3616,7 +3626,7 @@ void check_device_busy_timeout()
}
//发送超时响应
//send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype),dev.guid,dev.mac);
send_reply_to_cloud(static_cast<int>(ResponseCode::TIMEOUT),dev.terminal_id,dev.busytype,dev.guid,dev.mac);
send_reply_to_queue(dev.guid, static_cast<int>(ResponseCode::TIMEOUT),
"终端 id: " + dev.terminal_id + "进行业务:" + get_type_by_state(dev.busytype) +"超时30秒停止该业务处理");
@@ -3937,7 +3947,7 @@ bool send_set_value_reply(const std::string &dev_id, unsigned char mp_index, con
// Msg
nlohmann::json msg;
msg["Cldid"] = mp_index; //测点序号
msg["DataType"] = 0x0C; //定值
msg["DataType"] = 1; //定值
// DataArray对象数组逐个填充DZ_Value 严格按 set_values 顺序
nlohmann::json dataArray = nlohmann::json::array();
@@ -3975,10 +3985,10 @@ bool send_set_value_reply(const std::string &dev_id, unsigned char mp_index, con
// 6) 入队发送
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strTopic = Cloud_Reply_Topic;
connect_info.strText = j.dump(); // 序列化为字符串
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
connect_info.tag = Cloud_Reply_Tag;
connect_info.key = Cloud_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
@@ -4058,7 +4068,7 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector<DZ_k
nlohmann::json detail;
detail["Type"] = 1103; // 设备数据
nlohmann::json msg;
msg["DataType"] = 0x0D; // 内部定值
msg["DataType"] = 2; // 内部定值
// 4) === 将 C# 的拼接逻辑移植为 DataArray ===
// C# 变量对应关系:
@@ -4173,10 +4183,10 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector<DZ_k
// 5) 入队发送(保持你的队列逻辑)
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strTopic = Cloud_Reply_Topic;
connect_info.strText = j.dump(); // 序列化为字符串
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
connect_info.tag = Cloud_Reply_Tag;
connect_info.key = Cloud_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
@@ -5653,6 +5663,75 @@ bool enqueue_direct_download(const std::string& dev_id,
return true;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////文件下载响应处理
void filedownload_cache_put(const std::string& dev_id,
const std::string& local_name,
const std::string& remote_name)
{
std::lock_guard<std::mutex> lk(g_filedownload_cache_mtx);
g_filedownload_cache[dev_id] = FileDownloadReplyInfo{ local_name, remote_name };
}
bool filedownload_cache_take(const std::string& dev_id, FileDownloadReplyInfo& out)
{
std::lock_guard<std::mutex> lk(g_filedownload_cache_mtx);
auto it = g_filedownload_cache.find(dev_id);
if (it == g_filedownload_cache.end()) return false;
out = std::move(it->second);
g_filedownload_cache.erase(it);
return true;
}
bool send_file_download_reply(terminal_dev* dev,
const std::string& local_name,
const std::string& remote_name)
{
if (!dev) {
std::cerr << "[send_file_download_reply] dev=nullptr\n";
return false;
}
// 判断 isbusy==1 且 busytype==READING_FILEDATA
if (dev->isbusy != 1 || dev->busytype != static_cast<int>(DeviceState::READING_FILEDATA)) {
std::cerr << "[send_file_download_reply] device not in READING_FILEDATA state." << std::endl;
return false;
}
// 构造 JSON 报文
nlohmann::json j;
j["guid"] = dev->guid;
j["FrontId"] = FRONT_INST;
j["Node"] = g_front_seg_index;
j["Dev_mac"] = normalize_mac(dev->addr_str);
nlohmann::json detail;
detail["Type"] = 1102;
detail["Msg"] = {
{"Name", local_name},
{"RemoteName", remote_name}
};
detail["Code"] = 200;
j["Detail"] = detail;
std::cout << j.dump(4) << std::endl;
// ---- 入队发送 ----
queue_data_t connect_info;
connect_info.strTopic = Cloud_Reply_Topic;
connect_info.strText = j.dump();
connect_info.tag = Cloud_Reply_Tag;
connect_info.key = Cloud_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
}
std::cout << "[send_file_download_reply] queued: " << j.dump() << std::endl;
return true;
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////目录响应处理
void filemenu_cache_put(const std::string& dev_id,
@@ -5719,10 +5798,10 @@ bool send_file_list(terminal_dev* dev, const std::vector<tag_dir_info>& FileList
// ---- 入队发送 ----
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strTopic = Cloud_Reply_Topic;
connect_info.strText = j.dump(); // 序列化为字符串
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
connect_info.tag = Cloud_Reply_Tag;
connect_info.key = Cloud_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
@@ -5811,10 +5890,10 @@ bool send_running_info(terminal_dev* dev, const RunningInformation& info) {
// ---- 入队发送 ----
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strTopic = Cloud_Reply_Topic;
connect_info.strText = j.dump();
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
connect_info.tag = Cloud_Reply_Tag;
connect_info.key = Cloud_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
@@ -5919,10 +5998,10 @@ bool send_version_info(terminal_dev* dev, const DeviceVersionInfo& info) {
// ---- 入队发送 ----
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strTopic = Cloud_Reply_Topic;
connect_info.strText = j.dump();
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
connect_info.tag = Cloud_Reply_Tag;
connect_info.key = Cloud_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
@@ -6079,12 +6158,12 @@ void on_device_response_minimal(int response_code,
//发送目录
send_file_list(dev,names);
std::cout << "[RESP][FILEMENU->FILEMENU][OK] dev=" << id << std::endl;
} else {
// 失败响应web并复位为空闲
//send_reply_to_cloud(static_cast<int>(ResponseCode::BAD_REQUEST), id, static_cast<int>(DeviceState::READING_FILEMENU),dev->guid,dev->mac);
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理");
send_reply_to_cloud(static_cast<int>(ResponseCode::BAD_REQUEST), id, device_state_int,dev->guid,dev->mac);
//send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
// "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理");
std::cout << "[RESP][FILEMENU->FILEMENU][WARN] dev=" << id
<< " names missing in cache" << std::endl;
}
@@ -6094,12 +6173,12 @@ void on_device_response_minimal(int response_code,
dev->isbusy = 0;
dev->busytype = 0;
dev->busytimecount = 0;
std::cout << "[RESP][FILEMENU->FILEMENU][OK] dev=" << id << std::endl;
} else {
// 失败响应web并复位为空闲
//send_reply_to_cloud(response_code, id, static_cast<int>(DeviceState::READING_FILEMENU),dev->guid,dev->mac);
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理");
send_reply_to_cloud(response_code, id, device_state_int,dev->guid,dev->mac);
//send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
// "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理");
dev->guid.clear();
dev->isbusy = 0;
@@ -6218,20 +6297,33 @@ void on_device_response_minimal(int response_code,
// ====== 分支 A当前业务就是“读取文件数据” ======
if (ok) {
// 成功:复位
//send_reply_to_cloud(static_cast<int>(ResponseCode::OK), id, static_cast<int>(DeviceState::READING_FILEDATA),dev->guid,dev->mac);
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"成功,停止该业务处理");
FileDownloadReplyInfo file_info;
if (filedownload_cache_take(id, file_info)) {
send_file_download_reply(dev, file_info.local_name, file_info.remote_name);
std::cout << "[RESP][FILEDATA->FILEDATA][OK][CACHE] dev=" << id
<< " local_name=" << file_info.local_name
<< " remote_name=" << file_info.remote_name
<< std::endl;
} else {//取不到结果,即使上传了文件也当做失败处理
send_reply_to_cloud(static_cast<int>(ResponseCode::BAD_REQUEST), id, device_state_int, dev->guid, dev->mac);
//send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
// "终端 id: " + dev->terminal_id + "进行业务:" +
// get_type_by_state(dev->busytype) + "失败,停止该业务处理");
std::cout << "[RESP][FILEDATA->FILEDATA][FAIL][NO_CACHE] dev=" << id << std::endl;
}
dev->guid.clear();
dev->isbusy = 0;
dev->busytype = 0;
dev->busytimecount = 0;
std::cout << "[RESP][FILEDATA->FILEDATA][OK] dev=" << id << std::endl;
} else {
// 失败响应web并复位
//send_reply_to_cloud(response_code, id, static_cast<int>(DeviceState::READING_FILEDATA),dev->guid,dev->mac);
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理");
send_reply_to_cloud(response_code, id, device_state_int,dev->guid,dev->mac);
//send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
// "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) +"失败,停止该业务处理");
dev->guid.clear();
dev->isbusy = 0;
@@ -6365,10 +6457,13 @@ void on_device_response_minimal(int response_code,
// 发送运行信息
send_running_info(dev, info);
std::cout << "[RESP][RUNNINGINFO][OK] dev=" << id << std::endl;
} else {
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + dev->terminal_id + "进行业务:" +
get_type_by_state(dev->busytype) + "失败,运行状态缓存不存在");
send_reply_to_cloud(static_cast<int>(ResponseCode::BAD_REQUEST), id, device_state_int, dev->guid, dev->mac);
//send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
// "终端 id: " + dev->terminal_id + "进行业务:" +
// get_type_by_state(dev->busytype) + "失败,运行状态缓存不存在");
std::cout << "[RESP][RUNNINGINFO][WARN] dev=" << id
<< " running info missing in cache" << std::endl;
@@ -6379,11 +6474,12 @@ void on_device_response_minimal(int response_code,
dev->busytype = 0;
dev->busytimecount = 0;
std::cout << "[RESP][RUNNINGINFO][OK] dev=" << id << std::endl;
} else {
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + dev->terminal_id + "进行业务:" +
get_type_by_state(dev->busytype) + "失败,停止该业务处理");
send_reply_to_cloud(response_code, id, device_state_int,dev->guid,dev->mac);
//send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
// "终端 id: " + dev->terminal_id + "进行业务:" +
// get_type_by_state(dev->busytype) + "失败,停止该业务处理");
dev->guid.clear();
dev->isbusy = 0;
@@ -6426,10 +6522,13 @@ void on_device_response_minimal(int response_code,
// 发送版本信息
send_version_info(dev, info);
std::cout << "[RESP][VERSIONINFO][OK] dev=" << id << std::endl;
} else {
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + dev->terminal_id + "进行业务:" +
get_type_by_state(dev->busytype) + "失败,版本信息缓存不存在");
send_reply_to_cloud(static_cast<int>(ResponseCode::BAD_REQUEST), id, device_state_int, dev->guid, dev->mac);
//send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
// "终端 id: " + dev->terminal_id + "进行业务:" +
// get_type_by_state(dev->busytype) + "失败,版本信息缓存不存在");
std::cout << "[RESP][VERSIONINFO][WARN] dev=" << id
<< " version info missing in cache" << std::endl;
@@ -6440,11 +6539,12 @@ void on_device_response_minimal(int response_code,
dev->busytype = 0;
dev->busytimecount = 0;
std::cout << "[RESP][VERSIONINFO][OK] dev=" << id << std::endl;
} else {
send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + dev->terminal_id + "进行业务:" +
get_type_by_state(dev->busytype) + "失败,停止该业务处理");
send_reply_to_cloud(response_code, id, device_state_int,dev->guid,dev->mac);
//send_reply_to_queue(dev->guid, static_cast<int>(ResponseCode::BAD_REQUEST),
// "终端 id: " + dev->terminal_id + "进行业务:" +
// get_type_by_state(dev->busytype) + "失败,停止该业务处理");
dev->guid.clear();
dev->isbusy = 0;
@@ -6500,30 +6600,36 @@ void on_device_response_minimal(int response_code,
// 下发升级指令
ClientManager::instance().send_upgrade_action_to_device(id, file_data, 10240);
//正在处理
send_reply_to_cloud(static_cast<int>(ResponseCode::PROCESSING), id, device_state_int, dev->guid, dev->mac);
dev->isbusy = 1; // 完成了预校验但是仍处于忙碌,因为还要升级
}else if(dev->isbusy == 1){
std::cout << "[SET_PREUPGRADE] already upgrade OK, terminal_id="
std::cout << "[SET_PREUPGRADE] already upgrade prepare finish, terminal_id="
<< id << std::endl;
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_queue(dev->guid, response_code,
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) + "," + ResponseCodeToString(response_code) + "停止该业务处理");
}else if(dev->isbusy == 1){
//升级成功
send_reply_to_cloud(static_cast<int>(ResponseCode::OK), id, device_state_int, dev->guid, dev->mac);
//send_reply_to_queue(dev->guid, response_code,
// "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) + "," + ResponseCodeToString(response_code) + "停止该业务处理");
//成功结束业务
dev->guid.clear(); // 清空 guid
dev->busytype = 0; // 业务类型归零
dev->isbusy = 0; // 清空业务标志
dev->busytimecount = 0; // 计时归零
std::cout << "[clear_terminal_runtime_state] Cleared runtime state for terminal_id="
<< id << std::endl;
std::cout << "[SET_PREUPGRADE] already upgrade OK, terminal_id="
<< id << std::endl;
}
else {
std::cout << "[SET_PREUPGRADE] status error" <<std::endl;
// 失败 → 直接结束业务
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_queue(dev->guid, response_code,
"终端 id: " + dev->terminal_id +
"进行业务:" + get_type_by_state(dev->busytype) +
",处理逻辑错误,停止该业务处理");
send_reply_to_cloud(static_cast<int>(ResponseCode::BAD_REQUEST), id, device_state_int, dev->guid, dev->mac);
//send_reply_to_queue(dev->guid, response_code,
// "终端 id: " + dev->terminal_id +
// "进行业务:" + get_type_by_state(dev->busytype) +
// ",处理逻辑错误,停止该业务处理");
dev->guid.clear();
dev->busytype = 0;
dev->isbusy = 0;
@@ -6535,11 +6641,12 @@ void on_device_response_minimal(int response_code,
std::cout << "[SET_PREUPGRADE] read/send failed: "
<< e.what() << std::endl;
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_queue(dev->guid, response_code,
"终端 id: " + dev->terminal_id +
"进行业务:" + get_type_by_state(dev->busytype) +
"," + ResponseCodeToString(response_code) + "停止该业务处理");
//未知原因失败,直接结束业务
send_reply_to_cloud(static_cast<int>(ResponseCode::FORBIDDEN), id, device_state_int, dev->guid, dev->mac);
//send_reply_to_queue(dev->guid, response_code,
// "终端 id: " + dev->terminal_id +
// "进行业务:" + get_type_by_state(dev->busytype) +
// "," + ResponseCodeToString(response_code) + "停止该业务处理");
// 失败也要清状态
dev->guid.clear();
@@ -6554,10 +6661,10 @@ void on_device_response_minimal(int response_code,
<< id << std::endl;
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_queue(dev->guid, response_code,
"终端 id: " + dev->terminal_id +
"进行业务:" + get_type_by_state(dev->busytype) +
"," + ResponseCodeToString(response_code) + "停止该业务处理");
//send_reply_to_queue(dev->guid, response_code,
// "终端 id: " + dev->terminal_id +
// "进行业务:" + get_type_by_state(dev->busytype) +
// "," + ResponseCodeToString(response_code) + "停止该业务处理");
dev->guid.clear();
dev->busytype = 0;
@@ -7276,39 +7383,40 @@ bool SendFileWebAuto(const std::string& id,
if (dev_ptr) {
const int bt = dev_ptr->busytype;
// 若处于“事件文件/统计文件”补招阶段则使用补招专用上传目录comtrade/wave/...
// 若处于补招阶段,不在这边上传
if (bt == static_cast<int>(DeviceState::READING_EVENTFILE) ||
bt == static_cast<int>(DeviceState::READING_STATSFILE)) {
std::string rel = dirname_with_slash(local_path); // 例如download/00:B7:.../
// 将 download/ 前缀替换为 wave/
if (!replace_prefix(rel, "download/", "wave/")) {
// 若不是以 download/ 开头,兜底拼 wave/ + 原目录
rel = "wave/" + rel;
}
file_cloudpath = "comtrade/" + rel; // 目标comtrade/wave/00:B7:.../
std::cout << "[SendFileWebAuto] dev=" << id
<< " busytype=" << bt
<< " -> use recall upload URL (cloud path=" << file_cloudpath << ")\n";
} else {
<< " -> skip upload (recall file)\n";
return true; // 认为成功,外层业务继续处理(但不上传)
} else if(bt == static_cast<int>(DeviceState::READING_FILEDATA)) {
// 非补招场景沿用原来的 download 目录
file_cloudpath = dirname_with_slash(local_path); // 保持原逻辑
file_cloudpath = dirname_with_slash(local_path);
std::cout << "[SendFileWebAuto] dev=" << id
<< " busytype=" << bt
<< " -> use default upload URL (cloud path=" << file_cloudpath << ")\n";
// 实际上传调用
SendFileWeb(WEB_FILEUPLOAD, local_path, file_cloudpath, out_filename,2);
std::cout << "[SendFileWebAuto] File upload complete: " << out_filename << std::endl;
//记录下载的文件名到列表中,后续发送响应
filedownload_cache_put(id,
sanitize(local_path),
sanitize(out_filename));
}
else {
std::cout << "[SendFileWebAuto][WARN] dev=" << id
<< " busytype=" << bt
<< " -> no upload (unsupported state)\n";
}
} else {
std::cout << "[SendFileWebAuto][WARN] device not found for id=" << id
<< ", fallback to default URL\n";
file_cloudpath = dirname_with_slash(local_path);
}
// 实际上传调用
SendFileWeb(WEB_FILEUPLOAD, local_path, file_cloudpath, out_filename);
std::cout << "[SendFileWebAuto] File upload complete: " << out_filename << std::endl;
return true;
} catch (const std::exception& e) {