From 15cbbd1c241ba626324dcf3d5c7c584d735e8fcf Mon Sep 17 00:00:00 2001 From: lnk Date: Mon, 30 Mar 2026 15:32:14 +0800 Subject: [PATCH] fix cloudtopic msg proc --- LFtid1056/cloudfront/code/cfg_parser.cpp | 128 ++++++++++++++++++++++- LFtid1056/cloudfront/code/main.cpp | 2 + LFtid1056/cloudfront/code/rocketmq.cpp | 68 +++++++++--- LFtid1056/cloudfront/code/rocketmq.h | 5 + LFtid1056/dealMsg.cpp | 30 +++++- 5 files changed, 213 insertions(+), 20 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 4e14265..890ed82 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -68,7 +68,7 @@ extern std::map xmlinfo_list2;//保存所有型号角形 ////////////////////////////////////////////////////////////////////////////////////////////////// extern time_t ConvertToTimestamp(const tagTime& time); - +extern std::vector read_file_as_bytes(const std::string& file_path); ///////////////////////////////////////////////////////////////////////////////////////////////// @@ -173,6 +173,10 @@ std::string Topic_Reply_Topic = ""; std::string Topic_Reply_Tag = ""; std::string Topic_Reply_Key = ""; +std::string Cloud_Reply_Topic = ""; +std::string Cloud_Reply_Tag = ""; +std::string Cloud_Reply_Key = ""; + //消费者 std::string G_ROCKETMQ_CONSUMER = "";//rocketmq consumer std::string G_MQCONSUMER_IPPORT = "";//consumer ip+port @@ -381,6 +385,10 @@ void loadConfig(const std::string& filename) { strMap["RocketMq.ConsumerTagCLOUD"] = &G_MQCONSUMER_TAG_CLOUD; strMap["RocketMq.ConsumerKeyCLOUD"] = &G_MQCONSUMER_KEY_CLOUD; + strMap["RocketMq.Cloud_Reply_Topic"] = &Cloud_Reply_Topic; + strMap["RocketMq.Cloud_Reply_Tag"] = &Cloud_Reply_Tag; + strMap["RocketMq.Cloud_Reply_Key"] = &Cloud_Reply_Key; + strMap["RocketMq.Topic_Test"] = &G_ROCKETMQ_TOPIC_TEST; strMap["RocketMq.Tag_Test"] = &G_ROCKETMQ_TAG_TEST; strMap["RocketMq.Key_Test"] = &G_ROCKETMQ_KEY_TEST; @@ -3924,7 +3932,7 @@ bool send_set_value_reply(const std::string &dev_id, unsigned char mp_index, con // Detail nlohmann::json detail; - detail["Type"] = 0x2106; // 设备数据 + detail["Type"] = 1103; // 设备数据 // Msg nlohmann::json msg; @@ -4048,7 +4056,7 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector& FileList // 构造 Detail 部分 nlohmann::json detail; - detail["Type"] = 0x2131; // 读取目录 + detail["Type"] = 1101; // 读取目录 detail["Msg"] = { {"DirInfo", dirArray} }; detail["Code"] = 200; // 请求成功 @@ -6450,6 +6458,116 @@ void on_device_response_minimal(int response_code, break; } + // ================= 新增:预升级处理 ================= + case DeviceState::SET_PREUPGRADE: { + std::lock_guard lk(ledgermtx); + + terminal_dev* dev = nullptr; + for (auto& d : terminal_devlist) { + if (d.terminal_id == id) { + dev = &d; + break; + } + } + + if (!dev) { + std::cout << "[SET_PREUPGRADE] dev not found, terminal_id=" + << id << " rc=" << response_code << std::endl; + break; + } + + const int bt = dev->busytype; + + // 只有当前业务类型确实还是预升级,才继续处理 + if (bt != static_cast(DeviceState::SET_PREUPGRADE)) { + std::cout << "[SET_PREUPGRADE] busytype mismatch, terminal_id=" << id + << " dev->busytype=" << bt + << " expect=" << static_cast(DeviceState::SET_PREUPGRADE) + << std::endl; + break; + } + + // 返回 OK,执行升级 + if (ok) { + std::cout << "[SET_PREUPGRADE] OK, start upgrade, terminal_id=" + << id << std::endl; + + try { + if(dev->isbusy == 2){ + // 读取升级文件 + std::vector file_data = read_file_as_bytes("pqs_arm2.bin"); + + // 下发升级指令 + ClientManager::instance().send_upgrade_action_to_device(id, file_data, 10240); + + dev->isbusy = 1; // 完成了预校验但是仍处于忙碌,因为还要升级 + }else if(dev->isbusy == 1){ + std::cout << "[SET_PREUPGRADE] already upgrade OK, terminal_id=" + << id << std::endl; + send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac); + send_reply_to_queue(dev->guid, response_code, + "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) + "," + ResponseCodeToString(response_code) + "停止该业务处理"); + //成功结束业务 + dev->guid.clear(); // 清空 guid + dev->busytype = 0; // 业务类型归零 + dev->isbusy = 0; // 清空业务标志 + dev->busytimecount = 0; // 计时归零 + std::cout << "[clear_terminal_runtime_state] Cleared runtime state for terminal_id=" + << id << std::endl; + + } + else { + std::cout << "[SET_PREUPGRADE] status error" <guid, dev->mac); + send_reply_to_queue(dev->guid, response_code, + "终端 id: " + dev->terminal_id + + "进行业务:" + get_type_by_state(dev->busytype) + + ",处理逻辑错误,停止该业务处理"); + dev->guid.clear(); + dev->busytype = 0; + dev->isbusy = 0; + dev->busytimecount = 0; + } + + + } catch (const std::exception& e) { + std::cout << "[SET_PREUPGRADE] read/send failed: " + << e.what() << std::endl; + + send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac); + send_reply_to_queue(dev->guid, response_code, + "终端 id: " + dev->terminal_id + + "进行业务:" + get_type_by_state(dev->busytype) + + "," + ResponseCodeToString(response_code) + "停止该业务处理"); + + // 失败也要清状态 + dev->guid.clear(); + dev->busytype = 0; + dev->isbusy = 0; + dev->busytimecount = 0; + } + + } else { + // 失败 → 直接结束业务 + std::cout << "[SET_PREUPGRADE] FAIL, stop business, terminal_id=" + << id << std::endl; + + send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac); + send_reply_to_queue(dev->guid, response_code, + "终端 id: " + dev->terminal_id + + "进行业务:" + get_type_by_state(dev->busytype) + + "," + ResponseCodeToString(response_code) + "停止该业务处理"); + + dev->guid.clear(); + dev->busytype = 0; + dev->isbusy = 0; + dev->busytimecount = 0; + } + + break; + } + // ================= 其它状态统一处理 ================= default: { @@ -6460,7 +6578,7 @@ void on_device_response_minimal(int response_code, } if (dev) { //直接根据输入响应mq - //send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac); + send_reply_to_cloud(response_code, id, device_state_int, dev->guid, dev->mac); send_reply_to_queue(dev->guid, response_code, "终端 id: " + dev->terminal_id + "进行业务:" + get_type_by_state(dev->busytype) + "," + ResponseCodeToString(response_code) + "停止该业务处理"); //其他的错误和成功都会结束业务 diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index 389bb58..4976961 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -565,6 +565,8 @@ void Front::mqconsumerThread() subscriptions.emplace_back(G_MQCONSUMER_TOPIC_SET, FRONT_INST, myMessageCallbackset); subscriptions.emplace_back(G_MQCONSUMER_TOPIC_LOG, FRONT_INST, myMessageCallbacklog); + subscriptions.emplace_back(G_MQCONSUMER_TOPIC_CLOUD, FRONT_INST, cloudMessageCallback); + m_mqConsumer = make_unique(consumerGroup); diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index 9bde627..60c09e8 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -2071,11 +2071,37 @@ bool parseJsonMessageCLOUD(const std::string &body, std::string &devid, std::string &guid, nlohmann::json &detailObj, // 这里返回整个 Detail - std::string &front_ip, // 新增:返回 FrontIP + std::string &front_id, // 新增:返回 FrontId int &node) // 新增:返回 Node { try { - auto j = nlohmann::json::parse(body); + // ====== 先解析外层 JSON ====== + auto outer = nlohmann::json::parse(body); + + // ====== 提取 messageBody 字符串 ====== + if (!outer.contains("messageBody") || !outer["messageBody"].is_string()) { + std::cerr << "[parseJsonMessageCLOUD] 'messageBody' is missing or is not a string\n"; + guid.clear(); + devid.clear(); + front_ip.clear(); + node = 0; + detailObj = nlohmann::json::object(); + return false; + } + + std::string messageBodyStr = outer["messageBody"].get(); + if (messageBodyStr.empty()) { + std::cerr << "[parseJsonMessageCLOUD] 'messageBody' is empty\n"; + guid.clear(); + devid.clear(); + front_ip.clear(); + node = 0; + detailObj = nlohmann::json::object(); + return false; + } + + // ====== 再解析 messageBody 内层 JSON ====== + auto j = nlohmann::json::parse(messageBodyStr); // guid if (j.contains("guid") && j["guid"].is_string()) { @@ -2084,9 +2110,9 @@ bool parseJsonMessageCLOUD(const std::string &body, guid.clear(); } - // FrontIP - if (j.contains("FrontIP") && j["FrontIP"].is_string()) { - front_ip = j["FrontIP"].get(); + // FrontId + if (j.contains("FrontId") && j["FrontId"].is_string()) { + front_ip = j["FrontId"].get(); } else { front_ip.clear(); } @@ -2171,7 +2197,7 @@ bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann: if (detailObj.contains("Type")) { if (detailObj["Type"].is_string()) { try { - parsed.type = std::stoi(detailObj["Type"].get(), nullptr, 0); // 支持 "0x2106" 格式 + parsed.type = std::stoi(detailObj["Type"].get(), nullptr, 0); // 支持 "1103" 格式 } catch (...) { return false; } @@ -2195,7 +2221,7 @@ bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann: try { switch (parsed.type) { - case 0x2131: { // 读取目录 + case 1101: { // 读取目录 if(!recordguid(devid,guid,static_cast(DeviceState::READING_FILEMENU),1)){ return true; @@ -2212,7 +2238,7 @@ bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann: return true; } - case 0x2132: { // 下载文件 + case 1102: { // 下载文件 if(!recordguid(devid,guid,static_cast(DeviceState::READING_FILEDATA),1)){ return true; @@ -2229,7 +2255,7 @@ bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann: return true; } - case 0x2106: { // 定值/内部定值 + case 1103: { // 定值/内部定值 if (!msgObj.contains("Cldid") || !msgObj["Cldid"].is_number_integer()) return false; if (!msgObj.contains("DataType") || !msgObj["DataType"].is_number_integer()) return false; if (!msgObj.contains("Operate") || !msgObj["Operate"].is_number_integer()) return false; @@ -2403,11 +2429,25 @@ bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann: std::cout << "[parsemsg] reboot device, devid=" << devid << ", guid=" << guid << std::endl; - if (!recordguid(devid, guid, static_cast(DeviceState::SET_CTRL), 2)) {//分两步,一步校验一步重启 + if (!recordguid(devid, guid, static_cast(DeviceState::SET_CTRL), 1)) { return true; } - ClientManager::instance().set_ctrl_action_to_device(devid,0x01,0x00);//尝试装置重启指令!第一步校验 + ClientManager::instance().set_ctrl_action_to_device(devid,0x01,0x00);//尝试装置重启指令! + return true; + } + + case 1115: { // 升级 + parsed.ok = true; + + std::cout << "[parsemsg] upgrade device, devid=" << devid + << ", guid=" << guid << std::endl; + + if (!recordguid(devid, guid, static_cast(DeviceState::SET_PREUPGRADE), 2)) { + return true; + } + + ClientManager::instance().set_preupgrade_action_to_device(devid, "");//尝试装置升级指令!第一步校验 return true; } @@ -2461,10 +2501,10 @@ void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type, co // ---- 入队发送 ---- queue_data_t connect_info; - connect_info.strTopic = Topic_Reply_Topic; + connect_info.strTopic = Cloud_Reply_Topic; connect_info.strText = obj.dump(); // 序列化为字符串 - connect_info.tag = Topic_Reply_Tag; - connect_info.key = Topic_Reply_Key; + connect_info.tag = Cloud_Reply_Tag; + connect_info.key = Cloud_Reply_Key; { std::lock_guard lock(queue_data_list_mutex); diff --git a/LFtid1056/cloudfront/code/rocketmq.h b/LFtid1056/cloudfront/code/rocketmq.h index 681b2a6..62d3561 100644 --- a/LFtid1056/cloudfront/code/rocketmq.h +++ b/LFtid1056/cloudfront/code/rocketmq.h @@ -94,6 +94,10 @@ extern std::string G_MQCONSUMER_TOPIC_CLOUD; extern std::string G_MQCONSUMER_TAG_CLOUD; extern std::string G_MQCONSUMER_KEY_CLOUD; +extern std::string Cloud_Reply_Topic; +extern std::string Cloud_Reply_Tag; +extern std::string Cloud_Reply_Key; + extern std::string G_LOG_TOPIC; extern std::string G_LOG_TAG; extern std::string G_LOG_KEY; @@ -336,6 +340,7 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& msg); rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg); rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg); +rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg); void send_heartbeat_to_queue(const std::string& status); diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index dade921..b165e4e 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -2073,6 +2073,9 @@ void process_received_message(string mac, string id,const char* data, size_t len //设置装置对时(主动对时) if (udata[8] == static_cast(MsgResponseType::Response_NewACK)) { std::cout << "set success" << mac << std::endl; + + on_device_response_minimal(static_cast(ResponseCode::OK), id, 0, static_cast(DeviceState::SET_RIGHTTIME_2)); + //对时设置成功,调整为空闲,处理后续工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } @@ -2082,11 +2085,13 @@ void process_received_message(string mac, string id,const char* data, size_t len std::cout << "reason code: " << static_cast(udata[8]) << "-" << static_cast(udata[9]) << "-" << static_cast(udata[10]) << "-" << static_cast(udata[11]) << std::endl; // 装置否定应答,对时设置失败 + on_device_response_minimal(static_cast(ResponseCode::BAD_REQUEST), id, 0, static_cast(DeviceState::SET_RIGHTTIME_2)); // 设置对时失败,调整为空闲状态,处理下一项工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } else { // 装置答非所问异常 + on_device_response_minimal(static_cast(ResponseCode::INTERNAL_ERROR), id, 0, static_cast(DeviceState::SET_RIGHTTIME_2)); // 设置对时失败,调整为空闲状态,处理下一项工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } @@ -2347,6 +2352,7 @@ void process_received_message(string mac, string id,const char* data, size_t len } else { std::cout << "***ctrl fail" << mac << std::endl; + on_device_response_minimal(static_cast(ResponseCode::UNAUTHORIZED), id, 0, static_cast(DeviceState::SET_CTRL)); //控制命令校验不合法,调整为空闲,处理后续工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } @@ -2354,15 +2360,19 @@ void process_received_message(string mac, string id,const char* data, size_t len else if (udata[8] == static_cast(MsgResponseType::Response_NewACK)) { std::cout << "***ctrl success" << mac << std::endl; //控制命令执行完毕,调整为空闲,处理后续工作。 + on_device_response_minimal(static_cast(ResponseCode::OK), id, 0, static_cast(DeviceState::SET_CTRL)); + ClientManager::instance().change_device_state(id, DeviceState::IDLE); } else if (udata[8] == static_cast(MsgResponseType::Response_NewNACK)) { std::cout << "***ctrl fail" << mac << std::endl; //控制命令执行失败,调整为空闲,处理后续工作。 + on_device_response_minimal(static_cast(ResponseCode::BAD_REQUEST), id, 0, static_cast(DeviceState::SET_CTRL)); ClientManager::instance().change_device_state(id, DeviceState::IDLE); } else { // 装置答非所问异常 + on_device_response_minimal(static_cast(ResponseCode::INTERNAL_ERROR), id, 0, static_cast(DeviceState::SET_CTRL)); // 控制命令失败,调整为空闲状态,处理下一项工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } @@ -2445,15 +2455,19 @@ void process_received_message(string mac, string id,const char* data, size_t len if (generatedCrc != crc) { //crc 校验失败 后续升级停止! std::cerr << "CRC verify failed." << std::endl; + on_device_response_minimal(static_cast(ResponseCode::UNAUTHORIZED), id, 0, static_cast(DeviceState::SET_PREUPGRADE)); ClientManager::instance().change_device_state(id, DeviceState::IDLE); break; } + on_device_response_minimal(static_cast(ResponseCode::OK), id, 0, static_cast(DeviceState::SET_PREUPGRADE)); + // 预升级校验结束成功,调整为空闲,处理后续工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } else { // 装置答非所问异常 + on_device_response_minimal(static_cast(ResponseCode::INTERNAL_ERROR), id, 0, static_cast(DeviceState::SET_PREUPGRADE)); // 预升级校验失败,调整为空闲状态,处理下一项工作。 ClientManager::instance().change_device_state(id, DeviceState::IDLE); } @@ -2474,23 +2488,33 @@ void process_received_message(string mac, string id,const char* data, size_t len //接收文件错误! std::cout << "*** upgrade 0x02 fail ***!" << mac << std::endl; // 升级流程失败,调整为空闲状态,处理下一项工作。 + + on_device_response_minimal(static_cast(ResponseCode::NOT_FOUND), id, 0, static_cast(DeviceState::SET_PREUPGRADE)); + ClientManager::instance().change_device_state(id, DeviceState::IDLE); } else if (udata[9] == 0x55) { //升级流程成功,等候装置重启 std::cout << "*** upgrade 0x55 success ***!" << mac << std::endl; - // 升级流程失败,调整为空闲状态,处理下一项工作。 + // 升级流程成功,调整为空闲状态,处理下一项工作。 + + on_device_response_minimal(static_cast(ResponseCode::OK), id, 0, static_cast(DeviceState::SET_PREUPGRADE)); + ClientManager::instance().change_device_state(id, DeviceState::IDLE); } else if (udata[9] == 0xAA) { // 升级流程失败! std::cout << "*** upgrade 0xAA fail ***!" << mac << std::endl; // 升级流程失败,调整为空闲状态,处理下一项工作。 + + on_device_response_minimal(static_cast(ResponseCode::BAD_REQUEST), id, 0, static_cast(DeviceState::SET_PREUPGRADE)); + ClientManager::instance().change_device_state(id, DeviceState::IDLE); } else { std::cout << "*** upgrade ?? error ***!" << mac << std::endl; // 升级流程失败,调整为空闲状态,处理下一项工作。 + on_device_response_minimal(static_cast(ResponseCode::INTERNAL_ERROR), id, 0, static_cast(DeviceState::SET_PREUPGRADE)); ClientManager::instance().change_device_state(id, DeviceState::IDLE); } } @@ -2511,6 +2535,7 @@ void process_received_message(string mac, string id,const char* data, size_t len if (!ok) { //组装后续升级报文时出现异常,无法发送后续帧文件 std::cout << "获取下一帧失败\n"; + on_device_response_minimal(static_cast(ResponseCode::NOT_FOUND), id, 0, static_cast(DeviceState::SET_UPGRADE)); ClientManager::instance().change_device_state(id, DeviceState::IDLE); } else if (!packet.empty()) { @@ -2532,18 +2557,21 @@ void process_received_message(string mac, string id,const char* data, size_t len else { // 理论上不该走到这里,防御处理 std::cout << "未获取到有效升级报文\n"; + on_device_response_minimal(static_cast(ResponseCode::INTERNAL_ERROR), id, 0, static_cast(DeviceState::SET_UPGRADE)); ClientManager::instance().change_device_state(id, DeviceState::IDLE); } } else if (udata[8] == static_cast(MsgResponseType::Response_NewNACK)) { std::cout << "*** upgrade 0x41 fail ***!" << mac << std::endl; // 升级流程失败,调整为空闲状态,处理下一项工作。 + on_device_response_minimal(static_cast(ResponseCode::BAD_REQUEST), id, 0, static_cast(DeviceState::SET_PREUPGRADE)); ClientManager::instance().change_device_state(id, DeviceState::IDLE); } else { // 装置答非所问异常 std::cout << "*** upgrade ?? fail ***!" << mac << std::endl; // 升级流程失败,调整为空闲状态,处理下一项工作。 + on_device_response_minimal(static_cast(ResponseCode::INTERNAL_ERROR), id, 0, static_cast(DeviceState::SET_UPGRADE)); ClientManager::instance().change_device_state(id, DeviceState::IDLE); } break;