文件目录和下载请求功能自测通过
This commit is contained in:
@@ -327,6 +327,10 @@ std::string G_MQCONSUMER_TOPIC_FILE = "";//consumer topie
|
||||
std::string G_MQCONSUMER_TAG_FILE = "";//consumer tag
|
||||
std::string G_MQCONSUMER_KEY_FILE = "";//consumer key
|
||||
|
||||
std::string G_REPLY_TOPIC_FILE = "";//consumer topie
|
||||
std::string G_REPLY_TAG_FILE = "";//consumer tag
|
||||
std::string G_REPLY_KEY_FILE = "";//consumer key
|
||||
|
||||
std::string G_MQCONSUMER_TOPIC_TEST = "";
|
||||
|
||||
int G_TEST_FLAG = 0;
|
||||
@@ -732,6 +736,12 @@ void init_config() {
|
||||
G_MQCONSUMER_TAG_FILE = strdup(ba.data());
|
||||
ba = settings.value("RocketMq/ConsumerKeyFILE", "").toString().toLatin1();
|
||||
G_MQCONSUMER_KEY_FILE = strdup(ba.data());
|
||||
ba = settings.value("RocketMq/ReplyTopicFILE", "").toString().toLatin1();
|
||||
G_REPLY_TOPIC_FILE = strdup(ba.data());
|
||||
ba = settings.value("RocketMq/ReplyTagFILE", "").toString().toLatin1();
|
||||
G_REPLY_TAG_FILE = strdup(ba.data());
|
||||
ba = settings.value("RocketMq/ReplyKeyFILE", "").toString().toLatin1();
|
||||
G_REPLY_KEY_FILE = strdup(ba.data());
|
||||
|
||||
ba = settings.value("RocketMq/ConsumerTopicTEST", "").toString().toLatin1();
|
||||
G_MQCONSUMER_TOPIC_TEST = strdup(ba.data());
|
||||
@@ -5306,7 +5316,7 @@ std::string base64_encode(const std::string& in) {
|
||||
return out; // 返回编码后的字符串
|
||||
}
|
||||
|
||||
void handleUploadResponse(const std::string& response, char* wavepath) {
|
||||
void handleUploadResponse(const std::string& response, char* wavepath, int type) {
|
||||
|
||||
// 解析 JSON 响应
|
||||
cJSON* json_data = cJSON_Parse(response.c_str());
|
||||
@@ -5354,7 +5364,12 @@ void handleUploadResponse(const std::string& response, char* wavepath) {
|
||||
}
|
||||
|
||||
// 拷贝到 wavepath
|
||||
strcpy(wavepath, nameWithoutExt.c_str());
|
||||
if (type == 1) {
|
||||
strcpy(wavepath, nameWithoutExt.c_str());
|
||||
}
|
||||
else {
|
||||
strcpy(wavepath, name.c_str());
|
||||
}
|
||||
|
||||
std::cout << "wavepath: " << wavepath << std::endl;
|
||||
|
||||
@@ -5428,7 +5443,7 @@ void handleUploadResponse(const std::string& response, char* wavepath) {
|
||||
}*/
|
||||
|
||||
//这是dataform发送方式
|
||||
void SendFileWeb(const std::string& strUrl, const char* localpath, const char* cloudpath, char* wavepath) {
|
||||
void SendFileWeb(const std::string& strUrl, const char* localpath, const char* cloudpath, char* wavepath,int type) {
|
||||
// 初始化 curl
|
||||
CURL* curl = curl_easy_init();
|
||||
if (curl) {
|
||||
@@ -5482,7 +5497,7 @@ void SendFileWeb(const std::string& strUrl, const char* localpath, const char* c
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM,"【ERROR】前置上传暂态录波文件 %s 失败,请检查文件上传接口配置",localpath);
|
||||
} else {
|
||||
std::cout << "http web success, response: " << resPost0 << std::endl;
|
||||
handleUploadResponse(resPost0, wavepath); // 处理响应
|
||||
handleUploadResponse(resPost0, wavepath, type); // 处理响应
|
||||
}
|
||||
|
||||
// 清理
|
||||
@@ -5497,7 +5512,7 @@ void SendFileWeb(const std::string& strUrl, const char* localpath, const char* c
|
||||
void SOEFileWeb(char* localpath,char* cloudpath, char* wavepath)
|
||||
{
|
||||
//示例ip,更换为实际ip即可
|
||||
SendFileWeb(WEB_FILEUPLOAD,localpath,cloudpath,wavepath);
|
||||
SendFileWeb(WEB_FILEUPLOAD,localpath,cloudpath,wavepath,1);
|
||||
}
|
||||
|
||||
void SOEFileWeb_test()
|
||||
@@ -5550,9 +5565,9 @@ int DownloadFileWeb(const std::string& strUrl,
|
||||
|
||||
std::string fullUrl = strUrl;
|
||||
if (fullUrl.find('?') == std::string::npos)
|
||||
fullUrl += "?path=";
|
||||
fullUrl += "?filePath=";
|
||||
else
|
||||
fullUrl += "&path=";
|
||||
fullUrl += "&filePath=";
|
||||
fullUrl += encodedPath;
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
|
||||
@@ -5574,12 +5589,14 @@ int DownloadFileWeb(const std::string& strUrl,
|
||||
if (res != CURLE_OK)
|
||||
{
|
||||
std::cerr << "DownloadFileWeb failed: " << curl_easy_strerror(res) << std::endl;
|
||||
remove(localpath);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (http_code != 200)
|
||||
{
|
||||
std::cerr << "DownloadFileWeb http code: " << http_code << std::endl;
|
||||
remove(localpath);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
@@ -54,7 +54,8 @@ extern int RECALL_ONLY_FLAG; //lnk20260309添加一个全局变量,标志是
|
||||
extern void SendFileWeb(const std::string& strUrl,
|
||||
const char* localpath,
|
||||
const char* cloudpath,
|
||||
char* wavepath);
|
||||
char* wavepath,
|
||||
int type);
|
||||
|
||||
extern int DownloadFileWeb(const std::string& strUrl,
|
||||
const char* remotePath,
|
||||
@@ -153,6 +154,9 @@ extern std::string G_LOG_KEY;//key
|
||||
extern std::string G_MQCONSUMER_TOPIC_FILE;//topie_file
|
||||
extern std::string G_MQCONSUMER_TAG_FILE;//tag
|
||||
extern std::string G_MQCONSUMER_KEY_FILE;//key
|
||||
extern std::string G_REPLY_TOPIC_FILE;//topie_file
|
||||
extern std::string G_REPLY_TAG_FILE;//tag
|
||||
extern std::string G_REPLY_KEY_FILE;//key
|
||||
extern std::string Topic_Reply_Topic;
|
||||
extern std::string Topic_Reply_Tag;
|
||||
extern std::string Topic_Reply_Key;
|
||||
@@ -1858,11 +1862,17 @@ int parse_control(const std::string& json_str, const std::string& output_dir) {
|
||||
static int ParseFileDirReq(const char *body, file_dir_req_t *req)
|
||||
{
|
||||
if (body == NULL || req == NULL)
|
||||
{
|
||||
std::cout << "[ParseFileDirReq] body invalid" << std::endl;
|
||||
return -1;
|
||||
}
|
||||
|
||||
cJSON *root = cJSON_Parse(body);
|
||||
if (root == NULL)
|
||||
{
|
||||
std::cout << "[ParseFileDirReq] parse json failed" << std::endl;
|
||||
return -1;
|
||||
}
|
||||
|
||||
cJSON *guid = cJSON_GetObjectItem(root, "guid");
|
||||
cJSON *frontid = cJSON_GetObjectItem(root, "nodeId");
|
||||
@@ -1870,28 +1880,50 @@ static int ParseFileDirReq(const char *body, file_dir_req_t *req)
|
||||
cJSON *devid = cJSON_GetObjectItem(root, "devId");
|
||||
cJSON *type = cJSON_GetObjectItem(root, "type");
|
||||
cJSON *path = cJSON_GetObjectItem(root, "path");
|
||||
cJSON *remotepath = cJSON_GetObjectItem(root, "remotePath");
|
||||
cJSON *remotepath = cJSON_GetObjectItem(root, "remotePath");
|
||||
|
||||
if (!guid || guid->type != cJSON_String ||
|
||||
!frontid || frontid->type != cJSON_String ||
|
||||
!processNo || processNo->type != cJSON_Number ||
|
||||
!devid || devid->type != cJSON_String ||
|
||||
!type || type->type != cJSON_Number ||
|
||||
!path || path->type != cJSON_String ||
|
||||
!remotepath || remotepath->type != cJSON_String)
|
||||
!path || path->type != cJSON_String)
|
||||
{
|
||||
std::cout << "[ParseFileDirReq] invalid"
|
||||
<< ", guid=" << (guid ? guid->type : -1)
|
||||
<< ", nodeId=" << (frontid ? frontid->type : -1)
|
||||
<< ", processNo=" << (processNo ? processNo->type : -1)
|
||||
<< ", devId=" << (devid ? devid->type : -1)
|
||||
<< ", type=" << (type ? type->type : -1)
|
||||
<< ", path=" << (path ? path->type : -1)
|
||||
<< std::endl;
|
||||
|
||||
cJSON_Delete(root);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset(req, 0, sizeof(file_dir_req_t));
|
||||
|
||||
snprintf(req->guid, sizeof(req->guid), "%s", guid->valuestring);
|
||||
snprintf(req->frontid, sizeof(req->frontid), "%s", frontid->valuestring);
|
||||
req->processNo = processNo->valueint;
|
||||
snprintf(req->devid, sizeof(req->devid), "%s", devid->valuestring);
|
||||
req->type = type->valueint;
|
||||
snprintf(req->path, sizeof(req->path), "%s", path->valuestring);
|
||||
snprintf(req->remote_path, sizeof(req->remote_path), "%s", remotepath->valuestring);
|
||||
|
||||
// remotePath 可选
|
||||
if (remotepath && remotepath->type == cJSON_String && remotepath->valuestring)
|
||||
{
|
||||
snprintf(req->remote_path,
|
||||
sizeof(req->remote_path),
|
||||
"%s",
|
||||
remotepath->valuestring);
|
||||
}
|
||||
else
|
||||
{
|
||||
req->remote_path[0] = '\0';
|
||||
}
|
||||
|
||||
req->create_time = time(NULL);
|
||||
|
||||
cJSON_Delete(root);
|
||||
@@ -1938,30 +1970,30 @@ static file_dir_req_t* PopMatchedFileDirReq(const char *terminal_id)
|
||||
return match;
|
||||
}
|
||||
|
||||
static std::string BuildFileDirRespJsonEx(const file_dir_req_t *req,
|
||||
char **names,
|
||||
const char **itemTypes,
|
||||
int *itemSizes,
|
||||
int itemNum,
|
||||
int result)
|
||||
static std::string BuildFileDirRespJsonEx(const file_dir_req_t *req, //文件请求的基本信息
|
||||
char **names, //文件或目录的名称列表
|
||||
const char **itemTypes, //文件或目录的类型列表(如 "file" 或 "dir")
|
||||
int *itemSizes, //文件的大小列表(如果是目录,可以设置为0或1)
|
||||
int itemNum, //文件或目录的数量
|
||||
int result) //操作结果,0表示成功,非0表示失败
|
||||
{
|
||||
cJSON *root = cJSON_CreateObject();
|
||||
cJSON *detail = cJSON_CreateObject();
|
||||
cJSON *dirInfo = cJSON_CreateArray();
|
||||
|
||||
cJSON_AddStringToObject(root, "guid", req ? req->guid : "");
|
||||
cJSON_AddStringToObject(root, "nodeId", req ? req->frontid : "");
|
||||
cJSON_AddNumberToObject(root, "processNo", req ? req->processNo : 0);
|
||||
cJSON_AddStringToObject(root, "devId", req ? req->devid : "");
|
||||
cJSON_AddNumberToObject(root, "type", req ? req->type : 0);
|
||||
cJSON_AddNumberToObject(root, "result", result);
|
||||
cJSON_AddStringToObject(root, "guid", req ? req->guid : ""); //请求的唯一标识符
|
||||
cJSON_AddStringToObject(root, "nodeId", req ? req->frontid : ""); //请求发送方的前置机ID
|
||||
cJSON_AddNumberToObject(root, "processNo", req ? req->processNo : 0);//请求发送方的进程号
|
||||
cJSON_AddStringToObject(root, "devId", req ? req->devid : ""); //请求发送方的设备ID
|
||||
cJSON_AddNumberToObject(root, "type", req ? req->type : 0); //请求类型,如0表示文件列表请求,1表示文件下载请求
|
||||
cJSON_AddNumberToObject(root, "result", result); //操作结果
|
||||
|
||||
for (int i = 0; i < itemNum; ++i)
|
||||
{
|
||||
cJSON *item = cJSON_CreateObject();
|
||||
cJSON_AddStringToObject(item, "name", (names && names[i]) ? names[i] : "");
|
||||
cJSON_AddStringToObject(item, "type", (itemTypes && itemTypes[i]) ? itemTypes[i] : "file");
|
||||
cJSON_AddNumberToObject(item, "size", itemSizes ? itemSizes[i] : 1);
|
||||
cJSON_AddStringToObject(item, "name", (names && names[i]) ? names[i] : ""); //文件或目录的名称,下载文件这里响应远端文件路径,上传文件这里响应上传后的目录列表
|
||||
cJSON_AddStringToObject(item, "type", (itemTypes && itemTypes[i]) ? itemTypes[i] : "file");//文件或目录的类型,默认为 "file"
|
||||
cJSON_AddNumberToObject(item, "size", itemSizes ? itemSizes[i] : 1);//文件的大小,如果是目录,可以设置为0或1
|
||||
cJSON_AddItemToArray(dirInfo, item);
|
||||
}
|
||||
|
||||
@@ -1990,14 +2022,39 @@ static std::string BuildFileDirRespJson(const file_dir_req_t *req,
|
||||
|
||||
for (int i = 0; i < filenum; ++i)
|
||||
{
|
||||
types[i] = "dir";
|
||||
sizes[i] = 1;
|
||||
|
||||
if (filenames[i] != NULL)
|
||||
{
|
||||
int len = strlen(filenames[i]);
|
||||
|
||||
// 以 / 结尾 -> dir
|
||||
if (len > 0 && filenames[i][len - 1] == '/')
|
||||
{
|
||||
types[i] = "dir";
|
||||
}
|
||||
else
|
||||
{
|
||||
types[i] = "file";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
types[i] = "file";
|
||||
}
|
||||
}
|
||||
|
||||
std::string jsonStr = BuildFileDirRespJsonEx(req, filenames, types, sizes, filenum, result);
|
||||
std::string jsonStr =
|
||||
BuildFileDirRespJsonEx(req,
|
||||
filenames,
|
||||
types,
|
||||
sizes,
|
||||
filenum,
|
||||
result);
|
||||
|
||||
delete [] types;
|
||||
delete [] sizes;
|
||||
|
||||
return jsonStr;
|
||||
}
|
||||
|
||||
@@ -2013,7 +2070,7 @@ static std::string BuildSingleFileRespJson(const file_dir_req_t *req,
|
||||
|
||||
if (name == NULL || result != 0)
|
||||
{
|
||||
return BuildFileDirRespJsonEx(req, NULL, NULL, NULL, 0, result);
|
||||
return BuildFileDirRespJsonEx(req, NULL, NULL, NULL, 0, result);//异常响应-1
|
||||
}
|
||||
|
||||
names[0] = (char *)name;
|
||||
@@ -2110,6 +2167,43 @@ static void SafePathName(const char* src, char* dst, size_t dstSize)
|
||||
dst[j] = '\0';
|
||||
}
|
||||
|
||||
static void trim_inplace(char *s)
|
||||
{
|
||||
if (!s) return;
|
||||
|
||||
char *p = s;
|
||||
while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n') {
|
||||
++p;
|
||||
}
|
||||
|
||||
if (p != s) {
|
||||
memmove(s, p, strlen(p) + 1);
|
||||
}
|
||||
|
||||
int len = strlen(s);
|
||||
while (len > 0 &&
|
||||
(s[len - 1] == ' ' ||
|
||||
s[len - 1] == '\t' ||
|
||||
s[len - 1] == '\r' ||
|
||||
s[len - 1] == '\n')) {
|
||||
s[len - 1] = '\0';
|
||||
--len;
|
||||
}
|
||||
}
|
||||
|
||||
static std::string get_parent_dir(const char* fullpath)
|
||||
{
|
||||
if (fullpath == NULL) return "/";
|
||||
|
||||
std::string s(fullpath);
|
||||
size_t pos = s.find_last_of('/');
|
||||
|
||||
if (pos == std::string::npos) return "/";
|
||||
if (pos == 0) return "/";
|
||||
|
||||
return s.substr(0, pos + 1);
|
||||
}
|
||||
|
||||
static int BuildTempLocalPath(char* outPath,
|
||||
size_t outSize,
|
||||
chnl_usr_t* chnl_usr,
|
||||
@@ -2153,7 +2247,7 @@ static int HandleTypeDownloadAndUpload(chnl_usr_t* chnl_usr,
|
||||
char localpath[512] = {0};
|
||||
if (BuildTempLocalPath(localpath, sizeof(localpath), chnl_usr, req->path) != 0)
|
||||
{
|
||||
DIY_ERRORLOG_CODE("process",0, LOG_CODE_TRANSIENT_COMM,
|
||||
DIY_ERRORLOG_CODE("process",0, LOG_CODE_FILE_CONTROL,
|
||||
"【ERROR】构造本地临时路径失败 devid=%s path=%s",
|
||||
req->devid, req->path);
|
||||
|
||||
@@ -2161,6 +2255,12 @@ static int HandleTypeDownloadAndUpload(chnl_usr_t* chnl_usr,
|
||||
return -1;
|
||||
}
|
||||
|
||||
trim_inplace(req->path);
|
||||
trim_inplace(localpath);
|
||||
|
||||
std::cout << "[FILE][GET] localpath=[" << localpath
|
||||
<< "], remote=[" << req->path << "]" << std::endl;
|
||||
|
||||
ST_RET ret = mms_getFile(chnl_usr->net_info,
|
||||
(ST_CHAR*)localpath,
|
||||
(ST_CHAR*)req->path,
|
||||
@@ -2168,11 +2268,11 @@ static int HandleTypeDownloadAndUpload(chnl_usr_t* chnl_usr,
|
||||
|
||||
if (ret != SD_SUCCESS)
|
||||
{
|
||||
DIY_ERRORLOG_CODE("process",0, LOG_CODE_TRANSIENT_COMM,
|
||||
DIY_ERRORLOG_CODE("process",0, LOG_CODE_FILE_CONTROL,
|
||||
"【ERROR】装置文件下载失败 devid=%s, rem=%s, ret=0x%X",
|
||||
req->devid, req->path, ret);
|
||||
|
||||
jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, ret);
|
||||
jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, -1);
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -2181,13 +2281,48 @@ static int HandleTypeDownloadAndUpload(chnl_usr_t* chnl_usr,
|
||||
req->devid, req->path, localpath);
|
||||
|
||||
char wavepath[512] = {0};
|
||||
SendFileWeb(WEB_FILEUPLOAD, localpath, req->path, wavepath);
|
||||
// 拼接上传路径
|
||||
char upload_path[1024] = {0};
|
||||
|
||||
snprintf(upload_path,
|
||||
sizeof(upload_path),
|
||||
"/upload/%s%s",
|
||||
req->devid,
|
||||
req->path);
|
||||
|
||||
std::string cloud_dir = get_parent_dir(upload_path);
|
||||
|
||||
std::cout << "[FILE][UPLOAD]"
|
||||
<< " local=[" << localpath << "]"
|
||||
<< ", remoteFile=[" << upload_path << "]"
|
||||
<< ", cloudDir=[" << cloud_dir << "]"
|
||||
<< std::endl;
|
||||
|
||||
SendFileWeb(WEB_FILEUPLOAD, localpath, cloud_dir.c_str(), wavepath,0);
|
||||
|
||||
// 上传失败
|
||||
if (wavepath[0] == 0)
|
||||
{
|
||||
std::cout << "[FILEUPLOAD] upload failed, wavepath empty" << std::endl;
|
||||
|
||||
jsonString = BuildSingleFileRespJson(req,
|
||||
"",
|
||||
"file",
|
||||
1,
|
||||
-1); // 失败
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cout << "[FILEUPLOAD] upload success, wavepath="
|
||||
<< wavepath << std::endl;
|
||||
|
||||
jsonString = BuildSingleFileRespJson(req,
|
||||
wavepath,
|
||||
"file",
|
||||
1,
|
||||
0); // 成功
|
||||
}
|
||||
|
||||
jsonString = BuildSingleFileRespJson(req,
|
||||
(wavepath[0] != 0 ? wavepath : req->path),
|
||||
"file",
|
||||
1,
|
||||
0);
|
||||
remove(localpath);
|
||||
|
||||
return 0;
|
||||
@@ -2238,7 +2373,7 @@ static int HandleTypeTransferToDevice(chnl_usr_t* chnl_usr,
|
||||
"【ERROR】文件传送到装置失败 devid=%s, src=%s, dest=%s, ret=0x%X",
|
||||
req->devid, localpath, req->path, ret);
|
||||
|
||||
jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, ret);
|
||||
jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, -1);
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -2258,20 +2393,47 @@ static int HandleTypeTransferToDevice(chnl_usr_t* chnl_usr,
|
||||
|
||||
void HandleFileDirReqForChannel(chnl_usr_t *chnl_usr)
|
||||
{
|
||||
if (chnl_usr == NULL || chnl_usr->chnl == NULL || chnl_usr->chnl->ied == NULL)
|
||||
if (chnl_usr == NULL) {
|
||||
std::cout << "[FILEDIR] chnl_usr is NULL" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
if (chnl_usr->chnl == NULL) {
|
||||
std::cout << "[FILEDIR] chnl is NULL" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
if (chnl_usr->chnl->ied == NULL) {
|
||||
std::cout << "[FILEDIR] ied is NULL" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
ied_t *ied = chnl_usr->chnl->ied;
|
||||
ied_usr_t *ied_usr = GET_IEDEXT_ADDR(ied);
|
||||
if (ied_usr == NULL)
|
||||
if (ied_usr == NULL) {
|
||||
std::cout << "[FILEDIR] ied_usr is NULL" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
if (ied_usr->terminal_id[0] == 0)
|
||||
if (ied_usr->terminal_id[0] == 0) {
|
||||
std::cout << "[FILEDIR] terminal_id empty" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
std::cout << "[FILEDIR] checking req for terminal_id="
|
||||
<< ied_usr->terminal_id << std::endl;
|
||||
|
||||
file_dir_req_t *req = PopMatchedFileDirReq(ied_usr->terminal_id);
|
||||
if (req == NULL)
|
||||
return; // 当前连接没有文件请求
|
||||
if (req == NULL) {
|
||||
std::cout << "[FILEDIR] no matched request, terminal_id="
|
||||
<< ied_usr->terminal_id << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
std::cout << "[FILEDIR] matched request success, terminal_id="
|
||||
<< ied_usr->terminal_id
|
||||
<< ", guid=" << req->guid
|
||||
<< std::endl;
|
||||
|
||||
DIY_INFOLOG_CODE(req->devid,1, LOG_CODE_FILE_CONTROL,
|
||||
"【NORMAL】处理文件请求 terminal_id=%s type=%d path=%s",
|
||||
@@ -2320,14 +2482,14 @@ void HandleFileDirReqForChannel(chnl_usr_t *chnl_usr)
|
||||
"【WARN】未知文件请求类型 type=%d devid=%s path=%s",
|
||||
req->type, req->devid, req->path);
|
||||
|
||||
jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, 1);//1是失败
|
||||
jsonString = BuildSingleFileRespJson(req, NULL, "file", 1, -1);//-1是失败
|
||||
handleRet = -1;
|
||||
}
|
||||
|
||||
/* 统一回 Kafka */
|
||||
Ckafka_data_t dir_info;
|
||||
dir_info.strTopic = QString::fromStdString(Topic_Reply_Topic);
|
||||
dir_info.mp_id = QString::fromLocal8Bit(req->guid);
|
||||
dir_info.strTopic = QString::fromStdString(G_REPLY_TOPIC_FILE);
|
||||
dir_info.mp_id = QString::fromLocal8Bit(req->devid);
|
||||
dir_info.strText = QString::fromStdString(jsonString);
|
||||
|
||||
kafka_data_list_mutex.lock();
|
||||
@@ -2790,14 +2952,35 @@ ConsumeStatus myMessageCallbackfile(
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
}
|
||||
|
||||
cJSON* outer = cJSON_Parse(body.c_str());
|
||||
if (!outer) {
|
||||
std::cerr << "parse outer json failed" << std::endl;
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
}
|
||||
|
||||
cJSON* messageBody = cJSON_GetObjectItem(outer, "messageBody");
|
||||
|
||||
if (!messageBody ||
|
||||
messageBody->type != cJSON_String ||
|
||||
messageBody->valuestring == NULL)
|
||||
{
|
||||
std::cerr << "messageBody invalid" << std::endl;
|
||||
cJSON_Delete(outer);
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
}
|
||||
|
||||
std::string realBody = messageBody->valuestring;
|
||||
|
||||
cJSON_Delete(outer);
|
||||
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_FILE_CONTROL,"【NORMAL】前置消费topic:%s_%s的文件控制消息",
|
||||
FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_FILE.c_str());
|
||||
|
||||
std::cout << "file Callback received message: " << body << std::endl;
|
||||
std::cout << "file Callback received message: " << realBody << std::endl;
|
||||
std::cout << "Message Key: " << (key.empty() ? "N/A" : key) << std::endl;
|
||||
|
||||
file_dir_req_t req;
|
||||
if (ParseFileDirReq(body.c_str(), &req) != 0)
|
||||
if (ParseFileDirReq(realBody.c_str(), &req) != 0)
|
||||
{
|
||||
DIY_WARNLOG_CODE(req.devid,1,LOG_CODE_FILE_CONTROL,"【WARN】文件控制消息解析失败: %s", body);
|
||||
//return E_RECONSUME_LATER;
|
||||
|
||||
@@ -1531,6 +1531,7 @@ void CheckAllConnectedChannel()
|
||||
if(chnl_usr->m_state == CHANNEL_CONNECTED)
|
||||
{
|
||||
if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {
|
||||
printf("[FILEDIR] enter HandleFileDirReqForChannel");
|
||||
HandleFileDirReqForChannel(chnl_usr);//文件目录请求
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user