diff --git a/cfg_parse/SimpleProducer.cpp b/cfg_parse/SimpleProducer.cpp index f9d2971..3f4e0b9 100644 --- a/cfg_parse/SimpleProducer.cpp +++ b/cfg_parse/SimpleProducer.cpp @@ -44,6 +44,8 @@ extern std::string G_ROCKETMQ_TOPIC;//topie extern std::string G_ROCKETMQ_TAG;//tag extern std::string G_ROCKETMQ_KEY;//key +extern std::string FRONT_INST; + #ifdef __cplusplus extern "C" { #endif @@ -283,14 +285,14 @@ RocketMQConsumer::~RocketMQConsumer() // 在 RocketMQConsumer 类中新增函数用来设置消费模式 void RocketMQConsumer::setConsumerMessageModel(const std::string& topic) { - if (topic == G_MQCONSUMER_TOPIC_SET) { + /*if (topic == G_MQCONSUMER_TOPIC_SET) { // 设置为普通消费模式 if (SetPushConsumerMessageModel(consumer_, CLUSTERING) != 0) { std::cout << "Error setting message model to CLUSTERING for topic: " << topic << std::endl; } else { std::cout << "Set consumer to CLUSTERING for topic: " << topic << std::endl; } - } else { + } else*/ { // 默认设置为广播消费模式 if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) { std::cout << "Error setting message model to BROADCASTING for topic: " << topic << std::endl; @@ -649,7 +651,7 @@ void rocketmq_test_rt() { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RT); + data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT); std::ifstream file("rt.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -663,7 +665,7 @@ void rocketmq_test_ud()//用来测试台账更新 { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_UD); + data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD); std::ifstream file("ud.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -677,7 +679,7 @@ void rocketmq_test_set()//用来测试进程控制脚本 { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_SET); + data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET); std::ifstream file("set.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -691,7 +693,7 @@ void rocketmq_test_only()//用来测试进程控制脚本 { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_SET); + data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET); std::ifstream file("set_debug.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -706,7 +708,7 @@ void rocketmq_test_rc() { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RC); + data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC); std::ifstream file("rc.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -721,7 +723,7 @@ void rocketmq_test_log() { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_LOG); + data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_LOG); std::ifstream file("log_test.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 diff --git a/cfg_parse/cfg_parser.cpp b/cfg_parse/cfg_parser.cpp index 8e2c209..38fed94 100644 --- a/cfg_parse/cfg_parser.cpp +++ b/cfg_parse/cfg_parser.cpp @@ -221,7 +221,7 @@ char* UDS_DELETE_URL; int FILE_FLAG; int SEND_FLAG; -int FRONT_INST; +std::string FRONT_INST;//lnk20250512改为string char* FRONT_IP; int CITY_FLAG; @@ -331,10 +331,16 @@ extern pthread_mutex_t mtx; /*lnk 2024-10-21 */ std::string intToString(int number); + +//lnk20250512 +void send_heartbeat_to_kafka(const std::string& status); ////////////////////////////////////////////////////////////////////////// extern int server_socket; //Web Socket服务端实例 extern unsigned int g_node_id; //前置程序类型(100-500) + +extern QMutex kafka_data_list_mutex; //Kafka发送数据锁 +extern QList kafka_data_list; //kafka发送数据链表 //WW 2023-08-20 end //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -401,8 +407,13 @@ void init_config() { qDebug() << "Read SEND_FLAG:" << SEND_FLAG << endl; FILE_FLAG = settings.value("Flag/FileFlag", 0).toInt(); qDebug() << "Read FILE_FLAG:" << FILE_FLAG << endl; - FRONT_INST = settings.value("Flag/FrontInst", 0).toInt(); - qDebug() << "Read FRONT_INST:" << FRONT_INST << endl; + + //FRONT_INST = settings.value("Flag/FrontInst", 0).toInt(); + //qDebug() << "Read FRONT_INST:" << FRONT_INST << endl; + ba = settings.value("Flag/FrontInst", "").toString().toLatin1(); + FRONT_INST = strdup(ba.data()); + std::cout << "Read FRONT_INST:" << FRONT_INST << endl; + ba = settings.value("Flag/FrontIP", "").toString().toLatin1(); FRONT_IP = strdup(ba.data()); qDebug() << "Read FRONT_IP:" << FRONT_IP << endl; @@ -686,6 +697,12 @@ void init_config() { std::cout << "Read G_CONNECT_TOPIC:" << G_CONNECT_TOPIC << std::endl; std::cout << "Read G_CONNECT_TAG:" << G_CONNECT_TAG << std::endl; std::cout << "Read G_CONNECT_KEY:" << G_CONNECT_KEY << std::endl; + std::cout << "Read Heart_Beat_Topic:" << Heart_Beat_Topic << std::endl; + std::cout << "Read Heart_Beat_Tag:" << Heart_Beat_Tag << std::endl; + std::cout << "Read Heart_Beat_Key:" << Heart_Beat_Key << std::endl; + std::cout << "Read Topic_Reply_Topic:" << Topic_Reply_Topic << std::endl; + std::cout << "Read Topic_Reply_Tag:" << Topic_Reply_Tag << std::endl; + std::cout << "Read Topic_Reply_Key:" << Topic_Reply_Key << std::endl; //消费者相关打印 std::cout << "Read G_ROCKETMQ_CONSUMER:" << G_ROCKETMQ_CONSUMER << std::endl; std::cout << "Read G_MQCONSUMER_IPPORT:" << G_MQCONSUMER_IPPORT << std::endl; @@ -3525,28 +3542,6 @@ int terminal_ledger_web(QMap* terminal_dev_map, return 1; } - //后续修改为根据index来获取对应的台账,将不再均分,获取所有台账并判断进程号 -#if 0 - int base_size = 0; - int remainder = 0; - int start_index = 0; - int end_index = data_size; - - if(1 == MULTIPLE_NODE_FLAG){ - // 计算每份的大小 - base_size = data_size / num; - remainder = data_size % num; - - // 计算当前份的起始和结束索引 - start_index = index * base_size + (index < remainder ? index : remainder); - end_index = start_index + base_size + (index < remainder ? 1 : 0); - - // 确保结束索引不超过数组大小 - if (end_index > data_size) { - end_index = data_size; - } - } -#endif int start_index = 0; int end_index = data_size; @@ -4287,7 +4282,7 @@ int parse_model_cfg_web() } } ////////////////////////////////////////////////////////////icd模型重构函数lnk20250116 -char* parse_model_cfg_web_one(ied_t* ied) +char* parse_model_cfg_web_one(ied_t* ied, char* out_model) { std::vector codes;//入参集合 QMap icd_model_map; @@ -4323,19 +4318,28 @@ char* parse_model_cfg_web_one(ied_t* ied) if (value != nullptr) { // 找到容器中对应名称并取值 strncpy(model_id, value->model_id, sizeof(model_id) - 1); + model_id[sizeof(model_id) - 1] = '\0'; + strncpy(tmnl_type, value->tmnl_type, sizeof(tmnl_type) - 1); strncpy(file_path, value->file_path, sizeof(file_path) - 1); strncpy(file_name, value->file_name, sizeof(file_name) - 1); - std::cout << "model_id" << model_id << std::endl; - std::cout << "tmnl_type" << tmnl_type << std::endl; - std::cout << "filepath" << file_path << std::endl; - std::cout << "filename" << file_name << std::endl; + std::cout << "model_id:" << model_id << std::endl; + std::cout << "tmnl_type:" << tmnl_type << std::endl; + std::cout << "filepath:" << file_path << std::endl; + std::cout << "filename:" << file_name << std::endl; Set_xml_databaseinfo(model_id, tmnl_type, file_path, file_name, timestamp.year, timestamp.month, timestamp.day, timestamp.hour, timestamp.minute, timestamp.second); - } + + if (out_model != NULL) { + strncpy(out_model, model_id, 64); + out_model[63] = '\0'; + } + + return model_id; + } } - return model_id; + } catch (otl_exception& e) { @@ -4377,8 +4381,11 @@ void OnTimerThread::run() int pgmin = 0; int mp_num_hour = 0; - //添加入参容器 - std::vector codes; + //时间计数用 + int counter = 0; + + //初始化发送一次心跳 + send_heartbeat_to_kafka("1"); while (1) { @@ -4408,6 +4415,13 @@ void OnTimerThread::run() //添加日志开关控制lnk20250508 update_log_entries_countdown(); + //添加进程心跳 + if (counter >= 30) { + send_heartbeat_to_kafka("1"); // 每30秒发送一次心跳 + counter = 0; + } + counter++; + msleep(1000); } printf(">>>OnTimerThread::run() is end!!!\n"); @@ -4611,37 +4625,7 @@ bool threadmsghttp(int fun); #ifdef __cplusplus } #endif -#if 0 -void RecallJsonResponse(int result) { - char* ptr=NULL; - - // 创建 JSON 对象 - cJSON* json_response = cJSON_CreateObject(); - // 添加字段 - if(result == 000000){ - cJSON_AddStringToObject(json_response, "code", "A0000"); - cJSON_AddStringToObject(json_response, "msg", "数据补招执行成功"); - } - else{ - cJSON_AddStringToObject(json_response, "code", "A0002"); - cJSON_AddStringToObject(json_response, "msg", "数据补招执行失败"); - } - - cJSON_AddNullToObject(json_response, "data"); // 设置为 null - - // 将 JSON 对象转换为字符串 - char* json_string = cJSON_Print(json_response); - std::cout << json_string << std::endl; - - //发送回复给对方,只发一次 - SendJsonAPI_web("调用方", "", json_string ,&ptr); - - // 清理 - cJSON_Delete(json_response); - free(json_string); // 释放打印字符串的内存 -} -#endif void WebhttpThread::run() { diff --git a/cfg_parse/log4.cpp b/cfg_parse/log4.cpp index 3da00b2..3bb8d63 100644 --- a/cfg_parse/log4.cpp +++ b/cfg_parse/log4.cpp @@ -166,6 +166,9 @@ protected: public: SendAppender() {} + virtual ~SendAppender() { + destructorImpl(); // 重要!释放 log4cplus 基类资源 + } }; //用来控制日志上送的结构 @@ -452,6 +455,10 @@ extern "C" { void log_info (const char* key, const char* msg) { log4_log_with_level(key, msg, 1); } void log_warn (const char* key, const char* msg) { log4_log_with_level(key, msg, 2); } void log_error(const char* key, const char* msg) { log4_log_with_level(key, msg, 3); } + + void send_reply_to_kafka_c(const char* guid, const char* step, const char* result) { + send_reply_to_kafka(std::string(guid), std::string(step), std::string(result)); + } } diff --git a/json/save2json.cpp b/json/save2json.cpp index 5680f95..49d10c6 100644 --- a/json/save2json.cpp +++ b/json/save2json.cpp @@ -73,6 +73,8 @@ extern "C" { extern int INITFLAG; +extern std::string FRONT_INST; + extern QMutex kafka_data_list_mutex; extern QList kafka_data_list; @@ -525,87 +527,6 @@ void KafkaSendThread::run() printf("END my_kafka_send no.%i -------->>>>>>>>>>>> %s \n\n", count++, QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data()); } - - /*//lnk20250225添加日志上送 - Ckafka_data_t log_send; - log_send.strTopic = QString::fromStdString(G_LOG_TOPIC); - bool log_gotten; - log_gotten = false; - - if(!showinshellflag){ - if (debugOutputEnabled) { - // 如果 normalOutputEnabled 和 warnOutputEnabled 都为 0,且 errorOutputEnabled 为 1,取 errorList 输出 - // 处理 errorList 的输出 - pthread_mutex_lock(&debugListMutex); - if (!debugList.empty()) { - log_gotten = true; - log_send.strText = QString::fromStdString(debugList.front());//请确保list正确 - // 检查是否为空字符串(去掉空白字符后是否为空) - if (log_send.strText.trimmed().isEmpty()) { - debugList.pop_front(); // 直接丢弃这条日志 - log_gotten = false; // 标记没有获取到有效日志 - } else { - debugList.pop_front(); // 只有非空白字符串才会真正返回 - } - } - pthread_mutex_unlock(&debugListMutex); - } - else if (normalOutputEnabled) { - // 如果 normalOutputEnabled 为 1,优先从 normalList 获取输出 - // 处理 normalList 的输出 - pthread_mutex_lock(&normalListMutex); - if (!normalList.empty()) { - - log_gotten = true; - log_send.strText = QString::fromStdString(normalList.front()); - // 检查是否为空字符串(去掉空白字符后是否为空) - if (log_send.strText.trimmed().isEmpty()) { - normalList.pop_front(); // 直接丢弃这条日志 - log_gotten = false; // 标记没有获取到有效日志 - } else { - normalList.pop_front(); // 只有非空白字符串才会真正返回 - } - } - pthread_mutex_unlock(&normalListMutex); - } else if (warnOutputEnabled) { - // 如果 normalOutputEnabled 为 0,且 warnOutputEnabled 为 1,优先从 warnList 获取输出 - // 处理 warnList 的输出 - pthread_mutex_lock(&warnListMutex); - if (!warnList.empty()) { - log_gotten = true; - log_send.strText = QString::fromStdString(warnList.front()); - // 检查是否为空字符串(去掉空白字符后是否为空) - if (log_send.strText.trimmed().isEmpty()) { - warnList.pop_front(); // 直接丢弃这条日志 - log_gotten = false; // 标记没有获取到有效日志 - } else { - warnList.pop_front(); // 只有非空白字符串才会真正返回 - } - } - pthread_mutex_unlock(&warnListMutex); - } else if (errorOutputEnabled) { - // 如果 normalOutputEnabled 和 warnOutputEnabled 都为 0,且 errorOutputEnabled 为 1,取 errorList 输出 - // 处理 errorList 的输出 - pthread_mutex_lock(&errorListMutex); - if (!errorList.empty()) { - log_gotten = true; - log_send.strText = QString::fromStdString(errorList.front()); - // 检查是否为空字符串(去掉空白字符后是否为空) - if (log_send.strText.trimmed().isEmpty()) { - errorList.pop_front(); // 直接丢弃这条日志 - log_gotten = false; // 标记没有获取到有效日志 - } else { - errorList.pop_front(); // 只有非空白字符串才会真正返回 - } - } - pthread_mutex_unlock(&errorListMutex); - } - } - - if (log_gotten) { - static uint32_t count = 0; - my_rocketmq_send(log_send); - }*/ QThread::msleep(10); // 避免 CPU 空转lnk20250326 @@ -649,15 +570,17 @@ std::string extractDataJson(const char* inputJson) { return ""; } - /*//添加guid + //添加guid // 提取 "guid" 部分 cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid"); - if (guidstr == NULL || data->type != cJSON_String) { + if (guidstr == NULL || guidstr->type != cJSON_String) { std::cerr << "'guid' is missing or is not an array" << std::endl; cJSON_Delete(root); return ""; } - //guid回复*/ + //guid回复 + std::string guid = guidstr->valuestring; + send_reply_to_kafka(guid,"1","收到补招指令"); // 提取 "data" 部分 cJSON* data = cJSON_GetObjectItem(messageBody, "data"); @@ -737,8 +660,9 @@ bool parseJsonMessageRT(const std::string& body, std::string& devSeries, std::st cJSON* limitItem = cJSON_GetObjectItem(messageBody, "limit"); //添加guid - //cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid"); - //if(guidstr)std::string guid = guidstr->valuestring; + std::string guid; + cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid"); + if(guidstr)guid = guidstr->valuestring; if (devSeriesItem && lineItem && realDataItem && soeDataItem && limitItem) { devSeries = devSeriesItem->valuestring; @@ -748,7 +672,8 @@ bool parseJsonMessageRT(const std::string& body, std::string& devSeries, std::st limit = limitItem->valueint; //回复消息 - //guid + //执行结果直接看实时数据,不需要再回复,1是收到消息 + send_reply_to_kafka(guid,"1","收到三秒数据指令"); } else { std::cerr << "Missing expected fields in JSON message." << std::endl; @@ -1031,8 +956,7 @@ void parse_set(const std::string& json_str) { std::string frontType = front->valuestring; - //进程号为0的进程处理所有控制消息 - if (index_value != g_front_seg_index && g_front_seg_index !=0) { + if (index_value != g_front_seg_index) { std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl; cJSON_Delete(root); return; @@ -1068,6 +992,7 @@ void parse_set(const std::string& json_str) { //脚本在3秒后执行 //回复消息 + send_reply_to_kafka(guid,"1","收到重置进程指令,重启所有进程!"); //上送日志 @@ -1081,6 +1006,11 @@ void parse_set(const std::string& json_str) { //等待一会后退出进程 MVL_LOG_ACSE0("MYLOG: recive delete msg, so exit to restart "); + //回复消息 + send_reply_to_kafka(guid,"1","收到删除进程指令,这个进程将会重启 "); + + //上送日志 + apr_sleep(apr_time_from_sec(10)); exit(-1039); } @@ -1444,7 +1374,6 @@ void parse_log(const std::string& json_str) { std::string frontType = frontTypestr->valuestring; - //进程号为0的进程处理所有台账更新消息 if (processNo != g_front_seg_index) { std::cout << "msg index:"<< processNo <<"doesnt match self index:" << g_front_seg_index << std::endl; cJSON_Delete(root); @@ -1457,10 +1386,13 @@ void parse_log(const std::string& json_str) { return; } - //进程号匹配上 + //进程号和匹配上 std::cout << "msg index:"<< processNo <<" self index:" << g_front_seg_index << std::endl; std::cout << "msg frontType:"<< frontType <<" self frontType:" << subdir << std::endl; + //回复消息 + send_reply_to_kafka(guid,"1","收到实时日志指令"); + if (code_str == "set_log") { //校验数据 if((level == "terminal" || level == "measurepoint") && @@ -1560,7 +1492,8 @@ void parse_control(const std::string& json_str, const std::string& output_dir) { std::cout << "msg index:"<< process_No <<" self index:" << g_front_seg_index << std::endl; //匹配后响应收到台账更新消息 - //guid + //除了回复收到消息,执行结束后还要回复结果 + send_reply_to_kafka(guid,"1","收到台账更新指令"); if (code_str == "add_terminal" || code_str == "ledger_modify") { @@ -2032,24 +1965,22 @@ void mqconsumerThread::run() std::vector subscriptions; // 初始化消费者1 //lnk20241230只有实时进程会订阅实时topic,不订阅实时topic的进程无法触发实时数据 if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID){ - subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata)); + subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata)); } // 初始化消费者2 //所有进程都会订阅台账更新topic,不同功能进程的台账不能互相影响 - subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate)); + subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate)); // 初始化消费者3 //lnk20241230只有补招进程会订阅补招topic,不订阅补招topic的进程无法触发补招数据 if(g_node_id == RECALL_HIS_DATA_BASE_NODE_ID){ - subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall)); + subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall)); } - // 初始化消费者4 //lnk20250108只有稳态进程1会订阅控制topic,不订阅控制topic的进程无法触发进程重置 - if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){ - subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset)); - } + // 初始化消费者4 //lnk20250108只有稳态进程1会控制reset, + subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset)); // 初始化消费者5 //所有进程都会订阅日志上送topic,不同功能进程的日志上送不能互相影响 - subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog)); + subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog)); try { rocketmq_consumer_receive(consumerName, nameServer, subscriptions); diff --git a/log4cplus/log4.h b/log4cplus/log4.h index 8e1b23e..634b69c 100644 --- a/log4cplus/log4.h +++ b/log4cplus/log4.h @@ -50,8 +50,10 @@ struct DebugSwitch { extern std::map logger_map; extern DebugSwitch g_debug_switch; +extern void send_reply_to_kafka(const std::string& guid, const std::string& step, const std::string& result); +std::string get_front_type_from_subdir(); log4cplus::Logger init_logger(const std::string& full_name, const std::string& file_dir, const std::string& base_file); @@ -73,6 +75,8 @@ void log_debug(const char* key, const char* msg); void log_info(const char* key, const char* msg); void log_warn(const char* key, const char* msg); void log_error(const char* key, const char* msg); + +void send_reply_to_kafka_c(const char* guid, const char* step, const char* result); #ifdef __cplusplus } #endif diff --git a/mms/main.c b/mms/main.c index 99105c6..059ad3d 100644 --- a/mms/main.c +++ b/mms/main.c @@ -315,14 +315,14 @@ int main(int argc, const char **argv) } //lnk20241211 添加测试开关 - pthread_mutex_lock(&mtx); + /*pthread_mutex_lock(&mtx); if (!G_TEST_FLAG && g_front_num_count >= 30 && g_onlyIP[0] == 0 && g_node->n_clients>10) {//30分钟连接数量过低且不是单连且台账大于十个终端 MVL_LOG_ACSE0("MYLOG: g_front_num_count>=20, so exit to restart "); apr_sleep(apr_time_from_sec(10)); exit(-1039); } - pthread_mutex_unlock(&mtx); + pthread_mutex_unlock(&mtx);*/ } diff --git a/mms/mms_process.c b/mms/mms_process.c index cb02f85..63de4ea 100644 --- a/mms/mms_process.c +++ b/mms/mms_process.c @@ -671,10 +671,17 @@ void check_3s_config() int isValidModelId(const char* model_id) { size_t i; - if (model_id == NULL) return 0; // NULL 无效 + if (model_id == NULL) + { + printf("!!!model_id null!!!\n"); + return 0; + } // NULL 无效 size_t len = strlen(model_id); - if (len < 4) return 0; // 长度 < 4 无效 + if (len < 4) { + printf("!!!model_id length < 4!!!\n"); + return 0; // 长度 < 4 无效 + } // 检查是否全是空格 for (i = 0; i < len; i++) { @@ -682,6 +689,7 @@ int isValidModelId(const char* model_id) { return 1; // 只要包含非空格字符,就是合法的 } } + printf("!!!model_id only space!!!\n"); return 0; // 仅包含空格,无效 } @@ -774,6 +782,9 @@ void process_ledger_update(trigger_update_xml_t *ledger_update_xml) //添加mq响应台账添加失败:台账挂满 //update[i].guid + char msg[256]; + sprintf(msg, "终端 id: %s 台账更新失败, 这个进程的台账空间已满,达到了配置台账数量的最大值", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); return; } @@ -798,14 +809,24 @@ void process_ledger_update(trigger_update_xml_t *ledger_update_xml) ied_usr = (ied_usr_t*)apr_pcalloc(g_init_pool, sizeof(ied_usr_t)); ied->usr_ext = ied_usr;//内存挂到ied上 - if (ied_usr == NULL) - return APR_ENOMEM; + if (ied_usr == NULL){ + char msg[256]; + snprintf(msg, sizeof(msg), "终端 id: %s 台账更新失败,没有找到台账的终端空间", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); + return ; + } + ied_usr->last_call_wavelist_time = sGetMsTime() + g_pt61850app->giTime * 1000;//从FeProject/子功能目录/etc/pt61850netd_pqfe.xml中读取的总查询时间 ied_usr->LD_info = (LD_info_t*)apr_pcalloc(g_init_pool, MAX_CPUNO * sizeof(LD_info_t));//内存挂到ied上 - if (ied_usr->LD_info == NULL) - return APR_ENOMEM; + if (ied_usr->LD_info == NULL){ + char msg[256]; + snprintf(msg, sizeof(msg), "终端 id: %s 台账更新失败,没有找到台账的监测点空间", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); + return ; + } + ied_usr->dev_flag = ENABLE;//终端有效 @@ -821,22 +842,27 @@ void process_ledger_update(trigger_update_xml_t *ledger_update_xml) int ret = update_one_terminal_ledger(update,i,ied,terminal_index,ied_take); if(ret){ printf("ledger can not be update!!!!!quit process!!!!!\n"); - return 0; + char msg[256]; + snprintf(msg, sizeof(msg), "终端 id: %s 台账更新失败,无法写入台账", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); } //3-写入台账内容/////////////////////////////////// //4-配置映射文件////////////////////////////// char model[64]; // 获取模型ID,检查是否返回 NULL - char* model_id = parse_model_cfg_web_one(ied);//存储在/FeProject/dat/ + parse_model_cfg_web_one(ied,model);//存储在/FeProject/dat/ - if (isValidModelId(model_id)) { //lnk20250313防止拿不到映射文件 + if (isValidModelId(model)) { //lnk20250313防止拿不到映射文件 // 安全拷贝字符串到 model 数组 - strncpy(model, model_id, sizeof(model) - 1); + strncpy(model, model, sizeof(model) - 1); model[sizeof(model) - 1] = '\0'; // 确保以 null 结尾 printf("ledger Model ID: %s\n", model); } else { printf("ledger No model ID found.quit\n"); + char msg[256]; + snprintf(msg, sizeof(msg), "终端 id: %s 台账更新失败,没有找到装置型号", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); return ; } char full_path[128]; @@ -865,6 +891,9 @@ void process_ledger_update(trigger_update_xml_t *ledger_update_xml) //8响应添加成功 //update[i].guid + char msg[256]; + snprintf(msg, sizeof(msg), "终端 id: %s 台账添加成功", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); } } @@ -908,22 +937,31 @@ void process_ledger_update(trigger_update_xml_t *ledger_update_xml) int ret = update_one_terminal_ledger(update,i,ied,ied_usr->dev_idx,1);//1:更新已有的ied if(ret){ printf("ledger can not be update!!!!!quit process!!!!!\n"); - return 0; + + char msg[256]; + snprintf(msg, sizeof(msg), "终端 id: %s 台账更新失败,台账无法写入", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); + return ; } //3-写入台账内容//////////////////////////////////////////// //4-配置映射文件/////////////////////////////////////////// char model[64] = {0}; // 获取模型ID,检查是否返回 NULL - char* model_id = parse_model_cfg_web_one(ied);//存储在/FeProject/dat/ + parse_model_cfg_web_one(ied,model);//存储在/FeProject/dat/ - if (model_id != NULL) { + if (isValidModelId(model)) { // 安全拷贝字符串到 model 数组 - strncpy(model, model_id, sizeof(model) - 1); + strncpy(model, model, sizeof(model) - 1); model[sizeof(model) - 1] = '\0'; // 确保以 null 结尾 printf("ledger Model ID: %s\n", model); } else { printf("ledger No model ID found.\n"); + + char msg[256]; + snprintf(msg, sizeof(msg), "终端 id: %s 台账更新失败,没有找到装置型号", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); + return ; } char full_path[128]; snprintf(full_path, sizeof(full_path), "/FeProject/dat/%s.xml", model); // 拼接路径 @@ -949,6 +987,9 @@ void process_ledger_update(trigger_update_xml_t *ledger_update_xml) //8响应添加成功 //update[i].guid + char msg[256]; + snprintf(msg, sizeof(msg), "终端 id: %s 台账修改成功", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); } } @@ -957,6 +998,9 @@ void process_ledger_update(trigger_update_xml_t *ledger_update_xml) //添加mq响应台账添加失败:台账找不到 //update[i].guid + char msg[256]; + sprintf(msg, "终端 id: %s 台账修改失败, 无法找到这个终端", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); } } ///////////////////////////////////////////////////////////////////////////////delete @@ -1032,7 +1076,10 @@ void process_ledger_update(trigger_update_xml_t *ledger_update_xml) //更新数据////////////////////////////////////////////////////////////////////// //7响应添加成功 - //update[i].guid + //update[i].guid + char msg[256]; + snprintf(msg, sizeof(msg), "终端 id: %s 台账删除成功", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); } } @@ -1041,6 +1088,10 @@ void process_ledger_update(trigger_update_xml_t *ledger_update_xml) //添加mq响应台账添加失败:台账找不到 //update[i].guid + + char msg[256]; + sprintf(msg, "终端 id: %s 台账删除失败, 无法找到这个终端", update[i].terminal_id); + send_reply_to_kafka_c(update[i].guid, "2", msg); } } diff --git a/mms/rdb_client.h b/mms/rdb_client.h index 655e226..73a5048 100644 --- a/mms/rdb_client.h +++ b/mms/rdb_client.h @@ -472,7 +472,7 @@ bool isCharPtrEmpty(const char* str); int parse_ledger_update_xml(trigger_update_xml_t* trigger_update_xml); int update_one_terminal_ledger(terminal* update, int i,ied_t* ied,int terminal_index,int ied_take); void print_trigger_update_xml(const trigger_update_xml_t* trigger_update); -char* parse_model_cfg_web_one(ied_t* ied); +char* parse_model_cfg_web_one(ied_t* ied,char* out_model); void Set_xml_nodeinfo_one(char* dev_type); void create_ledger_log(trigger_update_xml_t* ledger_update_xml); ied_t* find_ied_unused(); diff --git a/rocketmq/SimpleProducer.h b/rocketmq/SimpleProducer.h index 22a066f..8f3b11e 100644 --- a/rocketmq/SimpleProducer.h +++ b/rocketmq/SimpleProducer.h @@ -8,7 +8,7 @@ #include "../rocketmq/CPushConsumer.h" #include - +#include /*添加测试函数lnk10-10*/ void producer_send0(); @@ -42,7 +42,7 @@ struct Subscription { MessageCallBack callback; Subscription(const std::string& t, const std::string& tg, MessageCallBack cb) - : topic(t), tag(tg), callback(cb) {} + : topic(t), tag(tg), callback(cb) {std::cout << "Subscription topic: " << topic << std::endl;} }; void rocketmq_consumer_receive(