fix cloudtopic msg proc

This commit is contained in:
lnk
2026-03-30 15:32:14 +08:00
parent 69accad937
commit 15cbbd1c24
5 changed files with 213 additions and 20 deletions

View File

@@ -68,7 +68,7 @@ extern std::map<std::string, Xmldata*> xmlinfo_list2;//保存所有型号角形
//////////////////////////////////////////////////////////////////////////////////////////////////
extern time_t ConvertToTimestamp(const tagTime& time);
extern std::vector<unsigned char> read_file_as_bytes(const std::string& file_path);
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -173,6 +173,10 @@ std::string Topic_Reply_Topic = "";
std::string Topic_Reply_Tag = "";
std::string Topic_Reply_Key = "";
std::string Cloud_Reply_Topic = "";
std::string Cloud_Reply_Tag = "";
std::string Cloud_Reply_Key = "";
//消费者
std::string G_ROCKETMQ_CONSUMER = "";//rocketmq consumer
std::string G_MQCONSUMER_IPPORT = "";//consumer ip+port
@@ -381,6 +385,10 @@ void loadConfig(const std::string& filename) {
strMap["RocketMq.ConsumerTagCLOUD"] = &G_MQCONSUMER_TAG_CLOUD;
strMap["RocketMq.ConsumerKeyCLOUD"] = &G_MQCONSUMER_KEY_CLOUD;
strMap["RocketMq.Cloud_Reply_Topic"] = &Cloud_Reply_Topic;
strMap["RocketMq.Cloud_Reply_Tag"] = &Cloud_Reply_Tag;
strMap["RocketMq.Cloud_Reply_Key"] = &Cloud_Reply_Key;
strMap["RocketMq.Topic_Test"] = &G_ROCKETMQ_TOPIC_TEST;
strMap["RocketMq.Tag_Test"] = &G_ROCKETMQ_TAG_TEST;
strMap["RocketMq.Key_Test"] = &G_ROCKETMQ_KEY_TEST;
@@ -3924,7 +3932,7 @@ bool send_set_value_reply(const std::string &dev_id, unsigned char mp_index, con
// Detail
nlohmann::json detail;
detail["Type"] = 0x2106; // 设备数据
detail["Type"] = 1103; // 设备数据
// Msg
nlohmann::json msg;
@@ -4048,7 +4056,7 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector<DZ_k
j["Dev_mac"] = normalize_mac(dev.addr_str);
nlohmann::json detail;
detail["Type"] = 0x2106; // 设备数据
detail["Type"] = 1103; // 设备数据
nlohmann::json msg;
msg["DataType"] = 0x0D; // 内部定值
@@ -5699,7 +5707,7 @@ bool send_file_list(terminal_dev* dev, const std::vector<tag_dir_info>& FileList
// 构造 Detail 部分
nlohmann::json detail;
detail["Type"] = 0x2131; // 读取目录
detail["Type"] = 1101; // 读取目录
detail["Msg"] = { {"DirInfo", dirArray} };
detail["Code"] = 200; // 请求成功
@@ -6450,6 +6458,116 @@ void on_device_response_minimal(int response_code,
break;
}
// ================= 新增:预升级处理 =================
case DeviceState::SET_PREUPGRADE: {
std::lock_guard<std::mutex> lk(ledgermtx);
terminal_dev* dev = nullptr;
for (auto& d : terminal_devlist) {
if (d.terminal_id == id) {
dev = &d;
break;
}
}
if (!dev) {
std::cout << "[SET_PREUPGRADE] dev not found, terminal_id="
<< id << " rc=" << response_code << std::endl;
break;
}
const int bt = dev->busytype;
// 只有当前业务类型确实还是预升级,才继续处理
if (bt != static_cast<int>(DeviceState::SET_PREUPGRADE)) {
std::cout << "[SET_PREUPGRADE] busytype mismatch, terminal_id=" << id
<< " dev->busytype=" << bt
<< " expect=" << static_cast<int>(DeviceState::SET_PREUPGRADE)
<< std::endl;
break;
}
// 返回 OK执行升级
if (ok) {
std::cout << "[SET_PREUPGRADE] OK, start upgrade, terminal_id="
<< id << std::endl;
try {
if(dev->isbusy == 2){
// 读取升级文件
std::vector<unsigned char> file_data = read_file_as_bytes("pqs_arm2.bin");
// 下发升级指令
ClientManager::instance().send_upgrade_action_to_device(id, file_data, 10240);
dev->isbusy = 1; // 完成了预校验但是仍处于忙碌,因为还要升级
}else if(dev->isbusy == 1){
std::cout << "[SET_PREUPGRADE] already upgrade OK, terminal_id="
<< id << std::endl;
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_queue(dev->guid, response_code,
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) + "," + ResponseCodeToString(response_code) + "停止该业务处理");
//成功结束业务
dev->guid.clear(); // 清空 guid
dev->busytype = 0; // 业务类型归零
dev->isbusy = 0; // 清空业务标志
dev->busytimecount = 0; // 计时归零
std::cout << "[clear_terminal_runtime_state] Cleared runtime state for terminal_id="
<< id << std::endl;
}
else {
std::cout << "[SET_PREUPGRADE] status error" <<std::endl;
// 失败 → 直接结束业务
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_queue(dev->guid, response_code,
"终端 id: " + dev->terminal_id +
"进行业务:" + get_type_by_state(dev->busytype) +
",处理逻辑错误,停止该业务处理");
dev->guid.clear();
dev->busytype = 0;
dev->isbusy = 0;
dev->busytimecount = 0;
}
} catch (const std::exception& e) {
std::cout << "[SET_PREUPGRADE] read/send failed: "
<< e.what() << std::endl;
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_queue(dev->guid, response_code,
"终端 id: " + dev->terminal_id +
"进行业务:" + get_type_by_state(dev->busytype) +
"," + ResponseCodeToString(response_code) + "停止该业务处理");
// 失败也要清状态
dev->guid.clear();
dev->busytype = 0;
dev->isbusy = 0;
dev->busytimecount = 0;
}
} else {
// 失败 → 直接结束业务
std::cout << "[SET_PREUPGRADE] FAIL, stop business, terminal_id="
<< id << std::endl;
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_queue(dev->guid, response_code,
"终端 id: " + dev->terminal_id +
"进行业务:" + get_type_by_state(dev->busytype) +
"," + ResponseCodeToString(response_code) + "停止该业务处理");
dev->guid.clear();
dev->busytype = 0;
dev->isbusy = 0;
dev->busytimecount = 0;
}
break;
}
// ================= 其它状态统一处理 =================
default: {
@@ -6460,7 +6578,7 @@ void on_device_response_minimal(int response_code,
}
if (dev) {
//直接根据输入响应mq
//send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac);
send_reply_to_queue(dev->guid, response_code,
"终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) + "," + ResponseCodeToString(response_code) + "停止该业务处理");
//其他的错误和成功都会结束业务