diff --git a/LFtid1056.rar b/LFtid1056.rar new file mode 100644 index 0000000..1c18161 Binary files /dev/null and b/LFtid1056.rar differ diff --git a/LFtid1056/PQSMsg.cpp b/LFtid1056/PQSMsg.cpp index fb59f1a..823a93d 100644 --- a/LFtid1056/PQSMsg.cpp +++ b/LFtid1056/PQSMsg.cpp @@ -352,7 +352,7 @@ std::vector generate_frontlogin_message(const std::string& strMac //GetMAC(strMac, packet, 20); //lnk20250808 std::string err; - if (!GetMAC(strMac, packet, 0, &err)) { + if (!GetMAC(strMac, packet, 20, &err)) { std::cerr << "[GetMAC] parse failed: " << err << "\n"; // 򷵻 } diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 1695a99..11ba5fa 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -3082,15 +3082,451 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std:: } ////////////////////////////////////////////////////////////////////////////////////////提取mac -std::string normalize_mac(const std::string& mac) { - std::string result = mac; - result.erase(std::remove(result.begin(), result.end(), '-'), result.end()); - return result; +std::string normalize_mac(const std::string &mac) { + std::string res; + res.reserve(mac.size()); + for (char c : mac) { + if (c != '-' && c != ':' && c != ' ') + res.push_back(c); + } + return res; } -//////////////////////////////////////////////////////////////////////////////////////// +//找到dev的mac +std::string get_mac_by_devid(const std::string &devid) { + std::lock_guard lock(ledgermtx); + for (auto &dev : terminal_devlist) { + if (dev.terminal_id == devid) { + return normalize_mac(dev.addr_str); // 规范化后返回 + } + } + return {}; // 没找到返回空串 +} +////////////////////////////////////////////////////////////////////////////////////////目录信息发送接口函数 +bool send_file_list(const std::string &dev_id, const std::vector &FileList) { + // 找到对应 terminal_dev + std::lock_guard lock(ledgermtx); + auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(), + [&](const terminal_dev &dev) { return dev.terminal_id == dev_id; }); + if (it == terminal_devlist.end()) { + std::cerr << "[send_file_list] device not found: " << dev_id << std::endl; + return false; + } + terminal_dev &dev = *it; + // 判断 isbusy==1 且 busytype==READING_FILEMENU + if (dev.isbusy != 1 || dev.busytype != static_cast(DeviceState::READING_FILEMENU)) { + std::cerr << "[send_file_list] device not in READING_FILEMENU state." << std::endl; + return false; + } + // 构造 JSON 报文 + nlohmann::json j; + j["guid"] = dev.guid; + j["FrontIP"] = FRONT_IP; // 这里填你的前置机 IP + j["Node"] = g_front_seg_index; // 节点号 + j["Dev_mac"] = normalize_mac(dev.addr_str); // addr_str 存的是 MAC + + // 构造 DirInfo 数组 + nlohmann::json dirArray = nlohmann::json::array(); + for (const auto &f : FileList) { + nlohmann::json item; + item["Name"] = f.name; + item["Type"] = (f.flag == 0) ? "dir" : "file"; + item["Size"] = f.size; + dirArray.push_back(item); + } + + // 构造 Detail 部分 + nlohmann::json detail; + detail["Type"] = 0x2131; // 读取目录 + detail["Msg"] = { {"DirInfo", dirArray} }; + detail["Code"] = 200; // 请求成功 + + // 放到顶层 + j["Detail"] = detail; + + // 打印调试 + std::cout << j.dump(4) << std::endl; + + // ---- 入队发送 ---- + queue_data_t connect_info; + connect_info.strTopic = Topic_Reply_Topic; + connect_info.strText = j.dump(); // 序列化为字符串 + { + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(std::move(connect_info)); + } + // 调试打印 + std::cout << "[send_reply_to_cloud] queued: " << j.dump() << std::endl; + + //发送后清除guid和标志 + if (dev.isbusy > 0) { + dev.isbusy--; + } + if(dev.isbusy == 0){ + dev.guid.clear(); + dev.busytype = 0; + } + + return true; +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////检查云前置终端的mq业务超时 +int get_type_by_state(int state) { + switch (static_cast(state)) { + case DeviceState::READING_STATS: + case DeviceState::READING_STATS_TIME: + case DeviceState::READING_REALSTAT: + case DeviceState::READING_FIXEDVALUE: + case DeviceState::READING_FIXEDVALUEDES: + case DeviceState::SET_FIXEDVALUE: + case DeviceState::READING_INTERFIXEDVALUE: + case DeviceState::READING_INTERFIXEDVALUEDES: + case DeviceState::READING_CONTROLWORD: + case DeviceState::SET_INTERFIXEDVALUE: + return 0x2106; + + case DeviceState::READING_FILEMENU: + return 0x2131; + + case DeviceState::READING_EVENTFILE: + case DeviceState::READING_FILEDATA: + return 0x2132; + + default: + return 0; // 没有对应的type + } +} + +// 定时检查业务超时 +void check_device_busy_timeout() +{ + std::lock_guard lock(ledgermtx); + for (auto &dev : terminal_devlist) + { + if (dev.isbusy != 0) // 有业务在进行 + { + dev.busytimecount++; + + if (dev.busytype == static_cast(DeviceState::READING_FILEDATA)) //下载文件业务 + { + if (dev.busytimecount > 30) + { + std::cout << "[Timeout] Device " << dev.terminal_id + << " busytype=READING_FILEMENU 超时(" + << dev.busytimecount << "s)" << std::endl; + + //发送超时响应 + send_reply_to_cloud(static_cast(ResponseCode::BAD_REQUEST),dev.terminal_id,get_type_by_state(dev.busytype)); + + // 超时清空状态 + dev.guid.clear(); // 清空进行中的 guid + dev.busytype = 0; // 复位业务类型 + dev.isbusy = 0; // 标记空闲 + dev.busytimecount = 0; // 计时清零 + } + } + else //其他业务 + { + if (dev.busytimecount > 10) + { + std::cout << "[Timeout] Device " << dev.terminal_id + << " busytype=" << dev.busytype + << " 超时(" << dev.busytimecount << "s)" << std::endl; + // 超时清空状态 + dev.guid.clear(); + dev.busytype = 0; + dev.isbusy = 0; + dev.busytimecount = 0; + } + } + } + } +} + +/////////////////////////////////////////////////////////////////////////////////// 一分钟调用一次:检查 qvvr_file 超时并备份 +static bool ensure_dir_exists(const std::string &path) +{ + struct stat st; + if (stat(path.c_str(), &st) == 0) { + if (S_ISDIR(st.st_mode)) { + return true; // 已存在 + } + return false; // 存在但不是目录 + } + // 不存在则创建 + if (mkdir(path.c_str(), 0755) == 0) { + return true; + } + return false; +} + +void check_and_backup_qvvr_files() { + std::lock_guard lock(ledgermtx); + + const std::string data_dir = FRONT_PATH + "/data"; + const std::string backup_dir = data_dir + "/comtrade_bak"; + + if (!ensure_dir_exists(data_dir) && !ensure_dir_exists(data_dir)) { + std::cerr << "[check_and_backup_qvvr_files] 创建 data 目录失败\n"; + return; + } + if (!ensure_dir_exists(backup_dir) && !ensure_dir_exists(backup_dir)) { + std::cerr << "[check_and_backup_qvvr_files] 创建 comtrade_bak 目录失败\n"; + return; + } + + for (auto &dev : terminal_devlist) { + for (auto &line : dev.line) { + + // 用迭代器遍历,允许在循环中 erase 当前元素 + for (std::vector::iterator qit = line.qvvrevent.qvvrfile.begin(); + qit != line.qvvrevent.qvvrfile.end(); /* no ++ here */) { + + qvvr_file &qfile = *qit; + + if (!qfile.used_status) { + ++qit; + continue; + } + + ++qfile.file_time_count; // 每分钟+1 + + // 超时阈值:>10 分钟 + if (qfile.file_time_count > 10) { + std::cout << "[Qvvr Timeout] dev=" << dev.terminal_id + << " -> move files to: " << backup_dir << std::endl; + + // 移动该记录内的所有文件 + for (std::list::const_iterator it = qfile.file_download.begin(); + it != qfile.file_download.end(); ++it) { + const std::string &src = *it; + const size_t pos = src.find_last_of("/\\"); + const std::string base = (pos == std::string::npos) ? src : src.substr(pos + 1); + const std::string dst = backup_dir + "/" + base; + + if (rename(src.c_str(), dst.c_str()) != 0) { + std::cerr << " [ERROR] 移动失败: " << src + << " -> " << dst << " , " << strerror(errno) << std::endl; + } else { + std::cout << " moved: " << src << " -> " << dst << std::endl; + } + } + + // ✅ 直接从 qvvrfile 向量中删除这条记录 + qit = line.qvvrevent.qvvrfile.erase(qit); + // 注意:不再对 qfile 读写,因为它已被删除 + continue; // 不自增,由 erase 返回的新迭代器继续 + } + + ++qit; // 正常前进 + } + } + } +} + +/////////////////////////////////////////////////////////////////////////////////////////定值信息发送接口函数 +bool save_set_value(const std::string &dev_id, unsigned char mp_index, const std::vector &fabsf) { + std::lock_guard lock(ledgermtx); + + // 1. 找到对应 terminal_dev + auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(), + [&](const terminal_dev &dev) { return dev.terminal_id == dev_id; }); + if (it == terminal_devlist.end()) { + std::cerr << "[send_set_reply] device not found: " << dev_id << std::endl; + return false; + } + + terminal_dev &dev = *it; + + // 2. 检查状态 + if (dev.isbusy != 2 || dev.busytype != static_cast(DeviceState::READING_FIXEDVALUE)) { + std::cerr << "[send_set_reply] device not in READING_FIXEDVALUE state." << std::endl; + return false; + } + + // 3. 遍历监测点,找到 logical_device_seq == mp_index 的监测点 + bool found = false; + for (auto &mon : dev.line) { + if (std::atoi(mon.logical_device_seq.c_str()) == static_cast(mp_index)) { + // ★ 清理原有的 set_values + mon.set_values.clear(); + // ★ 将 fabsf 依次存入该监测点的 set_values + for (const auto &val : fabsf) { + mon.set_values.push_back(val); + } + found = true; + break; + } + } + + if (!found) { + std::cerr << "[send_set_reply] monitor with seq=" << (int)mp_index + << " not found in terminal " << dev_id << std::endl; + return false; + } + + // 4. 状态递减 + dev.isbusy--; + + return true; +} + +bool save_internal_value(const std::string &dev_id, const std::vector &fabsf) { + // 找到对应 terminal_dev + std::lock_guard lock(ledgermtx); + + auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(), + [&](const terminal_dev &dev) { return dev.terminal_id == dev_id; }); + if (it == terminal_devlist.end()) { + std::cerr << "[send_set_reply] device not found: " << dev_id << std::endl; + return false; + } + + terminal_dev &dev = *it; + + // 判断 isbusy==3 且 busytype==READING_INTERFIXEDVALUE + if (dev.isbusy != 3 || dev.busytype != static_cast(DeviceState::READING_INTERFIXEDVALUE)) { + std::cerr << "[send_set_reply] device not in READING_INTERFIXEDVALUE state." << std::endl; + return false; + } + + // ★ 新增:清理原有的内部定值列表 + dev.internal_values.clear(); + //将值严格按顺序存入list中 + for (const auto &val : fabsf) { + dev.internal_values.push_back(val); + } + + dev.isbusy--; + + return true; +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////回复定值读取响应 +bool send_set_value_reply(const std::string &dev_id, unsigned char mp_index, const std::vector &dz_info) { + std::lock_guard lock(ledgermtx); + + // 1) 找终端 + auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(), + [&](const terminal_dev &d) { return d.terminal_id == dev_id; }); + if (it == terminal_devlist.end()) { + std::cerr << "[send_set_value_reply] device not found: " << dev_id << std::endl; + return false; + } + terminal_dev &dev = *it; + + // 2) 校验状态:发送“定值读取结果”回复,应处于 READING_FIXEDVALUE;isbusy == 1 + if (dev.isbusy != 1 || dev.busytype != static_cast(DeviceState::READING_FIXEDVALUE)) { //定值读取 + std::cerr << "[send_set_value_reply] device not in READING_FIXEDVALUE state." << std::endl; + return false; + } + + // 3) 定位监测点(mp_index ∈ [1,6]),与 logical_device_seq 比对 + ledger_monitor *pMon = nullptr; + for (auto &mon : dev.line) { + if (std::atoi(mon.logical_device_seq.c_str()) == static_cast(mp_index)) { + pMon = &mon; + break; + } + } + if (!pMon) { + std::cerr << "[send_set_value_reply] monitor with seq=" << (int)mp_index + << " not found in terminal " << dev_id << std::endl; + return false; + } + + // 4) 取该监测点的 set_values,严格按顺序用于 DZ_Value + std::vector ordered_vals; + ordered_vals.reserve(pMon->set_values.size()); + for (float v : pMon->set_values) ordered_vals.push_back(v); + + if (ordered_vals.empty()) { + std::cerr << "[send_set_value_reply] monitor seq=" << (int)mp_index + << " has empty set_values." << std::endl; + return false; + } + + // 5) 生成 JSON(结构严格贴合你给的样例) + nlohmann::json j; + + // 顶层 + j["guid"] = dev.guid; + j["FrontIP"] = FRONT_IP; // 你的前置机 IP(项目已有常量/变量) + j["Node"] = g_front_seg_index; // 节点号(项目已有变量) + j["Dev_mac"] = normalize_mac(dev.addr_str); + + // Detail + nlohmann::json detail; + detail["Type"] = 0x2106; // 设备数据 + + // Msg + nlohmann::json msg; + msg["Cldid"] = mp_index; //测点序号 + msg["DataType"] = 0x0C; //定值 + + // DataArray(对象数组):逐个填充,DZ_Value 严格按 set_values 顺序 + nlohmann::json dataArray = nlohmann::json::array(); + + const size_t n_meta = dz_info.size(); + const size_t n_vals = ordered_vals.size(); + const size_t n = std::min(n_meta, n_vals); // 以两者较短长度为准 + + if (n_meta != n_vals) { + std::cerr << "[send_set_value_reply] warn: dz_info size(" << n_meta + << ") != set_values size(" << n_vals << "), will emit " << n << " items.\n"; + return false; // 或者继续发送,视需求而定 + } + + for (size_t i = 0; i < n; ++i) { + const DZ_TAB_STRUCT &dz = dz_info[i]; + nlohmann::json item; + item["LN_Num"] = dz.LN_Num; + item["DZ_Num"] = dz.DZ_Num; + item["DZ_Name"] = dz.DZ_Name; + item["DZ_Value"] = ordered_vals[i]; // ★ 严格按顺序 + item["DZ_Type"] = dz.DZ_Type; + item["DZ_Min"] = dz.DZ_Min; + item["DZ_Max"] = dz.DZ_Max; + item["DZ_Default"]= dz.DZ_Default; + item["DZ_UNIT"] = dz.DZ_UNIT; + dataArray.push_back(std::move(item)); + } + + msg["DataArray"] = std::move(dataArray); + detail["Msg"] = std::move(msg); + detail["Code"] = 200; + + j["Detail"] = std::move(detail); + + // 6) 入队发送 + queue_data_t connect_info; + connect_info.strTopic = Topic_Reply_Topic; + connect_info.strText = j.dump(); // 序列化为字符串 + + { + std::lock_guard lk(queue_data_list_mutex); + queue_data_list.push_back(std::move(connect_info)); + } + + // 调试打印 + std::cout << "[send_set_value_reply] queued JSON:\n" << j.dump(4) << std::endl; + + // 7) 发送后更新终端状态(按你现有规则) + if (dev.isbusy > 0) { + dev.isbusy--; + } + if (dev.isbusy == 0) { + dev.guid.clear(); + dev.busytype = 0; + if (pMon) { + pMon->set_values.clear();//清理本次定值记录 + } + } + + return true; +} diff --git a/LFtid1056/cloudfront/code/interface.cpp b/LFtid1056/cloudfront/code/interface.cpp index f0a8117..7605517 100644 --- a/LFtid1056/cloudfront/code/interface.cpp +++ b/LFtid1056/cloudfront/code/interface.cpp @@ -48,9 +48,9 @@ std::vector terminal_devlist; std::mutex ledgermtx; ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// -extern int g_front_seg_index; + extern int g_front_seg_num; -extern std::string FRONT_IP; + extern uint32_t g_node_id; //筛选的终端状态:数组【0,1】筛选运行和在运 diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 3f4008a..1a11cb0 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -98,13 +98,22 @@ public: double CT2; // 电流变比2 qvvr_event qvvrevent; //暂态事件 + + //定值list + std::list set_values; }; //终端台账 class terminal_dev { public: - std::string guid; //台账更新回复用 + std::string guid; //正在进行的guid + int busytype; //业务类型,使用状态机 + int isbusy; //业务进行标志 + int busytimecount; //业务进行计时 + + //内部定值list + std::list internal_values; std::string terminal_id; std::string terminal_name; @@ -442,6 +451,19 @@ void to_json(nlohmann::json& j, const DataArrayItem& d); void to_json(nlohmann::json& j, const MsgObj& m); void to_json(nlohmann::json& j, const FullObj& f); +/////////////////////////////////////////////////////////////////////云平台下发指令的解析 +struct MsgParsed { + int type; // 指令编号 + std::string name; // 文件名/目录名 + int cldid; // 测点号 + int datatype; // 指令细分 + int operate; // 操作读写 + std::vector dataArray_f; // 定值写入,严格按照顺序 + std::vector dataArray_us; // 内部定值写入,严格按照顺序 + + bool ok; +}; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////提供给通讯部分调用的函数 std::vector GenerateDeviceInfoFromLedger(const std::vector& terminal_devlist);//接口读取台账后,再调用这个将台账拷贝过来 @@ -472,10 +494,47 @@ bool assign_qvvr_file_list(const std::string& id, ushort nCpuNo, const std::vect //录波文件下载完成通知接口 bool update_qvvr_file_download(const std::string& filename_with_mac, const std::string& terminal_id); +//上送文件列表接口 +bool send_file_list(const std::string &dev_id, const std::vector &FileList); + //提取mac std::string normalize_mac(const std::string& mac); +std::string get_mac_by_devid(const std::string &devid); +//暂态文件超时检测 +void check_and_backup_qvvr_files(); + +//业务超时检查 +void check_device_busy_timeout(); + +//业务响应 +void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type); + +//查guid +std::string find_guid_index_from_dev_id(const std::string& dev_id); + +// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +extern int g_front_seg_index; +extern std::string FRONT_IP; +extern std::string FRONT_PATH; + +extern std::string WEB_FILEUPLOAD; +////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// 响应码枚举 +enum class ResponseCode : int { + OK = 200, // 请求成功 + ACCEPTED = 201, // 请求被接受,开始处理 + PROCESSING = 202, // 请求被接受,但是未处理完 + BAD_REQUEST = 400, // 请求失败 + UNAUTHORIZED = 401, // 请求未认证/认证错误(不支持的请求) + REJECTED_BUSY = 402, // 请求被拒绝,在处理同类命令 + FORBIDDEN = 403, // 请求被拒绝(未知原因) + NOT_FOUND = 404, // 请求的资源不存在 + BUSY = 405, // 当前忙,无法响应 + TIMEOUT = 406, // 请求超出了等待时间 + INTERNAL_ERROR = 500 // 其他错误 +}; #endif diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index c5f9599..f5d7004 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -344,18 +344,32 @@ void Front::OnTimerThread() std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::cout << "OnTimerThread::run() is called ...... \n"; - int counter = 0; + int hbCounter = 0; // 心跳计数 + int backupCounter = 0; // 备份计数(分钟用) + send_heartbeat_to_queue("1"); while (!m_IsTimerCancel) { update_log_entries_countdown(); - if (counter >= 30) { + //业务超时检查 + check_device_busy_timeout(); + + // 每 30 秒发一次心跳 + if (hbCounter >= 30) { send_heartbeat_to_queue("1"); - counter = 0; + hbCounter = 0; } - counter++; + + // 每 60 秒调用一次录波文件检查 + if (backupCounter >= 60) { + check_and_backup_qvvr_files(); + backupCounter = 0; + } + + hbCounter++; + backupCounter++; g_ontime_blocked_times = 0; std::this_thread::sleep_for(std::chrono::milliseconds(1000)); @@ -366,7 +380,6 @@ void Front::OnTimerThread() std::cerr << "[OnTimerThread] Caught unknown exception" << std::endl; } - // 设置重启标志 { std::lock_guard lock(m_threadCheckMutex); m_needRestartTimerThread = true; diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index 7206732..caaf4e9 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -62,7 +62,6 @@ static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者 //前置进程 extern unsigned int g_node_id; -extern int g_front_seg_index; extern std::string subdir; extern std::string FRONT_INST; @@ -341,6 +340,7 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer) /////////////////////////////////////////////////////////////////////////////////////////////////查找台账下标 // 根据终端 ID 查找 terminal_devlist 中的索引,找不到返回 -1 int find_dev_index_from_dev_id(const std::string& dev_id) { + std::lock_guard lock(ledgermtx); for (size_t i = 0; i < terminal_devlist.size(); ++i) { if (terminal_devlist[i].terminal_id == dev_id) { return static_cast(i); @@ -350,6 +350,7 @@ int find_dev_index_from_dev_id(const std::string& dev_id) { } int find_mp_index_from_mp_id(const std::string& mp_id) { + std::lock_guard lock(ledgermtx); for (const auto& dev : terminal_devlist) { for (size_t j = 0; j < dev.line.size(); ++j) { if (dev.line[j].monitor_id == mp_id) { @@ -360,6 +361,16 @@ int find_mp_index_from_mp_id(const std::string& mp_id) { return -1; // 未找到 } +std::string find_guid_index_from_dev_id(const std::string& dev_id) { + std::lock_guard lock(ledgermtx); + for (size_t i = 0; i < terminal_devlist.size(); ++i) { + if (terminal_devlist[i].terminal_id == dev_id) { + return terminal_devlist[i].guid; + } + } + return ""; // 未找到 +} + /////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理 std::string parseJsonMessageRC(const std::string& inputJson) { @@ -1619,6 +1630,352 @@ void rocketmq_test_rc(Front* front)//用来测试补招 ////////////////////////////////////////////////////////////////////////////////////////////////////////////云前置新增功能 +bool parseJsonMessageCLOUD(const std::string &body, + std::string &devid, + std::string &guid, + nlohmann::json &detailObj, // 这里返回整个 Detail + std::string &front_ip, // 新增:返回 FrontIP + int &node) // 新增:返回 Node +{ + try { + auto j = nlohmann::json::parse(body); + + // guid + if (j.contains("guid") && j["guid"].is_string()) { + guid = j["guid"].get(); + } else { + guid.clear(); + } + + // FrontIP + if (j.contains("FrontIP") && j["FrontIP"].is_string()) { + front_ip = j["FrontIP"].get(); + } else { + front_ip.clear(); + } + + // Node + if (j.contains("Node") && j["Node"].is_number_integer()) { + node = j["Node"].get(); + } else { + node = 0; + } + + // Dev_id(兼容字符串或数字) + if (j.contains("Dev_id")) { + if (j["Dev_id"].is_string()) { + devid = j["Dev_id"].get(); + } else if (j["Dev_id"].is_number_integer()) { + devid = std::to_string(j["Dev_id"].get()); + } else if (j["Dev_id"].is_number_unsigned()) { + devid = std::to_string(j["Dev_id"].get()); + } else if (j["Dev_id"].is_number_float()) { + devid = std::to_string(j["Dev_id"].get()); + } else { + devid.clear(); + } + } else { + devid.clear(); + } + + // Detail(完整放入 detailObj + if (j.contains("Detail") && j["Detail"].is_object()) { + detailObj = j["Detail"]; // 直接保存整个 Detail + } else { + detailObj = nlohmann::json::object(); + } + + return true; + } + catch (const std::exception &e) { + std::cerr << "[parseJsonMessageCLOUD] JSON parse error: " << e.what() << "\n"; + guid.clear(); + devid.clear(); + front_ip.clear(); + node = 0; + detailObj = nlohmann::json::object(); + return false; + } +} + +int recordguid(const std::string &devid, + const std::string &guid, + int busytype,int busycount) +{ + std::lock_guard lock(ledgermtx); + for (auto &dev : terminal_devlist) { + if (dev.terminal_id == devid) { + if (dev.isbusy == 1) { + std::cout << "Dev is busy,busytype is" << dev.busytype << std::endl; + //响应guid:正忙 + + return dev.busytype; // 正在忙,不能记录 + } + dev.guid = guid; + dev.busytype = busytype; + dev.isbusy = busycount; + dev.busytimecount = 0; + return 0; + } + } + std::cout << "Dev not found" << std::endl; + //响应guid:失败 + + return -1; // 未找到对应的装置 +} + + +// 按 type 解析 Msg +bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann::json& detailObj) { + MsgParsed parsed; + nlohmann::json msgObj; + + // 直接解析 detailObj 的 Type + if (detailObj.contains("Type")) { + if (detailObj["Type"].is_string()) { + try { + parsed.type = std::stoi(detailObj["Type"].get(), nullptr, 0); // 支持 "0x2106" 格式 + } catch (...) { + return false; + } + } else if (detailObj["Type"].is_number_integer()) { + parsed.type = detailObj["Type"].get(); + } else if (detailObj["Type"].is_number_unsigned()) { + parsed.type = static_cast(detailObj["Type"].get()); + } else { + return false; + } + } else { + return false; + } + + // 直接解析 detailObj 的 Msg + if (detailObj.contains("Msg") && detailObj["Msg"].is_object()) { + msgObj = detailObj["Msg"]; + } else { + msgObj = nlohmann::json::object(); + } + + try { + switch (parsed.type) { + case 0x2131: { // 读取目录 + + if(!recordguid(devid,guid,static_cast(DeviceState::READING_FILEMENU),1)){ + return true; + } + + if (!msgObj.contains("Name") || !msgObj["Name"].is_string()) return false; + parsed.name = msgObj["Name"].get(); + parsed.ok = true; + + std::cout << "[dir parsemsg] Name: " << parsed.name << std::endl; + + // 添加指令到队列当中 + ClientManager::instance().add_file_menu_action_to_device(devid, parsed.name); + return true; + } + + case 0x2132: { // 下载文件 + + if(!recordguid(devid,guid,static_cast(DeviceState::READING_FILEDATA),1)){ + return true; + } + + if (!msgObj.contains("Name") || !msgObj["Name"].is_string()) return false; + parsed.name = msgObj["Name"].get(); + parsed.ok = true; + + std::cout << "[file parsemsg] Name: " << parsed.name << std::endl; + + // 下发指令 + ClientManager::instance().add_file_download_action_to_device(devid, parsed.name); + return true; + } + + case 0x2106: { // 定值/内部定值 + if (!msgObj.contains("Cldid") || !msgObj["Cldid"].is_number_integer()) return false; + if (!msgObj.contains("DataType") || !msgObj["DataType"].is_number_integer()) return false; + if (!msgObj.contains("Operate") || !msgObj["Operate"].is_number_integer()) return false; + if (!msgObj.contains("DataArray")|| !msgObj["DataArray"].is_array()) return false; + + parsed.cldid = msgObj["Cldid"].get(); + parsed.datatype = msgObj["DataType"].get(); + parsed.operate = msgObj["Operate"].get(); + + // 调试打印 + std::cout << "[parsemsg] Cldid=" << parsed.cldid + << ", DataType=0x" << std::hex << parsed.datatype << std::dec + << ", Operate=" << parsed.operate + << std::endl; + + // 先清空数组,避免复用对象时残留 + parsed.dataArray_f.clear(); + parsed.dataArray_us.clear(); + + switch (parsed.datatype) { + case 0x0C: { // 定值(float 阵列) + + for (const auto& v : msgObj["DataArray"]) { + if (!v.is_number()) return false; + // 统一按 double 取,再强转成 float 更稳妥 + parsed.dataArray_f.push_back(static_cast(v.get())); + } + + // 打印 DataArray + std::cout << "[0x0C] DataArray=["; + for (size_t i = 0; i < parsed.dataArray_f.size(); ++i) { + std::cout << parsed.dataArray_f[i] << (i + 1 < parsed.dataArray_f.size() ? ", " : ""); + } + std::cout << "]" << std::endl; + + parsed.ok = true; + + // 根据 Operate 分流(1=读,2=写) + switch (parsed.operate) { + case 1: { // 读 + + if(!recordguid(devid,guid,static_cast(DeviceState::READING_FIXEDVALUE),2)){ + return true; + } + + ClientManager::instance().get_fixedvalue_action_to_device( + devid, static_cast(parsed.cldid)); // 获取装置测点定值数据 + ClientManager::instance().get_fixedvaluedes_action_to_device(devid); // 获取装置定值描述 + break; + } + case 2: { // 写 + + if(!recordguid(devid,guid,static_cast(DeviceState::SET_FIXEDVALUE),1)){ + return true; + } + + ClientManager::instance().set_fixedvalue_action_to_device( + devid, static_cast(parsed.cldid), parsed.dataArray_f); // 装置修改定值 + break; + } + default: + return false; + } + break; + } + + case 0x0D: { // 内部定值(uint16_t 阵列) + for (const auto& v : msgObj["DataArray"]) { + if (!v.is_number_integer() && !v.is_number_unsigned()) return false; + // 范围校验 [0, 65535] + long long val = v.get(); + if (val < 0 || val > 65535) return false; + parsed.dataArray_us.push_back(static_cast(val)); + } + + // 打印 DataArray + std::cout << "[0x0D] DataArray=["; + for (size_t i = 0; i < parsed.dataArray_us.size(); ++i) { + std::cout << parsed.dataArray_us[i] << (i + 1 < parsed.dataArray_us.size() ? ", " : ""); + } + std::cout << "]" << std::endl; + + parsed.ok = true; + + // 根据 Operate 分流(1=读,2=写) + switch (parsed.operate) { + case 1: { // 读 + + if(!recordguid(devid,guid,static_cast(DeviceState::READING_INTERFIXEDVALUE),3)){ + return true; + } + + ClientManager::instance().get_interfixedvalue_action_to_device(devid); // 获取内部定值 + ClientManager::instance().get_fixedvalucontrolword_action_to_device(devid, 1); // 1-内部定值描述 + ClientManager::instance().get_fixedvalucontrolword_action_to_device(devid, 2); // 2-控制字描述 + break; + } + case 2: { // 写 + + if(!recordguid(devid,guid,static_cast(DeviceState::SET_INTERFIXEDVALUE),1)){ + return true; + } + + ClientManager::instance().set_interfixedvalue_action_to_device(devid, parsed.dataArray_us); + break; + } + default: + return false; + } + break; + } + + default: + return false; + } + + return true; + } + + default: + return false; + } + } catch (const std::exception& e) { + std::cerr << "[parsemsg] exception: " << e.what() << std::endl; + return false; + } catch (...) { + std::cerr << "[parsemsg] unknown exception" << std::endl; + return false; + } +} + +//心跳和其他响应 +void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type) { + try { + std::string guid = find_guid_index_from_dev_id(dev_id); + if(guid == "") + { + std::cerr << "dev: " << dev_id << " guid not found" << std::endl; + return; + } + + // ---- 构造根 JSON ---- + nlohmann::json obj; + obj["guid"] = guid; + obj["FrontIP"] = FRONT_IP; + obj["Node"] = g_front_seg_index; + + // Dev_mac:从台账取 addr_str 并规范化 + std::string mac = get_mac_by_devid(dev_id); + obj["Dev_mac"] = mac; + + // ---- 构造 Detail ---- + nlohmann::json detail; + detail["Type"] = type; + + // Msg + nlohmann::json msg; + msg["Time"] = static_cast(std::time(nullptr)); + detail["Msg"] = std::move(msg); + + // Code + detail["Code"] = reply_code; + + obj["Detail"] = std::move(detail); + + // ---- 入队发送 ---- + queue_data_t connect_info; + connect_info.strTopic = Topic_Reply_Topic; + connect_info.strText = obj.dump(); // 序列化为字符串 + + { + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(std::move(connect_info)); + } + + // 调试打印 + std::cout << "[send_reply_to_cloud] queued: " << obj.dump() << std::endl; + } + catch (const std::exception& e) { + std::cerr << "send_reply_to_cloud exception: " << e.what() << std::endl; + } +} + //云前置功能 rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg) { //未初始化不处理消费 @@ -1637,7 +1994,7 @@ rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg) // 日志记录 DIY_INFOLOG("process", "【NORMAL】前置消费topic:%s_%s的云前置控制消息",FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_CLOUD.c_str()); - std::cout << "rtdata Callback received message: " << body << std::endl; + std::cout << "cloud Callback received message: " << body << std::endl; if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { @@ -1645,38 +2002,57 @@ rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg) } // 消息解析 - std::string devid, line; - bool realData = false, soeData = false; - int limit = 0; + std::string guid; + std::string devid; + std::string FrontIP; + int Node; + nlohmann::json DetailObj; - if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) { + if (!parseJsonMessageCLOUD(body, devid, guid, DetailObj,FrontIP,Node)) { std::cerr << "Failed to parse the JSON message." << std::endl; DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的云前置控制消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); return rocketmq::RECONSUME_LATER; } - // 加锁访问台账 - int dev_index; - int mp_index; - if( !devid.empty() && !line.empty()){ - std::lock_guard lock(ledgermtx); - dev_index = find_dev_index_from_dev_id(devid); - mp_index = find_mp_index_from_mp_id(line); - } - else{ - std::cerr << "rtdata is NULL." << std::endl; - DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的云前置控制消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); - } - + // ====== 调试打印 ====== + std::cout << "[CLOUD Msg Parsed] " + << "guid=" << guid + << ", devid=" << devid + << ", FrontIP=" << FrontIP + << ", Node=" << Node + << std::endl; - if (dev_index == -1 || mp_index == -1) { - std::cerr << "dev index or mp index is not found" << std::endl; - return rocketmq::RECONSUME_LATER; + if(FrontIP != FRONT_IP || Node != g_front_seg_index){ + std::cout << "当前进程不消费这个消息" << std::endl; + return rocketmq::CONSUME_SUCCESS; } - - //不再使用文件触发方式,直接调用接口向终端发起请求 - ClientManager::instance().set_real_state_count(devid, 60,mp_index);//一秒询问一次,询问60次 + if(!parsemsg(devid,guid,DetailObj)){ + std::cerr << "clouddata is error." << std::endl; + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的云前置控制消息失败,消息无法解析", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); + } return rocketmq::CONSUME_SUCCESS; -} \ No newline at end of file +} + +void rocketmq_test_getdir(Front* front)//用来测试目录获取 +{ + if (!front || !front->m_producer) { + std::cerr << "front 或 producer 无效\n"; + return; + } + + rocketmq::RocketMQProducer* producer = front->m_producer; + + queue_data_t data; + data.monitor_no = 123; + data.strTopic = G_MQCONSUMER_TOPIC_CLOUD; + std::ifstream file("getdir.txt"); // 文件中存储长字符串 + std::stringstream buffer; + buffer << file.rdbuf(); // 读取整个文件内容 + + data.strText = std::string(buffer.str()); + data.mp_id = "123123"; + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(data); +} diff --git a/LFtid1056/cloudfront/code/rocketmq.h b/LFtid1056/cloudfront/code/rocketmq.h index 906bf80..4b4e493 100644 --- a/LFtid1056/cloudfront/code/rocketmq.h +++ b/LFtid1056/cloudfront/code/rocketmq.h @@ -340,7 +340,7 @@ void rocketmq_test_rc(Front* front); void rocketmq_test_set(Front* front); void rocketmq_test_ud(Front* front); void rocketmq_test_rt(Front* front); - +void rocketmq_test_getdir(Front* front); void InitializeProducer(rocketmq::RocketMQProducer*& producer); #endif // _ROCKETMQ_CLIENT_WRAPPER_H_ diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index 709147b..5a055f2 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -294,6 +294,7 @@ extern bool normalOutputEnabled; "G_TEST_NUM= - Set the G_TEST_NUM\r\n" "G_TEST_TYPE= - Set the G_TEST_TYPE 0:use ledger,1:use number\r\n" "LOG= - Set the LOG\r\n" + "dir - Execute rocketmq_test_getdir\r\n" "rc - Execute rocketmq_test_rc\r\n" "rt - Execute rocketmq_test_rt\r\n" "ud - Execute rocketmq_test_ud\r\n" @@ -325,6 +326,9 @@ extern bool normalOutputEnabled; } else if (cmd == "rc") { rocketmq_test_rc(m_front); sendStr(clientFD, "\r\x1B[KExecuted rocketmq_test_rc\r\n"); + } else if (cmd == "getdir") { + rocketmq_test_getdir(m_front); + sendStr(clientFD, "\r\x1B[KExecuted rocketmq_test_getdir\r\n"); } else if (cmd == "rt") { rocketmq_test_rt(m_front); sendStr(clientFD, "\r\x1B[KExecuted rocketmq_test_rt\r\n"); diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 10f3599..7e78cf1 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -705,7 +705,7 @@ void process_received_message(string mac, string id,const char* data, size_t len } // ӷļб߼ - // : send_file_list(FileList); + send_file_list(id,FileList);//lnk20250813 // ɺ״̬ ClientManager::instance().change_device_state(id, DeviceState::IDLE); @@ -790,6 +790,12 @@ void process_received_message(string mac, string id,const char* data, size_t len out_file.write(reinterpret_cast(file_data.data()), file_data.size()); std::cout << "File saved: " << file_path << std::endl; + + //ʹýӿļlnk20250826 + std::string filename; + SendFileWeb(WEB_FILEUPLOAD, file_path, file_path, filename); + std::cout << "File upload: " << filename << std::endl; + } else { std::cerr << "Failed to save file: " << file_path @@ -864,6 +870,9 @@ void process_received_message(string mac, string id,const char* data, size_t len std::cout << " Value[" << j << "]: " << fList[j] << std::endl; } + //洢ֵlnk20250827 + save_internal_value(id, monitor_index, fList); + //Զֵ޸Ĺ //ClientManager::instance().set_fixedvalue_action_to_device(id, monitor_index, fList); @@ -945,6 +954,11 @@ void process_received_message(string mac, string id,const char* data, size_t len << ", Unit=" << dz_unit << std::endl; } + //lnk20250828 + // ȡ (һֽ) + uint8_t monitor_index = parser.RecvData[0]; + std::cout << "Monitor Index: " << static_cast(monitor_index) << std::endl; + send_set_value_reply(id, monitor_index, dz_list); //ֵȡϣΪУ ClientManager::instance().change_device_state(id, DeviceState::IDLE); @@ -960,6 +974,10 @@ void process_received_message(string mac, string id,const char* data, size_t len //װöֵ if (udata[8] == static_cast(MsgResponseType::Response_NewACK)) { std::cout << "set success" << mac << std::endl; + + //Ӧlnk20250828 + send_reply_to_cloud(static_cast(ResponseCode::OK),id,static_cast(DeviceState::SET_FIXEDVALUE)); + //ֵóɹΪУ ClientManager::instance().change_device_state(id, DeviceState::IDLE); } @@ -1010,6 +1028,9 @@ void process_received_message(string mac, string id,const char* data, size_t len std::cout << " Value[" << j << "]: " << fList[j] << std::endl; } + //洢ֵlnk20250827 + save_internal_value(id, fList); + //ڲֵ޸IJ //ClientManager::instance().set_interfixedvalue_action_to_device(id, fList);