add mq log and add test type in test mod

This commit is contained in:
lnk
2025-05-28 16:09:15 +08:00
parent 8bc9f3c5e8
commit 59ea59f918
11 changed files with 327 additions and 123 deletions

View File

@@ -301,6 +301,7 @@ std::string Topic_Reply_Key = "";
int G_TEST_FLAG = 0;
int G_TEST_NUM = 0;
int G_TEST_TYPE = 0;
int TEST_PORT = 11000;//用于当前进程登录测试shell的端口
std::string G_TEST_LIST = "";//测试用的发送实际数据的终端列表
@@ -684,6 +685,7 @@ void init_config() {
//MQ测试
G_TEST_FLAG = settings.value("RocketMq/Testflag", 0).toInt();
G_TEST_NUM = settings.value("RocketMq/Testnum", 0).toInt();
G_TEST_TYPE = settings.value("RocketMq/Testtype", 0).toInt();
ba = settings.value("RocketMq/TestList", 0).toString().toLatin1();
G_TEST_LIST = strdup(ba.data());
@@ -735,7 +737,7 @@ void init_config() {
//Mq测试相关打印
std::cout << "Read G_TEST_FLAG:" << G_TEST_FLAG << std::endl;
std::cout << "Read G_TEST_NUM:" << G_TEST_NUM << std::endl;
std::cout << "Read G_TEST_TYPE:" << G_TEST_TYPE << std::endl;
//20241212lnk添加多前置
if (g_front_seg_index != 0 && g_front_seg_num != 0) {
@@ -1787,15 +1789,18 @@ int parse_ledger_update_xml(trigger_update_xml_t* trigger_update_xml)
//加载一个文件的内容到数据结构
if (!load_ledger_update_from_xml(trigger_update_xml, filename)) {
std::cout << "read /etc/ledgerupdate/" << filename << " success..." << std::endl;
DIY_WARNLOG("process","前置的%s%d号进程 读取台账更新文件成功,开始更新台账", get_front_msg_from_subdir(), g_front_seg_index);
}
//处理过的文件删除掉
if (std::remove(filename.c_str()) != 0) {
std::cerr << "Failed to remove file: " << filename << " Error: " << strerror(errno) << std::endl;
DIY_ERRORLOG("process","前置的%s%d号进程 删除已读取的台账更新文件失败!请检查", get_front_msg_from_subdir(), g_front_seg_index);
return APR_EGENERAL;
}
else{
std::cout << "remove file: " << filename << " success..." << std::endl;
DIY_INFOLOG("process","前置的%s%d号进程 删除已读取的台账更新文件成功", get_front_msg_from_subdir(), g_front_seg_index);
}
}
}
@@ -1953,6 +1958,7 @@ int load_3s_data_from_xml(trigger_3s_xml_t* trigger_3s_xml, QString xml_fn)
QString BAK_WEBSERVICE_3S_TRIG_COMMAND_XML_FN = THREE_SECS_WEBSERVICE_DIR + "bak_3s_trig_command.txt";
int parse_3s_xml(trigger_3s_xml_t* trigger_3s_xml)
{
//调试用
printf("begin 3s xml...\n");
memset(trigger_3s_xml, 0, sizeof(trigger_3s_xml_t));
@@ -1962,7 +1968,8 @@ int parse_3s_xml(trigger_3s_xml_t* trigger_3s_xml)
QString the_webservice_xml_fn = get_3s_trig_fn();// ../etc/trigger3s/目录下的最新的xml文件这个文件是用来打开实时触发的开关
printf("the_webservice_xml_fn.size():%d\n",the_webservice_xml_fn.size());
//调试用
//printf("the_webservice_xml_fn.size():%d\n",the_webservice_xml_fn.size());
if (the_webservice_xml_fn.size() > 4) {//文件名大于4说明找到文件
apr_sleep(apr_time_from_sec(1) / 10);
@@ -1972,9 +1979,13 @@ int parse_3s_xml(trigger_3s_xml_t* trigger_3s_xml)
QFile::rename(the_webservice_xml_fn, BAK_WEBSERVICE_3S_TRIG_COMMAND_XML_FN);
printf("/etc/trigger3s/*.xml success...\n");
DIY_WARNLOG("process","前置读取实时数据触发文件成功,即将注册实时数据报告");
return APR_SUCCESS;
}
//调试用
printf("3s xml fail...\n");
return APR_EGENERAL;
}
@@ -2214,6 +2225,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, char* id)
QDir dir(cfg_dir);
if (!dir.exists()) {
qDebug() << "folder does not exist!";
DIY_ERRORLOG("process","前置的%s%d号进程 无法解析补招文件,补招文件路径/FeProject/etc/recall/不存在", get_front_msg_from_subdir(), g_front_seg_index);
return false;
}
//指定文件后缀名,可指定多种类型
@@ -2229,6 +2241,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, char* id)
if (!file.open(QIODevice::ReadOnly))
{
qDebug() << "file.open error";
DIY_ERRORLOG("process","前置的%s%d号进程 无法打开补招文件%s", get_front_msg_from_subdir(), g_front_seg_index,qstrRecallPath.toStdString().c_str());
continue; //以只读方式打开
}
bool ret = doc.setContent(&file);
@@ -2236,6 +2249,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, char* id)
if (!ret)
{
qDebug() << "doc.setContent error";
DIY_ERRORLOG("process","前置的%s%d号进程 无法解析补招文件%s,补招内容无效", get_front_msg_from_subdir(), g_front_seg_index,qstrRecallPath.toStdString().c_str());
continue;
}
//将文件内容读到doc中
@@ -2714,6 +2728,7 @@ void DeletcRecallXml() {
QDir dir(cfg_dir);
if (!dir.exists()) {
qDebug() << "folder does not exist!";
DIY_ERRORLOG("process","前置的%s%d号进程 删除旧的补招文件失败,补招文件路径/FeProject/etc/recall/不存在", get_front_msg_from_subdir(), g_front_seg_index);
return;
}
QStringList filter(file_name);
@@ -2728,6 +2743,7 @@ void DeletcRecallXml() {
if (fileInfo.lastModified() < saveDaysAgo) {
QFile::remove(fileInfo.absoluteFilePath());
DIY_INFOLOG("process","前置的%s%d号进程 删除超过两天的补招文件", get_front_msg_from_subdir(), g_front_seg_index);
}
}
@@ -2743,6 +2759,9 @@ void CreateRecallXml()
if (g_StatisticLackList.size() > 0)
{
printf("insert ID_CJournalRecall_Map\n");
DIY_INFOLOG("process","前置的%s%d号进程 开始写入补招文件", get_front_msg_from_subdir(), g_front_seg_index);
QMap<QString, QList<CJournalRecall> > ID_CJournalRecall_Map;
list<CJournalRecall>::iterator sl = g_StatisticLackList.begin();
@@ -2773,6 +2792,9 @@ void CreateRecallXml()
QFile file(strRecallPath.c_str());
if (!file.open(QIODevice::WriteOnly | QIODevice::Truncate)) {
printf("补招查询完成,打开%s失败,无法写入线路补招配置!\n", qstrRecallPath.toAscii().data());
DIY_ERRORLOG("process","前置的%s%d号进程 无法将补招文件写入补招文件路径/FeProject/etc/recall/", get_front_msg_from_subdir(), g_front_seg_index);
QMap<QString, QList<CJournalRecall> >().swap(ID_CJournalRecall_Map);
return;
}
@@ -3869,13 +3891,13 @@ int parse_device_cfg_web()
//判断是否相等
if(max_process_num != max_index){
if(max_process_num > 0 && max_process_num < 10){
DIY_WARNLOG("process","【WARN】前置比对台账获取的进程数:%s和本地配置的进程数:%s,不匹配,按照台账进程数重置前置的进程数量",max_process_num,max_index);
DIY_WARNLOG("process","【WARN】前置比对台账获取的进程数:%d和本地配置的进程数:%d,不匹配,按照台账进程数重置前置的进程数量",max_process_num,max_index);
// 调用执行脚本函数
close_listening_socket();
execute_bash("reset", max_process_num, "all");
}
else{
DIY_ERRORLOG("process","【ERROR】前置从台账获取的进程数:%s不符合范围1~9,按照本地配置进程数启动进程",max_process_num);
DIY_ERRORLOG("process","【ERROR】前置从台账获取的进程数:%d不符合范围1~9,按照本地配置进程数启动进程",max_process_num);
}
}
}
@@ -4179,6 +4201,8 @@ int parse_device_cfg_web()
{
isdelta_flag = 1; //存在一个监测点为角型接线则这个前置就要启动第二个配置列表
cout << "monitor_id" << monitor_id << "v_wiring_type:" << line_info.v_wiring_type << "is delta wiring:" << isdelta_flag << endl;
DIY_WARNLOG("process","前置连接的监测点 %s 是角形接线,对应终端为%s 终端类型是%s",line_info.mp_id,ied_usr->terminal_id,ied_usr->dev_type);
}
strcpy(line_info.monitor_status, monitor_status);
@@ -5963,7 +5987,7 @@ bool shouldSkipTerminal(const char* terminal_id) {
return false;
}
void rocketmq_test_300(int mpnum,int front_index) {
void rocketmq_test_300(int mpnum,int front_index,int type) {
Ckafka_data_t data;
data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC);
data.mp_id = "0";
@@ -5994,69 +6018,115 @@ void rocketmq_test_300(int mpnum,int front_index) {
ied_usr_t* ied_usr;
// 循环发送 300 条消息
for (int i = 0; (total_messages != 0 && g_front_seg_index == 1 && g_node_id == 100) && i < g_node->n_clients; ++i) {
if(type == 0){
std::cout << " use ledger send msg " << std::endl;
for (int i = 0; (total_messages != 0 && g_front_seg_index == 1 && g_node_id == 100) && i < g_node->n_clients; ++i) {
ied = (ied_t*)g_node->clients[i];
if(ied != NULL){
ied_usr = (ied_usr_t*)ied->usr_ext;
ied = (ied_t*)g_node->clients[i];
if(ied != NULL){
ied_usr = (ied_usr_t*)ied->usr_ext;
//跳过正常的终端
if (shouldSkipTerminal(ied_usr->terminal_id)) {
std::cout << ied_usr->terminal_id << " use true message " << std::endl;
continue;
}
//跳过正常的终端
if (shouldSkipTerminal(ied_usr->terminal_id)) {
std::cout << ied_usr->terminal_id << " use true message " << std::endl;
continue;
}
for (int j = 0; j < 10 && ied_usr->LD_info[j].mp_id[0] != '\0'; j++){
// 修改 Monitor 值
char monitor_id[256] = {};
strncpy(monitor_id, ied_usr->LD_info[j].mp_id, sizeof(monitor_id) - 1);
monitor_id[sizeof(monitor_id) - 1] = '\0';
for (int j = 0; j < 10 && ied_usr->LD_info[j].mp_id[0] != '\0'; j++){
// 修改 Monitor 值
char monitor_id[256] = {};
strncpy(monitor_id, ied_usr->LD_info[j].mp_id, sizeof(monitor_id) - 1);
monitor_id[sizeof(monitor_id) - 1] = '\0';
data.mp_id = QString(monitor_id);
data.mp_id = QString(monitor_id);
data.monitor_id = i + j;
std::string modified_time = my_to_string(current_time_ms); // 时间转换为整数类型Unix时间戳
data.monitor_id = i + j;
std::string modified_time = my_to_string(current_time_ms); // 时间转换为整数类型Unix时间戳
// 替换消息中的 Monitor 和 TIME 字段(只匹配字段名,不匹配具体数值)
std::string modified_strText = base_strText;
// 替换消息中的 Monitor 和 TIME 字段(只匹配字段名,不匹配具体数值)
std::string modified_strText = base_strText;
// 替换 Monitor 字段
size_t monitor_pos = modified_strText.find("\"Monitor\"");
if (monitor_pos != std::string::npos) {
size_t colon_pos = modified_strText.find(":", monitor_pos);
size_t quote_pos = modified_strText.find("\"", colon_pos);
size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1);
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id.toStdString());
}
}
// 替换 Monitor 字段
size_t monitor_pos = modified_strText.find("\"Monitor\"");
if (monitor_pos != std::string::npos) {
size_t colon_pos = modified_strText.find(":", monitor_pos);
size_t quote_pos = modified_strText.find("\"", colon_pos);
size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1);
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id.toStdString());
}
}
// 替换 TIME 字段
size_t time_pos = modified_strText.find("\"TIME\"");
if (time_pos != std::string::npos) {
size_t colon_pos = modified_strText.find(":", time_pos);
size_t quote_pos = colon_pos;
size_t end_quote_pos = modified_strText.find(",", quote_pos + 1);
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time);
}
}
// 替换 TIME 字段
size_t time_pos = modified_strText.find("\"TIME\"");
if (time_pos != std::string::npos) {
size_t colon_pos = modified_strText.find(":", time_pos);
size_t quote_pos = colon_pos;
size_t end_quote_pos = modified_strText.find(",", quote_pos + 1);
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time);
}
}
// 更新数据
data.strText = QString::fromStdString(modified_strText);
// 更新数据
data.strText = QString::fromStdString(modified_strText);
// 发送数据
my_rocketmq_send(data);
// 发送数据
my_rocketmq_send(data);
// 输出调试信息
std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_id << " and TIME " << modified_time << std::endl;
// 输出调试信息
std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_id << " and TIME " << modified_time << std::endl;
// 等待下一条消息的发送固定为1分钟
//QThread::sleep(60); // 每次发送间隔1分钟
}
}
}
}
}
}
}
else{
std::cout << " use monitor + number send msg " << std::endl;
for (int i = 0; (total_messages != 0 && g_front_seg_index == 1 && g_node_id == 100) && i < total_messages; ++i) {
// 修改 Monitor 值
char monitor_id[256] = {};
snprintf(monitor_id, sizeof(monitor_id), "testmonitor%05d", i);
monitor_id[sizeof(monitor_id) - 1] = '\0';
data.mp_id = QString(monitor_id);
data.monitor_id = i;
std::string modified_time = my_to_string(current_time_ms); // 时间转换为整数类型Unix时间戳
// 替换消息中的 Monitor 和 TIME 字段(只匹配字段名,不匹配具体数值)
std::string modified_strText = base_strText;
// 替换 Monitor 字段
size_t monitor_pos = modified_strText.find("\"Monitor\"");
if (monitor_pos != std::string::npos) {
size_t colon_pos = modified_strText.find(":", monitor_pos);
size_t quote_pos = modified_strText.find("\"", colon_pos);
size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1);
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id.toStdString());
}
}
// 替换 TIME 字段
size_t time_pos = modified_strText.find("\"TIME\"");
if (time_pos != std::string::npos) {
size_t colon_pos = modified_strText.find(":", time_pos);
size_t quote_pos = colon_pos;
size_t end_quote_pos = modified_strText.find(",", quote_pos + 1);
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time);
}
}
// 更新数据
data.strText = QString::fromStdString(modified_strText);
// 发送数据
my_rocketmq_send(data);
// 输出调试信息
std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_id << " and TIME " << modified_time << std::endl;
}
}
std::cout << "Finished sending " << total_messages << " messages." << std::endl;
}