From a87504cee9c944ad090e729535cdc5dbd72e4b64 Mon Sep 17 00:00:00 2001 From: lnk Date: Fri, 9 Jan 2026 16:33:08 +0800 Subject: [PATCH] add test fun --- LFtid1056/cloudfront/code/rocketmq.cpp | 135 +++++++++++++++++++++++++ LFtid1056/cloudfront/code/rocketmq.h | 2 + LFtid1056/cloudfront/code/worker.cpp | 38 +++++++ 3 files changed, 175 insertions(+) diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index dee7932..8d6d02f 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -1825,6 +1825,141 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { } } +///////////////////////////////////////////////////////////////////////////////////////////////////////////批量台账更新函数 +// 生成无分隔符 MAC:00B88D00BB01 这种 +static std::string MakeMacNoDash(int nIndex, int nType) +{ + long long TestIndex = 0x0000bb00LL + (long long)nIndex + (long long)nType * 0x00010000LL; + + unsigned int A = (unsigned int)(TestIndex / 65536LL); + unsigned int B = (unsigned int)((TestIndex & 0x0000ff00LL) >> 8); + unsigned int C = (unsigned int)(TestIndex & 0x000000ffLL); + + std::ostringstream oss; + oss << "00B88D" + << std::uppercase << std::hex << std::setw(2) << std::setfill('0') << A + << std::uppercase << std::hex << std::setw(2) << std::setfill('0') << B + << std::uppercase << std::hex << std::setw(2) << std::setfill('0') << C; + return oss.str(); +} + +// 读取 ud.txt 模板 +static bool ReadWholeFile(const std::string& path, std::string& out) +{ + std::ifstream file(path.c_str(), std::ios::in); + if (!file.is_open()) return false; + std::stringstream buffer; + buffer << file.rdbuf(); + out = buffer.str(); + return true; +} + +// 批量发送台账更新(入参:进程号、起始数字、生成个数) +void rocketmq_test_ud_batch(Front* front, int processNo, int start, int count) +{ + if (!front || !front->m_producer) { + std::cerr << "front 或 producer 无效\n"; + return; + } + if (processNo <= 0 || start <= 0 || count <= 0) { + std::cerr << "参数非法: processNo/start/count 必须 > 0\n"; + return; + } + + // 1) 读模板 + std::string raw; + if (!ReadWholeFile("mult.txt", raw)) { + std::cerr << "读取 mult.txt 失败\n"; + return; + } + + // 2) 解析外层 JSON + nlohmann::json outer; + try { + outer = nlohmann::json::parse(raw); + } catch (const std::exception& e) { + std::cerr << "外层 JSON 解析失败: " << e.what() << "\n"; + return; + } + + // 3) 解析 messageBody(内层 JSON 字符串) + if (!outer.contains("messageBody") || !outer["messageBody"].is_string()) { + std::cerr << "ud.txt 缺少 messageBody 或类型不是字符串\n"; + return; + } + + nlohmann::json innerTpl; + try { + innerTpl = nlohmann::json::parse(outer["messageBody"].get()); + } catch (const std::exception& e) { + std::cerr << "messageBody 内层 JSON 解析失败: " << e.what() << "\n"; + return; + } + + // 你的 baseId(从模板里取更稳妥;这里按你给的例子“id”字段提取) + // innerTpl["data"][0]["id"] = "7e4a...7c96" + std::string baseId; + try { + baseId = innerTpl["data"][0]["id"].get(); + } catch (...) { + std::cerr << "无法从 innerTpl.data[0].id 获取 baseId\n"; + return; + } + + // 4) 循环构造并发送 + for (int k = 0; k < count; ++k) { + int i = start + k; + + // 拷贝模板,每条消息独立修改 + nlohmann::json inner = innerTpl; + nlohmann::json outOuter = outer; + + // 修改 processNo / maxProcessNum + inner["processNo"] = processNo; + inner["data"][0]["maxProcessNum"] = processNo; + + // 生成字段 + std::string newId = baseId + std::to_string(i); // 设备 id + std::string newIp = MakeMacNoDash(i, /*nType*/0); // 00B88D00BB01..32 + std::string newName = std::string("压测测试lnk虚拟") + std::to_string(10 + i); // 11..(10+start+count-1) + std::string newMonId = std::string("00B88D00BB1") + std::to_string(i); // 101..150 + + // 写入 data[0] + inner["data"][0]["id"] = newId; + inner["data"][0]["ip"] = newIp; + inner["data"][0]["name"] = newName; + + // 写入 monitorData[0] + if (inner["data"][0].contains("monitorData") && inner["data"][0]["monitorData"].is_array() && + !inner["data"][0]["monitorData"].empty()) + { + inner["data"][0]["monitorData"][0]["id"] = newMonId; + inner["data"][0]["monitorData"][0]["name"] = newName; + inner["data"][0]["monitorData"][0]["deviceId"] = newId; + } + + // 把内层 JSON dump 回 messageBody 字符串(外层 dump 会自动转义) + outOuter["messageBody"] = inner.dump(); + + // 组装 queue_data_t 并入队 + queue_data_t data; + data.monitor_no = 123; // 你原来的测试值 + data.strTopic = G_MQCONSUMER_TOPIC_UD; + data.strText = outOuter.dump(); + data.mp_id = "123123"; + data.tag = FRONT_INST; + data.key = G_ROCKETMQ_KEY_TEST; + + { + std::lock_guard lock(queue_data_list_mutex); + queue_data_list.push_back(data); + } + + // 间隔 1 秒 + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + ////////////////////////////////////////////////////////////////////////////////////////////////////////////其他测试函数 void rocketmq_test_rt(Front* front)//用来测试实时数据 diff --git a/LFtid1056/cloudfront/code/rocketmq.h b/LFtid1056/cloudfront/code/rocketmq.h index 71e9f1c..681b2a6 100644 --- a/LFtid1056/cloudfront/code/rocketmq.h +++ b/LFtid1056/cloudfront/code/rocketmq.h @@ -347,6 +347,8 @@ void rocketmq_test_rt(Front* front); void rocketmq_test_getdir(Front* front); void InitializeProducer(rocketmq::RocketMQProducer*& producer); +void rocketmq_test_ud_batch(Front* front, int processNo, int start, int count); + #endif // _ROCKETMQ_CLIENT_WRAPPER_H_ diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index 9c1e4d2..d3d7be0 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -299,6 +299,7 @@ extern bool normalOutputEnabled; "Available commands:\r\n" "G_TEST_NUM= - Set the G_TEST_NUM\r\n" "G_TEST_TYPE= - Set the G_TEST_TYPE 0:use ledger,1:use number\r\n" + "TESTLEDGER ,, - Batch send UD ledger updates (e.g. TESTLEDGER 3,1,50)\r\n" "LOG= - Set the LOG\r\n" "MAX= - Set the MAX_ITEMS\r\n" "dir - Execute rocketmq_test_getdir\r\n" @@ -315,6 +316,43 @@ extern bool normalOutputEnabled; "exit - Exit the shell\r\n" "help - Show this help message\r\n"; sendStr(clientFD, "\r\x1B[K" + helpText); + } else if (cmd.find("TESTLEDGER") == 0) { + // 支持:TESTLEDGER 3,1,50(中间允许空格) + size_t pos = cmd.find(' '); + if (pos == std::string::npos || pos + 1 >= cmd.size()) { + sendStr(clientFD, "\r\x1B[KUsage: TESTLEDGER ,,\r\n"); + } else { + std::string args = cmd.substr(pos + 1); + + // 去掉可能的空格 + for (size_t i = 0; i < args.size(); ) { + if (args[i] == ' ' || args[i] == '\t' || args[i] == '\r' || args[i] == '\n') + args.erase(i, 1); + else + ++i; + } + + // 解析三个整数 + int processNo = 0, start = 0, count = 0; + size_t c1 = args.find(','); + size_t c2 = (c1 == std::string::npos) ? std::string::npos : args.find(',', c1 + 1); + + if (c1 == std::string::npos || c2 == std::string::npos) { + sendStr(clientFD, "\r\x1B[KUsage: TESTLEDGER ,, (e.g. TESTLEDGER 3,1,50)\r\n"); + } else { + processNo = std::atoi(args.substr(0, c1).c_str()); + start = std::atoi(args.substr(c1 + 1, c2 - (c1 + 1)).c_str()); + count = std::atoi(args.substr(c2 + 1).c_str()); + + if (processNo <= 0 || start <= 0 || count <= 0) { + sendStr(clientFD, "\r\x1B[KInvalid args. Need >0. Example: TESTLEDGER 3,1,50\r\n"); + } else { + // 调用批量发送(每条间隔1秒的逻辑在函数内部) + rocketmq_test_ud_batch(m_front, processNo, start, count); + sendStr(clientFD, "\r\x1B[KExecuted TESTLEDGER batch send\r\n"); + } + } + } } else if (cmd.find("viewlog") == 0) { showinshellflag = true; handleViewLogCommand(cmd, clientFD);