From 2f2e0d64302cbf64b253acc246ea3118daa312eb Mon Sep 17 00:00:00 2001 From: lnk Date: Thu, 7 May 2026 16:48:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=87=E4=BB=B6=E7=9B=AE=E5=BD=95=E5=92=8C?= =?UTF-8?q?=E4=B8=8B=E8=BD=BD=E8=AF=B7=E6=B1=82=E5=8A=9F=E8=83=BD=E8=87=AA?= =?UTF-8?q?=E6=B5=8B=E9=80=9A=E8=BF=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cfg_parse/cfg_parser.cpp | 31 ++++- json/save2json.cpp | 269 ++++++++++++++++++++++++++++++++------- mms/mms_process.c | 1 + 3 files changed, 251 insertions(+), 50 deletions(-) diff --git a/cfg_parse/cfg_parser.cpp b/cfg_parse/cfg_parser.cpp index 5a47de2..1d22c78 100644 --- a/cfg_parse/cfg_parser.cpp +++ b/cfg_parse/cfg_parser.cpp @@ -327,6 +327,10 @@ std::string G_MQCONSUMER_TOPIC_FILE = "";//consumer topie std::string G_MQCONSUMER_TAG_FILE = "";//consumer tag std::string G_MQCONSUMER_KEY_FILE = "";//consumer key +std::string G_REPLY_TOPIC_FILE = "";//consumer topie +std::string G_REPLY_TAG_FILE = "";//consumer tag +std::string G_REPLY_KEY_FILE = "";//consumer key + std::string G_MQCONSUMER_TOPIC_TEST = ""; int G_TEST_FLAG = 0; @@ -732,6 +736,12 @@ void init_config() { G_MQCONSUMER_TAG_FILE = strdup(ba.data()); ba = settings.value("RocketMq/ConsumerKeyFILE", "").toString().toLatin1(); G_MQCONSUMER_KEY_FILE = strdup(ba.data()); + ba = settings.value("RocketMq/ReplyTopicFILE", "").toString().toLatin1(); + G_REPLY_TOPIC_FILE = strdup(ba.data()); + ba = settings.value("RocketMq/ReplyTagFILE", "").toString().toLatin1(); + G_REPLY_TAG_FILE = strdup(ba.data()); + ba = settings.value("RocketMq/ReplyKeyFILE", "").toString().toLatin1(); + G_REPLY_KEY_FILE = strdup(ba.data()); ba = settings.value("RocketMq/ConsumerTopicTEST", "").toString().toLatin1(); G_MQCONSUMER_TOPIC_TEST = strdup(ba.data()); @@ -5306,7 +5316,7 @@ std::string base64_encode(const std::string& in) { return out; // 返回编码后的字符串 } -void handleUploadResponse(const std::string& response, char* wavepath) { +void handleUploadResponse(const std::string& response, char* wavepath, int type) { // 解析 JSON 响应 cJSON* json_data = cJSON_Parse(response.c_str()); @@ -5354,7 +5364,12 @@ void handleUploadResponse(const std::string& response, char* wavepath) { } // 拷贝到 wavepath - strcpy(wavepath, nameWithoutExt.c_str()); + if (type == 1) { + strcpy(wavepath, nameWithoutExt.c_str()); + } + else { + strcpy(wavepath, name.c_str()); + } std::cout << "wavepath: " << wavepath << std::endl; @@ -5428,7 +5443,7 @@ void handleUploadResponse(const std::string& response, char* wavepath) { }*/ //这是dataform发送方式 -void SendFileWeb(const std::string& strUrl, const char* localpath, const char* cloudpath, char* wavepath) { +void SendFileWeb(const std::string& strUrl, const char* localpath, const char* cloudpath, char* wavepath,int type) { // 初始化 curl CURL* curl = curl_easy_init(); if (curl) { @@ -5482,7 +5497,7 @@ void SendFileWeb(const std::string& strUrl, const char* localpath, const char* c DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM,"【ERROR】前置上传暂态录波文件 %s 失败,请检查文件上传接口配置",localpath); } else { std::cout << "http web success, response: " << resPost0 << std::endl; - handleUploadResponse(resPost0, wavepath); // 处理响应 + handleUploadResponse(resPost0, wavepath, type); // 处理响应 } // 清理 @@ -5497,7 +5512,7 @@ void SendFileWeb(const std::string& strUrl, const char* localpath, const char* c void SOEFileWeb(char* localpath,char* cloudpath, char* wavepath) { //示例ip,更换为实际ip即可 - SendFileWeb(WEB_FILEUPLOAD,localpath,cloudpath,wavepath); + SendFileWeb(WEB_FILEUPLOAD,localpath,cloudpath,wavepath,1); } void SOEFileWeb_test() @@ -5550,9 +5565,9 @@ int DownloadFileWeb(const std::string& strUrl, std::string fullUrl = strUrl; if (fullUrl.find('?') == std::string::npos) - fullUrl += "?path="; + fullUrl += "?filePath="; else - fullUrl += "&path="; + fullUrl += "&filePath="; fullUrl += encodedPath; curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str()); @@ -5574,12 +5589,14 @@ int DownloadFileWeb(const std::string& strUrl, if (res != CURLE_OK) { std::cerr << "DownloadFileWeb failed: " << curl_easy_strerror(res) << std::endl; + remove(localpath); return -1; } if (http_code != 200) { std::cerr << "DownloadFileWeb http code: " << http_code << std::endl; + remove(localpath); return -1; } diff --git a/json/save2json.cpp b/json/save2json.cpp index 4cb585f..16b0fd8 100644 --- a/json/save2json.cpp +++ b/json/save2json.cpp @@ -54,7 +54,8 @@ extern int RECALL_ONLY_FLAG; //lnk20260309添加一个全局变量,标志是 extern void SendFileWeb(const std::string& strUrl, const char* localpath, const char* cloudpath, - char* wavepath); + char* wavepath, + int type); extern int DownloadFileWeb(const std::string& strUrl, const char* remotePath, @@ -153,6 +154,9 @@ extern std::string G_LOG_KEY;//key extern std::string G_MQCONSUMER_TOPIC_FILE;//topie_file extern std::string G_MQCONSUMER_TAG_FILE;//tag extern std::string G_MQCONSUMER_KEY_FILE;//key +extern std::string G_REPLY_TOPIC_FILE;//topie_file +extern std::string G_REPLY_TAG_FILE;//tag +extern std::string G_REPLY_KEY_FILE;//key extern std::string Topic_Reply_Topic; extern std::string Topic_Reply_Tag; extern std::string Topic_Reply_Key; @@ -1858,11 +1862,17 @@ int parse_control(const std::string& json_str, const std::string& output_dir) { static int ParseFileDirReq(const char *body, file_dir_req_t *req) { if (body == NULL || req == NULL) + { + std::cout << "[ParseFileDirReq] body invalid" << std::endl; return -1; + } cJSON *root = cJSON_Parse(body); if (root == NULL) + { + std::cout << "[ParseFileDirReq] parse json failed" << std::endl; return -1; + } cJSON *guid = cJSON_GetObjectItem(root, "guid"); cJSON *frontid = cJSON_GetObjectItem(root, "nodeId"); @@ -1870,28 +1880,50 @@ static int ParseFileDirReq(const char *body, file_dir_req_t *req) cJSON *devid = cJSON_GetObjectItem(root, "devId"); cJSON *type = cJSON_GetObjectItem(root, "type"); cJSON *path = cJSON_GetObjectItem(root, "path"); - cJSON *remotepath = cJSON_GetObjectItem(root, "remotePath"); + cJSON *remotepath = cJSON_GetObjectItem(root, "remotePath"); if (!guid || guid->type != cJSON_String || !frontid || frontid->type != cJSON_String || !processNo || processNo->type != cJSON_Number || !devid || devid->type != cJSON_String || !type || type->type != cJSON_Number || - !path || path->type != cJSON_String || - !remotepath || remotepath->type != cJSON_String) + !path || path->type != cJSON_String) { + std::cout << "[ParseFileDirReq] invalid" + << ", guid=" << (guid ? guid->type : -1) + << ", nodeId=" << (frontid ? frontid->type : -1) + << ", processNo=" << (processNo ? processNo->type : -1) + << ", devId=" << (devid ? devid->type : -1) + << ", type=" << (type ? type->type : -1) + << ", path=" << (path ? path->type : -1) + << std::endl; + cJSON_Delete(root); return -1; } memset(req, 0, sizeof(file_dir_req_t)); + snprintf(req->guid, sizeof(req->guid), "%s", guid->valuestring); snprintf(req->frontid, sizeof(req->frontid), "%s", frontid->valuestring); req->processNo = processNo->valueint; snprintf(req->devid, sizeof(req->devid), "%s", devid->valuestring); req->type = type->valueint; snprintf(req->path, sizeof(req->path), "%s", path->valuestring); - snprintf(req->remote_path, sizeof(req->remote_path), "%s", remotepath->valuestring); + + // remotePath 可选 + if (remotepath && remotepath->type == cJSON_String && remotepath->valuestring) + { + snprintf(req->remote_path, + sizeof(req->remote_path), + "%s", + remotepath->valuestring); + } + else + { + req->remote_path[0] = '\0'; + } + req->create_time = time(NULL); cJSON_Delete(root); @@ -1938,30 +1970,30 @@ static file_dir_req_t* PopMatchedFileDirReq(const char *terminal_id) return match; } -static std::string BuildFileDirRespJsonEx(const file_dir_req_t *req, - char **names, - const char **itemTypes, - int *itemSizes, - int itemNum, - int result) +static std::string BuildFileDirRespJsonEx(const file_dir_req_t *req, //文件请求的基本信息 + char **names, //文件或目录的名称列表 + const char **itemTypes, //文件或目录的类型列表(如 "file" 或 "dir") + int *itemSizes, //文件的大小列表(如果是目录,可以设置为0或1) + int itemNum, //文件或目录的数量 + int result) //操作结果,0表示成功,非0表示失败 { cJSON *root = cJSON_CreateObject(); cJSON *detail = cJSON_CreateObject(); cJSON *dirInfo = cJSON_CreateArray(); - cJSON_AddStringToObject(root, "guid", req ? req->guid : ""); - cJSON_AddStringToObject(root, "nodeId", req ? req->frontid : ""); - cJSON_AddNumberToObject(root, "processNo", req ? req->processNo : 0); - cJSON_AddStringToObject(root, "devId", req ? req->devid : ""); - cJSON_AddNumberToObject(root, "type", req ? req->type : 0); - cJSON_AddNumberToObject(root, "result", result); + cJSON_AddStringToObject(root, "guid", req ? req->guid : ""); //请求的唯一标识符 + cJSON_AddStringToObject(root, "nodeId", req ? req->frontid : ""); //请求发送方的前置机ID + cJSON_AddNumberToObject(root, "processNo", req ? req->processNo : 0);//请求发送方的进程号 + cJSON_AddStringToObject(root, "devId", req ? req->devid : ""); //请求发送方的设备ID + cJSON_AddNumberToObject(root, "type", req ? req->type : 0); //请求类型,如0表示文件列表请求,1表示文件下载请求 + cJSON_AddNumberToObject(root, "result", result); //操作结果 for (int i = 0; i < itemNum; ++i) { cJSON *item = cJSON_CreateObject(); - cJSON_AddStringToObject(item, "name", (names && names[i]) ? names[i] : ""); - cJSON_AddStringToObject(item, "type", (itemTypes && itemTypes[i]) ? itemTypes[i] : "file"); - cJSON_AddNumberToObject(item, "size", itemSizes ? itemSizes[i] : 1); + cJSON_AddStringToObject(item, "name", (names && names[i]) ? names[i] : ""); //文件或目录的名称,下载文件这里响应远端文件路径,上传文件这里响应上传后的目录列表 + cJSON_AddStringToObject(item, "type", (itemTypes && itemTypes[i]) ? itemTypes[i] : "file");//文件或目录的类型,默认为 "file" + cJSON_AddNumberToObject(item, "size", itemSizes ? itemSizes[i] : 1);//文件的大小,如果是目录,可以设置为0或1 cJSON_AddItemToArray(dirInfo, item); } @@ -1990,14 +2022,39 @@ static std::string BuildFileDirRespJson(const file_dir_req_t *req, for (int i = 0; i < filenum; ++i) { - types[i] = "dir"; sizes[i] = 1; + + if (filenames[i] != NULL) + { + int len = strlen(filenames[i]); + + // 以 / 结尾 -> dir + if (len > 0 && filenames[i][len - 1] == '/') + { + types[i] = "dir"; + } + else + { + types[i] = "file"; + } + } + else + { + types[i] = "file"; + } } - std::string jsonStr = BuildFileDirRespJsonEx(req, filenames, types, sizes, filenum, result); + std::string jsonStr = + BuildFileDirRespJsonEx(req, + filenames, + types, + sizes, + filenum, + result); delete [] types; delete [] sizes; + return jsonStr; } @@ -2013,7 +2070,7 @@ static std::string BuildSingleFileRespJson(const file_dir_req_t *req, if (name == NULL || result != 0) { - return BuildFileDirRespJsonEx(req, NULL, NULL, NULL, 0, result); + return BuildFileDirRespJsonEx(req, NULL, NULL, NULL, 0, result);//异常响应-1 } names[0] = (char *)name; @@ -2110,6 +2167,43 @@ static void SafePathName(const char* src, char* dst, size_t dstSize) dst[j] = '\0'; } +static void trim_inplace(char *s) +{ + if (!s) return; + + char *p = s; + while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n') { + ++p; + } + + if (p != s) { + memmove(s, p, strlen(p) + 1); + } + + int len = strlen(s); + while (len > 0 && + (s[len - 1] == ' ' || + s[len - 1] == '\t' || + s[len - 1] == '\r' || + s[len - 1] == '\n')) { + s[len - 1] = '\0'; + --len; + } +} + +static std::string get_parent_dir(const char* fullpath) +{ + if (fullpath == NULL) return "/"; + + std::string s(fullpath); + size_t pos = s.find_last_of('/'); + + if (pos == std::string::npos) return "/"; + if (pos == 0) return "/"; + + return s.substr(0, pos + 1); +} + static int BuildTempLocalPath(char* outPath, size_t outSize, chnl_usr_t* chnl_usr, @@ -2153,7 +2247,7 @@ static int HandleTypeDownloadAndUpload(chnl_usr_t* chnl_usr, char localpath[512] = {0}; if (BuildTempLocalPath(localpath, sizeof(localpath), chnl_usr, req->path) != 0) { - DIY_ERRORLOG_CODE("process",0, LOG_CODE_TRANSIENT_COMM, + DIY_ERRORLOG_CODE("process",0, LOG_CODE_FILE_CONTROL, "【ERROR】构造本地临时路径失败 devid=%s path=%s", req->devid, req->path); @@ -2161,6 +2255,12 @@ static int HandleTypeDownloadAndUpload(chnl_usr_t* chnl_usr, return -1; } + trim_inplace(req->path); + trim_inplace(localpath); + + std::cout << "[FILE][GET] localpath=[" << localpath + << "], remote=[" << req->path << "]" << std::endl; + ST_RET ret = mms_getFile(chnl_usr->net_info, (ST_CHAR*)localpath, (ST_CHAR*)req->path, @@ -2168,11 +2268,11 @@ static int HandleTypeDownloadAndUpload(chnl_usr_t* chnl_usr, if (ret != SD_SUCCESS) { - DIY_ERRORLOG_CODE("process",0, LOG_CODE_TRANSIENT_COMM, + DIY_ERRORLOG_CODE("process",0, LOG_CODE_FILE_CONTROL, "【ERROR】装置文件下载失败 devid=%s, rem=%s, ret=0x%X", req->devid, req->path, ret); - jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, ret); + jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, -1); return -1; } @@ -2181,13 +2281,48 @@ static int HandleTypeDownloadAndUpload(chnl_usr_t* chnl_usr, req->devid, req->path, localpath); char wavepath[512] = {0}; - SendFileWeb(WEB_FILEUPLOAD, localpath, req->path, wavepath); + // 拼接上传路径 + char upload_path[1024] = {0}; + + snprintf(upload_path, + sizeof(upload_path), + "/upload/%s%s", + req->devid, + req->path); + + std::string cloud_dir = get_parent_dir(upload_path); + + std::cout << "[FILE][UPLOAD]" + << " local=[" << localpath << "]" + << ", remoteFile=[" << upload_path << "]" + << ", cloudDir=[" << cloud_dir << "]" + << std::endl; + + SendFileWeb(WEB_FILEUPLOAD, localpath, cloud_dir.c_str(), wavepath,0); + + // 上传失败 + if (wavepath[0] == 0) + { + std::cout << "[FILEUPLOAD] upload failed, wavepath empty" << std::endl; + + jsonString = BuildSingleFileRespJson(req, + "", + "file", + 1, + -1); // 失败 + } + else + { + std::cout << "[FILEUPLOAD] upload success, wavepath=" + << wavepath << std::endl; + + jsonString = BuildSingleFileRespJson(req, + wavepath, + "file", + 1, + 0); // 成功 + } - jsonString = BuildSingleFileRespJson(req, - (wavepath[0] != 0 ? wavepath : req->path), - "file", - 1, - 0); remove(localpath); return 0; @@ -2238,7 +2373,7 @@ static int HandleTypeTransferToDevice(chnl_usr_t* chnl_usr, "【ERROR】文件传送到装置失败 devid=%s, src=%s, dest=%s, ret=0x%X", req->devid, localpath, req->path, ret); - jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, ret); + jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, -1); return -1; } @@ -2258,20 +2393,47 @@ static int HandleTypeTransferToDevice(chnl_usr_t* chnl_usr, void HandleFileDirReqForChannel(chnl_usr_t *chnl_usr) { - if (chnl_usr == NULL || chnl_usr->chnl == NULL || chnl_usr->chnl->ied == NULL) + if (chnl_usr == NULL) { + std::cout << "[FILEDIR] chnl_usr is NULL" << std::endl; return; + } + + if (chnl_usr->chnl == NULL) { + std::cout << "[FILEDIR] chnl is NULL" << std::endl; + return; + } + + if (chnl_usr->chnl->ied == NULL) { + std::cout << "[FILEDIR] ied is NULL" << std::endl; + return; + } ied_t *ied = chnl_usr->chnl->ied; ied_usr_t *ied_usr = GET_IEDEXT_ADDR(ied); - if (ied_usr == NULL) + if (ied_usr == NULL) { + std::cout << "[FILEDIR] ied_usr is NULL" << std::endl; return; + } - if (ied_usr->terminal_id[0] == 0) + if (ied_usr->terminal_id[0] == 0) { + std::cout << "[FILEDIR] terminal_id empty" << std::endl; return; + } + + std::cout << "[FILEDIR] checking req for terminal_id=" + << ied_usr->terminal_id << std::endl; file_dir_req_t *req = PopMatchedFileDirReq(ied_usr->terminal_id); - if (req == NULL) - return; // 当前连接没有文件请求 + if (req == NULL) { + std::cout << "[FILEDIR] no matched request, terminal_id=" + << ied_usr->terminal_id << std::endl; + return; + } + + std::cout << "[FILEDIR] matched request success, terminal_id=" + << ied_usr->terminal_id + << ", guid=" << req->guid + << std::endl; DIY_INFOLOG_CODE(req->devid,1, LOG_CODE_FILE_CONTROL, "【NORMAL】处理文件请求 terminal_id=%s type=%d path=%s", @@ -2320,14 +2482,14 @@ void HandleFileDirReqForChannel(chnl_usr_t *chnl_usr) "【WARN】未知文件请求类型 type=%d devid=%s path=%s", req->type, req->devid, req->path); - jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, 1);//1是失败 + jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, -1);//-1是失败 handleRet = -1; } /* 统一回 Kafka */ Ckafka_data_t dir_info; - dir_info.strTopic = QString::fromStdString(Topic_Reply_Topic); - dir_info.mp_id = QString::fromLocal8Bit(req->guid); + dir_info.strTopic = QString::fromStdString(G_REPLY_TOPIC_FILE); + dir_info.mp_id = QString::fromLocal8Bit(req->devid); dir_info.strText = QString::fromStdString(jsonString); kafka_data_list_mutex.lock(); @@ -2790,14 +2952,35 @@ ConsumeStatus myMessageCallbackfile( return rocketmq::RECONSUME_LATER; } + cJSON* outer = cJSON_Parse(body.c_str()); + if (!outer) { + std::cerr << "parse outer json failed" << std::endl; + return rocketmq::RECONSUME_LATER; + } + + cJSON* messageBody = cJSON_GetObjectItem(outer, "messageBody"); + + if (!messageBody || + messageBody->type != cJSON_String || + messageBody->valuestring == NULL) + { + std::cerr << "messageBody invalid" << std::endl; + cJSON_Delete(outer); + return rocketmq::RECONSUME_LATER; + } + + std::string realBody = messageBody->valuestring; + + cJSON_Delete(outer); + DIY_INFOLOG_CODE("process",0,LOG_CODE_FILE_CONTROL,"【NORMAL】前置消费topic:%s_%s的文件控制消息", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_FILE.c_str()); - std::cout << "file Callback received message: " << body << std::endl; + std::cout << "file Callback received message: " << realBody << std::endl; std::cout << "Message Key: " << (key.empty() ? "N/A" : key) << std::endl; file_dir_req_t req; - if (ParseFileDirReq(body.c_str(), &req) != 0) + if (ParseFileDirReq(realBody.c_str(), &req) != 0) { DIY_WARNLOG_CODE(req.devid,1,LOG_CODE_FILE_CONTROL,"【WARN】文件控制消息解析失败: %s", body); //return E_RECONSUME_LATER; diff --git a/mms/mms_process.c b/mms/mms_process.c index 9ec679c..04576f9 100644 --- a/mms/mms_process.c +++ b/mms/mms_process.c @@ -1531,6 +1531,7 @@ void CheckAllConnectedChannel() if(chnl_usr->m_state == CHANNEL_CONNECTED) { if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID) { + printf("[FILEDIR] enter HandleFileDirReqForChannel"); HandleFileDirReqForChannel(chnl_usr);//文件目录请求 }