diff --git a/cfg_parse/SimpleProducer.cpp b/cfg_parse/SimpleProducer.cpp index 5a29d33..29f4605 100644 --- a/cfg_parse/SimpleProducer.cpp +++ b/cfg_parse/SimpleProducer.cpp @@ -45,11 +45,15 @@ using namespace std; +static std::once_flag g_consumer_once; +static std::once_flag g_producer_once; + extern std::string G_ROCKETMQ_PRODUCER;//rocketmq producer extern std::string G_ROCKETMQ_IPPORT;//rocketmq ip+port -extern std::string G_ROCKETMQ_TOPIC;//topie -extern std::string G_ROCKETMQ_TAG;//tag -extern std::string G_ROCKETMQ_KEY;//key + +extern std::string G_ROCKETMQ_TOPIC_TEST;//topie +extern std::string G_ROCKETMQ_TAG_TEST;//tag +extern std::string G_ROCKETMQ_KEY_TEST;//key extern std::string G_MQCONSUMER_TOPIC_LOG; extern std::string G_MQCONSUMER_TOPIC_SET; @@ -57,6 +61,8 @@ extern std::string G_MQCONSUMER_TOPIC_RC; extern std::string G_MQCONSUMER_TOPIC_UD; extern std::string G_MQCONSUMER_TOPIC_RT; +extern std::string G_MQCONSUMER_TOPIC_TEST; + extern std::string FRONT_INST; extern bool DEBUGOPEN; @@ -110,8 +116,8 @@ public: RocketMQConsumer(const std::string& consumerName, const std::string& nameServer); // 禁用拷贝和赋值 - //RocketMQConsumer(const RocketMQConsumer&) {} - RocketMQConsumer& operator=(const RocketMQConsumer&) { return *this; } + RocketMQConsumer(const RocketMQConsumer&) = delete; + RocketMQConsumer& operator=(const RocketMQConsumer&) = delete; // 订阅主题和标签,并注册回调 void subscribe(const std::string& topic, @@ -227,24 +233,25 @@ RocketMQConsumer::RocketMQConsumer(const std::string& consumerGroup, G_MQCONSUMER_CHANNEL ); + // 限制消费线程池,防止 ConsumeTP 爆炸 + consumer_.setConsumeThreadCount(4); + listener_ = new InternalListener(this); } // 启动消费者 void RocketMQConsumer::start() { - /*if (StartPushConsumer(consumer_) != 0) { - pthread_mutex_lock(&g_consumerMapMutex); - g_consumerMap.erase(consumer_); - pthread_mutex_unlock(&g_consumerMapMutex); - DestroyPushConsumer(consumer_); - throw std::runtime_error("Failed to start push consumer."); + static bool started = false; + if (started) { + std::cout << "Consumer already started" << std::endl; + return; } - else{ - std::cout << "RocketMQ Consumer started." << std::endl; - }*/ + consumer_.registerMessageListener(listener_); consumer_.start(); + + started = true; } void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback) @@ -361,6 +368,8 @@ RocketMQConsumer::~RocketMQConsumer() } catch (...) { } + sleep(1); // 等内部线程退出 + delete listener_; listener_ = NULL; @@ -434,16 +443,13 @@ void rocketmq_consumer_receive( const std::string& nameServer, const std::vector& subscriptions) // 接收多个订阅 { - if (g_consumer == NULL) { + std::call_once(g_consumer_once, [&](){ try { - //InitializeConsumer(consumerName, nameServer, topic, tag, callback);//初始化后,mq库内部来完成消息的获取 - InitializeConsumer(consumerName, nameServer, subscriptions); // 初始化后,MQ库内部开始获取消息 - } - catch (...) { + InitializeConsumer(consumerName, nameServer, subscriptions); + } catch (...) { std::cerr << "Cannot consume message because consumer initialization failed." << std::endl; - return; } - } + }); } ///////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -848,14 +854,9 @@ void rocketmq_producer_send(const std::string& body, const std::string& tags, const std::string& keys) { - if (g_producer == NULL) { - try { - InitializeProducer(); - } catch (...) { - std::cerr << "Cannot send message because producer initialization failed." << std::endl; - return; - } - } + std::call_once(g_producer_once, [&](){ + InitializeProducer(); + }); try { g_producer->sendMessage(body, topic, tags, keys); @@ -876,9 +877,9 @@ void StartSendMessage(CProducer* producer) CSendResult result; // create message and set some values for it - CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str()); - SetMessageTags(msg, G_ROCKETMQ_TAG.c_str()); - SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str()); + CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC_TEST.c_str()); + SetMessageTags(msg, G_ROCKETMQ_TAG_TEST.c_str()); + SetMessageKeys(msg, G_ROCKETMQ_KEY_TEST.c_str()); for (int i = 0; i < 10; i++) { @@ -902,9 +903,9 @@ void StartSendMessage(CProducer* producer,const char* strbody) CSendResult result; // create message and set some values for it - CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str()); - SetMessageTags(msg, G_ROCKETMQ_TAG.c_str()); - SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str()); + CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC_TEST.c_str()); + SetMessageTags(msg, G_ROCKETMQ_TAG_TEST.c_str()); + SetMessageKeys(msg, G_ROCKETMQ_KEY_TEST.c_str()); SetMessageBody(msg, strbody); // send message @@ -963,7 +964,7 @@ void rocketmq_test_rt() { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT); + data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RT); std::ifstream file("rt.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -977,7 +978,7 @@ void rocketmq_test_ud()//用来测试台账更新 { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD); + data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_UD); std::ifstream file("ud.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -991,7 +992,7 @@ void rocketmq_test_set()//用来测试进程控制脚本 { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET); + data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_SET); std::ifstream file("set.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -1005,8 +1006,8 @@ void rocketmq_test_only()//用来测试进程控制脚本 { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET); - std::ifstream file("set_debug.txt"); // 文件中存储长字符串 + data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_TEST); + std::ifstream file("test.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -1020,7 +1021,7 @@ void rocketmq_test_rc() { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC); + data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RC); std::ifstream file("rc.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 @@ -1035,7 +1036,7 @@ void rocketmq_test_log() { Ckafka_data_t data; data.monitor_id = 123123; - data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_LOG); + data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_LOG); std::ifstream file("log_test.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 diff --git a/cfg_parse/cfg_parser.cpp b/cfg_parse/cfg_parser.cpp index 1cad11d..5a47de2 100644 --- a/cfg_parse/cfg_parser.cpp +++ b/cfg_parse/cfg_parser.cpp @@ -280,9 +280,9 @@ extern int g_front_seg_num; //生产者 std::string G_ROCKETMQ_PRODUCER = "";//rocketmq producer std::string G_ROCKETMQ_IPPORT = "";//rocketmq ip+port -std::string G_ROCKETMQ_TOPIC = "";//topie -std::string G_ROCKETMQ_TAG = "";//tag -std::string G_ROCKETMQ_KEY = "";//key +std::string G_ROCKETMQ_TOPIC_TEST = "";//topie +std::string G_ROCKETMQ_TAG_TEST = "";//tag +std::string G_ROCKETMQ_KEY_TEST = "";//key int QUEUENUM = 0; std::string BROKERNAME = ""; //消费者 @@ -327,6 +327,8 @@ std::string G_MQCONSUMER_TOPIC_FILE = "";//consumer topie std::string G_MQCONSUMER_TAG_FILE = "";//consumer tag std::string G_MQCONSUMER_KEY_FILE = "";//consumer key +std::string G_MQCONSUMER_TOPIC_TEST = ""; + int G_TEST_FLAG = 0; int G_TEST_NUM = 0; int G_TEST_TYPE = 0; @@ -644,12 +646,12 @@ void init_config() { G_ROCKETMQ_PRODUCER = strdup(ba.data()); ba = settings.value("RocketMq/Ipport", "").toString().toLatin1(); G_ROCKETMQ_IPPORT = strdup(ba.data()); - ba = settings.value("RocketMq/Topic", "").toString().toLatin1(); - G_ROCKETMQ_TOPIC = strdup(ba.data()); - ba = settings.value("RocketMq/Tag", "").toString().toLatin1(); - G_ROCKETMQ_TAG = strdup(ba.data()); - ba = settings.value("RocketMq/Key", "").toString().toLatin1(); - G_ROCKETMQ_KEY = strdup(ba.data()); + ba = settings.value("RocketMq/TESTTopic", "").toString().toLatin1(); + G_ROCKETMQ_TOPIC_TEST = strdup(ba.data()); + ba = settings.value("RocketMq/TESTTag", "").toString().toLatin1(); + G_ROCKETMQ_TAG_TEST = strdup(ba.data()); + ba = settings.value("RocketMq/TESTKey", "").toString().toLatin1(); + G_ROCKETMQ_KEY_TEST = strdup(ba.data()); QUEUENUM = settings.value("RocketMq/Queuenum", 0).toInt(); //心跳 @@ -724,13 +726,16 @@ void init_config() { G_CONNECT_KEY = strdup(ba.data()); //lnk20260310添加文件管理的topic和tag - ba = settings.value("RocketMq/ConsumerTopicFile", "").toString().toLatin1(); + ba = settings.value("RocketMq/ConsumerTopicFILE", "").toString().toLatin1(); G_MQCONSUMER_TOPIC_FILE = strdup(ba.data()); - ba = settings.value("RocketMq/ConsumerTagFile", "").toString().toLatin1(); + ba = settings.value("RocketMq/ConsumerTagFILE", "").toString().toLatin1(); G_MQCONSUMER_TAG_FILE = strdup(ba.data()); - ba = settings.value("RocketMq/ConsumerKeyFile", "").toString().toLatin1(); + ba = settings.value("RocketMq/ConsumerKeyFILE", "").toString().toLatin1(); G_MQCONSUMER_KEY_FILE = strdup(ba.data()); + ba = settings.value("RocketMq/ConsumerTopicTEST", "").toString().toLatin1(); + G_MQCONSUMER_TOPIC_TEST = strdup(ba.data()); + //MQ测试 G_TEST_FLAG = settings.value("RocketMq/Testflag", 0).toInt(); @@ -747,9 +752,9 @@ void init_config() { //生产者相关打印 std::cout << "Read G_ROCKETMQ_PRODUCER:" << G_ROCKETMQ_PRODUCER << std::endl; std::cout << "Read G_ROCKETMQ_IPPORT:" << G_ROCKETMQ_IPPORT << std::endl; - std::cout << "Read G_ROCKETMQ_TOPIC:" << G_ROCKETMQ_TOPIC << std::endl; - std::cout << "Read G_ROCKETMQ_TAG:" << G_ROCKETMQ_TAG << std::endl; - std::cout << "Read G_ROCKETMQ_KEY:" << G_ROCKETMQ_KEY << std::endl; + std::cout << "Read G_ROCKETMQ_TOPIC_TEST:" << G_ROCKETMQ_TOPIC_TEST << std::endl; + std::cout << "Read G_ROCKETMQ_TAG_TEST:" << G_ROCKETMQ_TAG_TEST << std::endl; + std::cout << "Read G_ROCKETMQ_KEY_TEST:" << G_ROCKETMQ_KEY_TEST << std::endl; std::cout << "Read QUEUENUM:" << QUEUENUM << std::endl; std::cout << "Read G_LOG_TOPIC:" << G_LOG_TOPIC << std::endl; std::cout << "Read G_LOG_TAG:" << G_LOG_TAG << std::endl; @@ -6528,7 +6533,7 @@ bool shouldSkipTerminal(const char* terminal_id) { void rocketmq_test_300(int mpnum,int front_index,int type) { Ckafka_data_t data; - data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC); + data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC_TEST); data.mp_id = "0"; // 读取文件内容 diff --git a/json/save2json.cpp b/json/save2json.cpp index b1375e8..4cb585f 100644 --- a/json/save2json.cpp +++ b/json/save2json.cpp @@ -1865,18 +1865,20 @@ static int ParseFileDirReq(const char *body, file_dir_req_t *req) return -1; cJSON *guid = cJSON_GetObjectItem(root, "guid"); - cJSON *frontid = cJSON_GetObjectItem(root, "frontid"); - cJSON *processNo = cJSON_GetObjectItem(root, "ProcessNo"); - cJSON *devid = cJSON_GetObjectItem(root, "devid"); + cJSON *frontid = cJSON_GetObjectItem(root, "nodeId"); + cJSON *processNo = cJSON_GetObjectItem(root, "processNo"); + cJSON *devid = cJSON_GetObjectItem(root, "devId"); cJSON *type = cJSON_GetObjectItem(root, "type"); - cJSON *path = cJSON_GetObjectItem(root, "Path"); + cJSON *path = cJSON_GetObjectItem(root, "path"); + cJSON *remotepath = cJSON_GetObjectItem(root, "remotePath"); if (!guid || guid->type != cJSON_String || !frontid || frontid->type != cJSON_String || !processNo || processNo->type != cJSON_Number || !devid || devid->type != cJSON_String || !type || type->type != cJSON_Number || - !path || path->type != cJSON_String) + !path || path->type != cJSON_String || + !remotepath || remotepath->type != cJSON_String) { cJSON_Delete(root); return -1; @@ -1889,6 +1891,7 @@ static int ParseFileDirReq(const char *body, file_dir_req_t *req) snprintf(req->devid, sizeof(req->devid), "%s", devid->valuestring); req->type = type->valueint; snprintf(req->path, sizeof(req->path), "%s", path->valuestring); + snprintf(req->remote_path, sizeof(req->remote_path), "%s", remotepath->valuestring); req->create_time = time(NULL); cJSON_Delete(root); @@ -1943,25 +1946,28 @@ static std::string BuildFileDirRespJsonEx(const file_dir_req_t *req, int result) { cJSON *root = cJSON_CreateObject(); + cJSON *detail = cJSON_CreateObject(); cJSON *dirInfo = cJSON_CreateArray(); cJSON_AddStringToObject(root, "guid", req ? req->guid : ""); - cJSON_AddStringToObject(root, "frontid", req ? req->frontid : ""); + cJSON_AddStringToObject(root, "nodeId", req ? req->frontid : ""); cJSON_AddNumberToObject(root, "processNo", req ? req->processNo : 0); - cJSON_AddStringToObject(root, "devid", req ? req->devid : ""); + cJSON_AddStringToObject(root, "devId", req ? req->devid : ""); cJSON_AddNumberToObject(root, "type", req ? req->type : 0); + cJSON_AddNumberToObject(root, "result", result); for (int i = 0; i < itemNum; ++i) { cJSON *item = cJSON_CreateObject(); cJSON_AddStringToObject(item, "name", (names && names[i]) ? names[i] : ""); cJSON_AddStringToObject(item, "type", (itemTypes && itemTypes[i]) ? itemTypes[i] : "file"); - cJSON_AddNumberToObject(item, "size", (itemSizes ? itemSizes[i] : 1)); + cJSON_AddNumberToObject(item, "size", itemSizes ? itemSizes[i] : 1); cJSON_AddItemToArray(dirInfo, item); } - cJSON_AddItemToObject(root, "dirInfo", dirInfo); - cJSON_AddNumberToObject(root, "result", result); + cJSON_AddItemToObject(detail, "dirInfo", dirInfo); + + cJSON_AddItemToObject(root, "detail", detail); char *json = cJSON_PrintUnformatted(root); std::string jsonStr = json ? json : ""; @@ -2196,22 +2202,22 @@ static int HandleTypeTransferToDevice(chnl_usr_t* chnl_usr, return -1; char localpath[512] = {0}; - if (BuildTempLocalPath(localpath, sizeof(localpath), chnl_usr, req->path) != 0) + if (BuildTempLocalPath(localpath, sizeof(localpath), chnl_usr, req->remote_path) != 0) { DIY_ERRORLOG_CODE(req->devid,1, LOG_CODE_FILE_CONTROL, "【ERROR】构造本地临时路径失败 devid=%s path=%s", - req->devid, req->path); + req->devid, req->remote_path); jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, -1); return -1; } - int dlRet = DownloadFileWeb(WEB_FILEDOWNLOAD, req->path, localpath); + int dlRet = DownloadFileWeb(WEB_FILEDOWNLOAD, req->remote_path, localpath); if (dlRet != 0) { DIY_ERRORLOG_CODE(req->devid,1, LOG_CODE_FILE_CONTROL, "【ERROR】Web 文件下载失败 devid=%s, path=%s", - req->devid, req->path); + req->devid, req->remote_path); jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, -1); return -1; @@ -2219,23 +2225,18 @@ static int HandleTypeTransferToDevice(chnl_usr_t* chnl_usr, DIY_INFOLOG_CODE(req->devid,1, LOG_CODE_FILE_CONTROL, "【NORMAL】Web 文件下载成功 devid=%s, webpath=%s, local=%s", - req->devid, req->path, localpath); - - char destfilename[512] = {0}; - const char* fileName = GetFileNameOnly(req->path); - snprintf(destfilename, sizeof(destfilename), "/etc/%s", - (fileName && fileName[0]) ? fileName : "tmp_file.dat"); + req->devid, req->remote_path, localpath); ST_RET ret = mms_mvla_obtfile(chnl_usr->net_info, - (ST_CHAR*)localpath, - (ST_CHAR*)destfilename, + (ST_CHAR*)localpath, //本地路径 + (ST_CHAR*)req->path, //装置路径 3 * g_pt61850app->mmsOpTimeout); if (ret != SD_SUCCESS) { DIY_ERRORLOG_CODE(req->devid,1, LOG_CODE_FILE_CONTROL, "【ERROR】文件传送到装置失败 devid=%s, src=%s, dest=%s, ret=0x%X", - req->devid, localpath, destfilename, ret); + req->devid, localpath, req->path, ret); jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, ret); return -1; @@ -2243,10 +2244,10 @@ static int HandleTypeTransferToDevice(chnl_usr_t* chnl_usr, DIY_INFOLOG_CODE(req->devid,1, LOG_CODE_FILE_CONTROL, "【NORMAL】文件传送到装置成功 devid=%s, src=%s, dest=%s", - req->devid, localpath, destfilename); + req->devid, localpath, req->path); jsonString = BuildSingleFileRespJson(req, - destfilename, + req->path, "file", 1, 0); diff --git a/mms/db_interface.h b/mms/db_interface.h index 3e1d6f7..f983361 100644 --- a/mms/db_interface.h +++ b/mms/db_interface.h @@ -183,6 +183,8 @@ typedef struct file_dir_req_t char devid[128]; int type; char path[256]; + char remote_path[256]; + time_t create_time; } file_dir_req_t; diff --git a/mykafka.ini b/mykafka.ini index 3fbda81..fe81d72 100644 --- a/mykafka.ini +++ b/mykafka.ini @@ -74,7 +74,7 @@ FileFlag=4 FrontInst=884d132ac3a01225fcacc8c10da07d09 FrontIP=192.168.1.167 SendFlag=3 -RecallOnlyFlag= +RecallOnlyFlag=0 [Ledger] TerminalStatus="[0]" @@ -117,37 +117,43 @@ WriteUrl= [RocketMq] producer=Group_producer Ipport=192.168.1.68:9876 -Topic=TEST_Topic -Tag=Test_Tag -Key=Test_Keys +TESTTopic=TEST_Topic +TESTTag=884d132ac3a01225fcacc8c10da07d09 +TESTKey=Test_Keys Queuenum=4 Testflag=1 -Testnum=100 -Testtype=1 +Testnum=0 +Testtype=0 TestPort=11000 TestList= consumer=Group_consumer ConsumerIpport=192.168.1.68:9876 ConsumerTopicRT=ask_real_data_topic -ConsumerTagRT=Test_Tag +ConsumerTagRT=884d132ac3a01225fcacc8c10da07d09 ConsumerKeyRT=Test_Keys ConsumerAccessKey=rmqroot ConsumerSecretKey=001@#njcnmq ConsumerChannel= ConsumerTopicUD=control_Topic -ConsumerTagUD=Test_Tag +ConsumerTagUD=884d132ac3a01225fcacc8c10da07d09 ConsumerKeyUD=Test_Keys ConsumerTopicRC=recall_Topic -ConsumerTagRC=Test_Tag +ConsumerTagRC=884d132ac3a01225fcacc8c10da07d09 ConsumerKeyRC=Test_Keys ConsumerTopicSET=process_Topic -ConsumerTagSET=Test_Tag +ConsumerTagSET=884d132ac3a01225fcacc8c10da07d09 ConsumerKeySET=Test_Keys ConsumerTopicLOG=ask_log_Topic -ConsumerTagLOG=Test_Tag +ConsumerTagLOG=884d132ac3a01225fcacc8c10da07d09 ConsumerKeyLOG=Test_Keys +ConsumerTopicFILE=File_Topic +ConsumerTagFILE=884d132ac3a01225fcacc8c10da07d09 +ConsumerKeyFILE=Test_Keys + +ConsumerTopicTEST=File_Topic + LOGTopic=log_Topic LOGTag=Test_Tag LOGKey=Test_Keys