diff --git a/cfg_parse/SimpleProducer.cpp b/cfg_parse/SimpleProducer.cpp index ed99072..9382f45 100644 --- a/cfg_parse/SimpleProducer.cpp +++ b/cfg_parse/SimpleProducer.cpp @@ -8,6 +8,7 @@ #include #include #include "../mms/db_interface.h" + #include "../include/rocketmq/CProducer.h" #include "../include/rocketmq/CMessage.h" #include "../include/rocketmq/CSendResult.h" @@ -723,6 +724,8 @@ void rocketmq_producer_send(const char* strbody,const char* topic) } #endif + + extern "C" { extern std::string G_MQCONSUMER_TOPIC_RT; void rocketmq_test_rt() @@ -795,14 +798,12 @@ void rocketmq_test_rc() data.mp_id = 123123; my_rocketmq_send(data); } + } -std::string to_string(long long value) { - std::stringstream ss; - ss << value; - return ss.str(); -} + +#if 0 void rocketmq_test_300(int mpnum,int front_index) { Ckafka_data_t data; data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC); @@ -879,3 +880,6 @@ void rocketmq_test_300(int mpnum,int front_index) { std::cout << "Finished sending " << total_messages << " messages." << std::endl; } +#endif + + diff --git a/cfg_parse/cfg_parser.cpp b/cfg_parse/cfg_parser.cpp index fc8064c..88ccfec 100644 --- a/cfg_parse/cfg_parser.cpp +++ b/cfg_parse/cfg_parser.cpp @@ -15023,7 +15023,108 @@ void clearIed(ied_t *ied) { } /*封装C可调用的台账更新函数 *///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - +//测试函数 +std::string to_string(long long value) { + std::stringstream ss; + ss << value; + return ss.str(); +} +void rocketmq_test_300(int mpnum,int front_index) { + Ckafka_data_t data; + data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC); + data.mp_id = "0"; + + // 读取文件内容 + std::ifstream file("long_string.txt"); // 文件中存储长字符串 + std::stringstream buffer; + buffer << file.rdbuf(); + std::string file_contents = buffer.str(); // 获取文件内容 + std::string base_strText = file_contents; + + // 获取当前时间作为开始时间 + std::time_t t = std::time(NULL);//获取当前的系统时间(自 1970 年 1 月 1 日以来的秒数,通常称为 UNIX 时间戳) + std::tm* time_info = std::localtime(&t);//将 std::time_t(表示当前的 UNIX 时间戳)转换为本地时间(std::tm 结构) + time_info->tm_sec = 0; // 清零秒位 + //time_info->tm_msec = 0; // 清零毫秒位(如果需要更精确,使用高精度时间) + + // 获取当前的时间戳(秒) + std::time_t base_time_t = std::mktime(time_info);//将 std::tm 结构(本地时间)转换回 std::time_t(时间戳) + + // 计算每条消息的时间戳,精确到分钟,毫秒和秒清零 + long long current_time_ms = static_cast(base_time_t) * 1000; // 每分钟递增,单位毫秒 + + // 设定总的消息数量 + int total_messages = mpnum; + + ied_t* ied; + ied_usr_t* ied_usr; + + // 循环发送 300 条消息 + for (int i = 0; total_messages != 0 && i < g_node->n_clients; ++i) { + + ied = (ied_t*)g_node->clients[i]; + if(ied != NULL){ + ied_usr = (ied_usr_t*)ied->usr_ext; + + //跳过正常的终端 + if(strcmp(ied_usr->terminal_id, "123456") == 0){ + continue; + } + + for (int j = 0; j < 10 && ied_usr->LD_info[j].mp_id[0] != '\0'; j++){ + // 修改 Monitor 值 + char monitor_id[256] = {}; + strncpy(monitor_id, ied_usr->LD_info[j].mp_id, sizeof(monitor_id) - 1); + monitor_id[sizeof(monitor_id) - 1] = '\0'; + + data.mp_id = QString(monitor_id); + + data.monitor_id = i + j; + + std::string modified_time = to_string(current_time_ms); // 时间转换为整数类型(Unix时间戳) + + // 替换消息中的 Monitor 和 TIME 字段(只匹配字段名,不匹配具体数值) + std::string modified_strText = base_strText; + + // 替换 Monitor 字段 + size_t monitor_pos = modified_strText.find("\"Monitor\""); + if (monitor_pos != std::string::npos) { + size_t colon_pos = modified_strText.find(":", monitor_pos); + size_t quote_pos = modified_strText.find("\"", colon_pos); + size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1); + if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { + modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id.toStdString()); + } + } + + // 替换 TIME 字段 + size_t time_pos = modified_strText.find("\"TIME\""); + if (time_pos != std::string::npos) { + size_t colon_pos = modified_strText.find(":", time_pos); + size_t quote_pos = colon_pos; + size_t end_quote_pos = modified_strText.find(",", quote_pos + 1); + if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) { + modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time); + } + } + + // 更新数据 + data.strText = QString::fromStdString(modified_strText); + + // 发送数据 + my_rocketmq_send(data); + + // 输出调试信息 + std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_id << " and TIME " << modified_time << std::endl; + + // 等待下一条消息的发送(固定为1分钟) + //QThread::sleep(60); // 每次发送间隔1分钟 + } + } + } + + std::cout << "Finished sending " << total_messages << " messages." << std::endl; +} diff --git a/json/save2json.cpp b/json/save2json.cpp index b6f988b..af7af6e 100644 --- a/json/save2json.cpp +++ b/json/save2json.cpp @@ -726,7 +726,7 @@ std::string extractDataJson(const char* inputJson) { if (messageJson == NULL || messageJson->type != cJSON_String) { std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl; cJSON_Delete(root); - return false; + return ""; } // 解析 messageBody 中的 JSON 字符串 @@ -734,14 +734,14 @@ std::string extractDataJson(const char* inputJson) { if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) { std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl; cJSON_Delete(root); - return false; + return ""; } cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串 if (messageBody == NULL) { std::cerr << "Failed to parse 'messageBody' JSON." << std::endl; cJSON_Delete(root); - return false; + return ""; } // 提取 "data" 部分