From f27d2089596c6a665b919213e201d65a87a03814 Mon Sep 17 00:00:00 2001 From: lnk Date: Mon, 15 Sep 2025 16:36:21 +0800 Subject: [PATCH] add recall reply --- LFtid1056/cloudfront/code/cfg_parser.cpp | 243 +++++++++++++++++------ LFtid1056/cloudfront/code/interface.h | 2 + LFtid1056/cloudfront/code/rocketmq.cpp | 81 ++------ LFtid1056/cloudfront/code/worker.cpp | 80 +++++++- LFtid1056/dealMsg.cpp | 5 +- LFtid1056/main_thread.cpp | 12 +- 6 files changed, 271 insertions(+), 152 deletions(-) diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 7f3cd32..7a30419 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -1092,25 +1092,79 @@ void Get_Recall_Time_Char(const std::string& start_time_str, } //mq调用将补招信息写入补招列表 -int recall_json_handle(const std::string& jstr) { - // 不指定稳态/暂态则全部补招 - int stat = 0; - int voltage = 0; +int recall_json_handle_from_mq(const std::string& body) +{ try { - - // 1. 解析 JSON 数组 - auto json_root = nlohmann::json::parse(jstr); - if (!json_root.is_array()) { - std::cout << "json root解析错误" << std::endl; - return 10000; + // ====== 解析外层 JSON ====== + nlohmann::json root; + try { + root = nlohmann::json::parse(body); + } catch (const std::exception& e) { + std::cerr << "Error parsing JSON: " << e.what() << std::endl; + // ★与原逻辑等价:无法解析,不再进入 recall_json_handle + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", + g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); + return 10004; } - // 2. 遍历每个补招项 - for (auto& item : json_root) { + // 提取 "messageBody"(字符串) + if (!root.contains("messageBody") || !root["messageBody"].is_string()) { + std::cerr << "'messageBody' is missing or is not a string" << std::endl; + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,没有messageBody字段", + g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); + return 10004; + } + std::string messageBodyStr = root["messageBody"].get(); + if (messageBodyStr.empty()) { + std::cerr << "'messageBody' is empty" << std::endl; + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,messageBody为空", + g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); + return 10004; + } + + // 解析 messageBody 内层 JSON + nlohmann::json messageBody; + try { + messageBody = nlohmann::json::parse(messageBodyStr); + } catch (const std::exception& e) { + std::cerr << "Failed to parse 'messageBody' JSON: " << e.what() << std::endl; + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,messageBody的json结构不正确", + g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); + return 10004; + } + + // 提取 guid 并立即回执 + if (!messageBody.contains("guid") || !messageBody["guid"].is_string()) { + std::cerr << "'guid' is missing or is not a string" << std::endl; + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,没有guid字段", + g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); + return 10004; + } + std::string guid = messageBody["guid"].get(); + send_reply_to_queue(guid, "1", "收到补招指令"); + + // 提取 data 数组 + if (!messageBody.contains("data") || !messageBody["data"].is_array()) { + std::cerr << "'data' is missing or is not an array" << std::endl; + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,没有data字段", + g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); + return 10004; + } + // 仅用于保留你原先的调试输出 + std::string data_dump; + try { data_dump = messageBody["data"].dump(); } catch (...) { data_dump.clear(); } + std::cout << "parseJsonMessageRC: " << data_dump << std::endl; + + // 不指定稳态/暂态则全部补招 + int stat = 0; + int voltage = 0; + + // 2. 遍历每个补招项(这里直接用已解析的 messageBody["data"]) + for (auto& item : messageBody["data"]) { // 获取必需字段 // ★修改:强制要求 terminalId; if (!item.contains("terminalId") || - !item.contains("monitor") || + !item.contains("monitor") || !item.contains("timeInterval") || !item.contains("dataType")) { @@ -1125,10 +1179,9 @@ int recall_json_handle(const std::string& jstr) { continue; } - // 2.1 解析 dataType + // 2.1 解析 dataType(仅保留稳态/暂态) std::string datatype = item["dataType"].get(); if (!datatype.empty()) { - // ★修改:仅保留稳态/暂态 if (datatype == "0" || datatype == "稳态" || datatype == "steady" || datatype == "STEADY") { stat = 1; voltage = 0; // 稳态 } else if (datatype == "1" || datatype == "暂态" || datatype == "voltage" || datatype == "VOLTAGE") { @@ -1143,11 +1196,12 @@ int recall_json_handle(const std::string& jstr) { // ★新增:定位并校验该 terminal 是否归属当前进程 std::lock_guard lock(ledgermtx); - const terminal_dev* targetDev = nullptr; - for (const auto& dev : terminal_devlist) { - // 只处理本进程对应的终端 - if (dev.terminal_id == terminalId) { - targetDev = &dev; + const terminal_dev* targetDev = NULL; + for (std::vector::const_iterator it = terminal_devlist.begin(); + it != terminal_devlist.end(); ++it) + { + if (it->terminal_id == terminalId) { + targetDev = &(*it); break; } } @@ -1156,8 +1210,18 @@ int recall_json_handle(const std::string& jstr) { continue; } + // 添加判断装置在线,不注册guid,异步补招 + if (ClientManager::instance().get_dev_status(targetDev->terminal_id) != 1) { + std::cout << "terminalId对应装置不在线: " << targetDev->terminal_id << std::endl; + + // 响应 web + std::string msg = std::string("装置:") + targetDev->terminal_name + " 不在线,无法补招"; + send_reply_to_kafka_recall("12345", "2", static_cast(ResponseCode::INTERNAL_ERROR), msg, targetDev->terminal_id, "", "", ""); + continue;//处理下一个装置的补招记录 + } + // ★新增:按新结构解析 monitor 层级 - auto& monitors = item["monitor"]; + nlohmann::json& monitors = item["monitor"]; if (!monitors.is_array() || monitors.empty()) { std::cout << "monitor数组为空或非数组类型" << std::endl; continue; @@ -1168,15 +1232,19 @@ int recall_json_handle(const std::string& jstr) { std::cout << "monitor项缺少 monitorId 或 timeInterval" << std::endl; continue; } - + std::string monitorId = mobj["monitorId"].get(); if (monitorId.empty()) continue; - + // 不只是判断存在,还要拿到指针 lm 以便后续 push_back - ledger_monitor* lm = nullptr; - for (auto& mon : const_cast(targetDev)->line) { - if (!mon.monitor_id.empty() && mon.monitor_id == monitorId) { - lm = &mon; + ledger_monitor* lm = NULL; + // 注意:这里需要非常量指针,取 const_cast 后遍历 + terminal_dev* dev_nc = const_cast(targetDev); + for (std::vector::iterator itLm = dev_nc->line.begin(); + itLm != dev_nc->line.end(); ++itLm) + { + if (!itLm->monitor_id.empty() && itLm->monitor_id == monitorId) { + lm = &(*itLm); break; } } @@ -1185,50 +1253,52 @@ int recall_json_handle(const std::string& jstr) { << " @ " << terminalId << std::endl; continue; } - - auto& tiArr = mobj["timeInterval"]; + + nlohmann::json& tiArr = mobj["timeInterval"]; if (!tiArr.is_array() || tiArr.empty()) { std::cout << "timeInterval为空或非数组类型: monitorId=" << monitorId << std::endl; continue; } - - // 这里拆分时间段并 push 到 lm->recall_list + + // 这里拆分时间段并 push 到 lm->recall_list / lm->recall_list_static for (auto& timeItem : tiArr) { std::string ti = timeItem.get(); - auto pos = ti.find('~'); + std::string::size_type pos = ti.find('~'); if (pos == std::string::npos) { std::cout << "timeInterval格式错误: " << ti << std::endl; continue; } std::string start = ti.substr(0, pos); std::string end = ti.substr(pos + 1); - - // 仅对 recall_list(事件)进行 1 小时拆分;recall_list_stat(稳态)不拆分 + + // 仅对 recall_list(事件)进行 1 小时拆分;recall_list_static(稳态)不拆分 { - // 公共字段(整体区间,不拆分)用于 recall_list_stat - RecallFile rm_all; - rm_all.recall_status = 0; // 初始状态:未补招 - rm_all.StartTime = start; - rm_all.EndTime = end; - rm_all.STEADY = std::to_string(stat); - rm_all.VOLTAGE = std::to_string(voltage); - + // 公共字段(整体区间,不拆分)用于 recall_list_static + RecallFile rm_all; // ★类型正确:稳态列表的元素 + rm_all.recall_status = 0; // 初始状态:未补招 + rm_all.StartTime = start; // 直接使用字符串 + rm_all.EndTime = end; + rm_all.STEADY = std::to_string(stat); + rm_all.VOLTAGE = std::to_string(voltage); + // 仅当需要事件补招(voltage==1)时,才进行 1 小时拆分并压入 recall_list if (voltage == 1) { // 拆分时间段为 1 小时一段,并存入 recall_list std::vector recallinfo_list_hour; Get_Recall_Time_Char(start, end, recallinfo_list_hour); - - for (auto& info : recallinfo_list_hour) { - RecallMonitor rm; - rm.recall_status = 0; // 初始状态:未补招 - rm.StartTime = epoch_to_datetime_str(info.starttime); - rm.EndTime = epoch_to_datetime_str(info.endtime); - rm.STEADY = std::to_string(stat); - rm.VOLTAGE = std::to_string(voltage); - - lm->recall_list.push_back(std::move(rm)); - + + for (std::size_t i = 0; i < recallinfo_list_hour.size(); ++i) { + const RecallInfo& info = recallinfo_list_hour[i]; + + RecallMonitor rm; // ★类型正确:事件列表的元素 + rm.recall_status = 0; // 初始状态:未补招 + rm.StartTime = epoch_to_datetime_str(info.starttime); + rm.EndTime = epoch_to_datetime_str(info.endtime); + rm.STEADY = std::to_string(stat); + rm.VOLTAGE = std::to_string(voltage); + + lm->recall_list.push_back(rm); + // 事件补招列表(recall_list)调试打印 std::cout << "[recall_json_handle] terminal=" << terminalId << " monitor=" << monitorId @@ -1239,12 +1309,12 @@ int recall_json_handle(const std::string& jstr) { << std::endl; } } - + // 仅当需要稳态补招(stat==1)时,不拆分,直接压入 recall_list_static if (stat == 1) { lm->recall_list_static.push_back(rm_all); // 不拆分,整体区间 - - // 稳态补招列表(recall_list_stat)调试打印 + + // 稳态补招列表(recall_list_static)调试打印 std::cout << "[recall_json_handle] terminal=" << terminalId << " monitor=" << monitorId << " [recall_list_static] start=" << lm->recall_list_static.back().StartTime @@ -1253,15 +1323,15 @@ int recall_json_handle(const std::string& jstr) { << " voltage="<< lm->recall_list_static.back().VOLTAGE << std::endl; } - - // 非法输入保护 + + // 非法输入保护(保留你原来的保护与返回码) if (stat == 0 && voltage == 0) { std::cout << "[recall_json_handle] skip: stat=0 && voltage=0, monitor=" << monitorId << " terminal=" << terminalId << " start=" << rm_all.StartTime << " end=" << rm_all.EndTime << std::endl; - return 10003; // 不可能进入这个逻辑,错误退出 + return 10003; } } } @@ -3227,14 +3297,14 @@ int get_type_by_state(int state) { case DeviceState::READING_INTERFIXEDVALUEDES: case DeviceState::READING_CONTROLWORD: case DeviceState::SET_INTERFIXEDVALUE: - return 0x2106; + return 0x2106; //读数据 case DeviceState::READING_FILEMENU: - return 0x2131; + return 0x2131; //读目录 case DeviceState::READING_EVENTFILE: case DeviceState::READING_FILEDATA: - return 0x2132; + return 0x2132; //读文件 default: return 0; // 没有对应的type @@ -3245,6 +3315,7 @@ int get_type_by_state(int state) { void check_device_busy_timeout() { std::lock_guard lock(ledgermtx); + for (auto &dev : terminal_devlist) { if (dev.isbusy != 0) // 有业务在进行 @@ -3260,7 +3331,7 @@ void check_device_busy_timeout() << dev.busytimecount << "s)" << std::endl; //发送超时响应 - send_reply_to_cloud(static_cast(ResponseCode::BAD_REQUEST),dev.terminal_id,get_type_by_state(dev.busytype)); + send_reply_to_cloud(static_cast(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype)); // 超时清空状态 dev.guid.clear(); // 清空进行中的 guid @@ -3276,6 +3347,10 @@ void check_device_busy_timeout() std::cout << "[Timeout] Device " << dev.terminal_id << " busytype=" << dev.busytype << " 超时(" << dev.busytimecount << "s)" << std::endl; + + //发送超时响应 + send_reply_to_cloud(static_cast(ResponseCode::TIMEOUT),dev.terminal_id,get_type_by_state(dev.busytype)); + // 超时清空状态 dev.guid.clear(); dev.busytype = 0; @@ -3821,6 +3896,36 @@ void clear_terminal_runtime_state(const std::string& id) { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑 +//发送补招响应给web +void send_reply_to_kafka_recall(const std::string& guid, const std::string& step,int code, const std::string& result,const std::string& terminalId,const std::string& lineIndex,const std::string& recallStartDate,const std::string& recallEndDate){ + // 构造 JSON 字符串 + std::ostringstream oss; + oss << "{" + << "\"guid\":\"" << guid << "\"," + << "\"step\":\"" << step << "\"," + << "\"code\":" << code << "," + << "\"result\":\"" << result << "\"," + << "\"terminalId\":\"" << terminalId << "\"," + << "\"lineIndex\":\"" << lineIndex << "\"," + << "\"recallStartDate\":\"" << recallStartDate << "\"," + << "\"recallEndDate\":\"" << recallEndDate << "\"," + << "\"processNo\":\"" << g_front_seg_index << "\"," + << "\"nodeId\":\"" << FRONT_INST << "\"" + << "}"; + + std::string jsonString = oss.str(); + + // 封装 Kafka 消息 + queue_data_t connect_info; + connect_info.strTopic = Topic_Reply_Topic; + connect_info.strText = jsonString; + + // 加入发送队列(带互斥锁保护) + queue_data_list_mutex.lock(); + queue_data_list.push_back(connect_info); + queue_data_list_mutex.unlock(); +} + // ===== 一次遍历可下发“多个终端的一条” ===== void check_recall_event() { @@ -3847,7 +3952,11 @@ void check_recall_event() { << " " << front.StartTime << " ~ " << front.EndTime << std::endl; //调用reply接口通知web端该时间段补招完成 - + std::string msg = std::string("监测点:") + lm.monitor_name + + " 补招时间范围:" + front.StartTime + + " ~ " + front.EndTime + + " 补招执行完成"; + send_reply_to_kafka_recall("12345","2",static_cast(ResponseCode::OK),msg,dev.terminal_id,lm.logical_device_seq,front.StartTime,front.EndTime); lm.recall_list.pop_front(); // 弹掉首条 } else if (front.recall_status == static_cast(RecallStatus::FAILED)) { @@ -3856,7 +3965,11 @@ void check_recall_event() { << " " << front.StartTime << " ~ " << front.EndTime << std::endl; //调用reply接口通知web端该时间段补招失败 - + std::string msg = std::string("监测点:") + lm.monitor_name + + " 补招时间范围:" + front.StartTime + + " ~ " + front.EndTime + + " 补招执行失败"; + send_reply_to_kafka_recall("12345","2",static_cast(ResponseCode::BAD_REQUEST),msg,dev.terminal_id,lm.logical_device_seq,front.StartTime,front.EndTime); lm.recall_list.pop_front(); // 弹掉首条 } else { diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 4556bf3..818b603 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -654,6 +654,8 @@ void on_device_response_minimal(int response_code, void check_recall_event(); void check_recall_file(); +//补招响应 +void send_reply_to_kafka_recall(const std::string& guid, const std::string& step,int code, const std::string& result,const std::string& terminalId,const std::string& lineIndex,const std::string& recallStartDate,const std::string& recallEndDate); //缓存目录信息 void filemenu_cache_put(const std::string& dev_id, diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index 35ed6d4..9f63efb 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -77,7 +77,7 @@ extern std::vector TESTARRAY; ////////////////////////////////////////////////////////////////////////////////////////////////////////外部文件函数声明 extern void execute_bash(std::string fun,int process_num,std::string type); -extern int recall_json_handle(const std::string& jstr); +extern int recall_json_handle_from_mq(const std::string& body); //////////////////////////////////////////////////////////////////////////////////////////////////////本文件函数向前声明 @@ -373,62 +373,6 @@ std::string find_guid_index_from_dev_id(const std::string& dev_id) { /////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理 -std::string parseJsonMessageRC(const std::string& inputJson) { - // 解析输入 JSON 字符串 - json root; - try { - root = json::parse(inputJson); - } catch (const std::exception& e) { - std::cerr << "Error parsing JSON: " << e.what() << std::endl; - return ""; - } - - // 提取 "messageBody" 部分(它是一个字符串) - if (!root.contains("messageBody") || !root["messageBody"].is_string()) { - std::cerr << "'messageBody' is missing or is not a string" << std::endl; - return ""; - } - - std::string messageBodyStr = root["messageBody"].get(); - if (messageBodyStr.empty()) { - std::cerr << "'messageBody' is empty" << std::endl; - return ""; - } - - // 解析 messageBody 中的 JSON 字符串 - json messageBody; - try { - messageBody = json::parse(messageBodyStr); - } catch (const std::exception& e) { - std::cerr << "Failed to parse 'messageBody' JSON: " << e.what() << std::endl; - return ""; - } - - // 提取 "guid" 部分 - if (!messageBody.contains("guid") || !messageBody["guid"].is_string()) { - std::cerr << "'guid' is missing or is not a string" << std::endl; - return ""; - } - std::string guid = messageBody["guid"].get(); - - // 发送 guid 回复 - send_reply_to_queue(guid, "1", "收到补招指令"); - - // 提取 "data" 部分 - if (!messageBody.contains("data") || !messageBody["data"].is_array()) { - std::cerr << "'data' is missing or is not an array" << std::endl; - return ""; - } - - // 返回 "data" 数组的字符串形式 - try { - return messageBody["data"].dump(); // 默认带缩进;如需去除缩进:dump(-1) - } catch (const std::exception& e) { - std::cerr << "Error converting 'data' to string: " << e.what() << std::endl; - return ""; - } -} - bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit){ json root; try { @@ -873,6 +817,17 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms // 加锁访问台账 if( !devid.empty() && line > 0){ //不再使用文件触发方式,直接调用接口向终端发起请求 + + //不注册guid,直接将请求指令下发装置,排队处理 + + //添加在线判断 + if (ClientManager::instance().get_dev_status(devid) != 1) { + std::cout << "devid对应装置不在线: " << devid << std::endl; + // 记录日志不响应 web 端 + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,装置%s不在线", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str(),devid.c_str()); + return rocketmq::CONSUME_SUCCESS; + } + ClientManager::instance().set_real_state_count(devid, 60, line);//一秒询问一次,询问60次,下一次同一个测点调用的话就会刷新 } else{ @@ -1002,17 +957,7 @@ rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& ms } // 解析 JSON 字符串 - std::string result = parseJsonMessageRC(body); // 使用 std::string 接收解析结果 - std::cout << "parseJsonMessageRC: " << result << std::endl; - - if (!result.empty()) { - - recall_json_handle(result);//不再使用文件补招方式 - - } else { - std::cerr << "recall data is NULL." << std::endl; - DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str()); - } + recall_json_handle_from_mq(body);//不再使用文件补招方式 return rocketmq::CONSUME_SUCCESS; } diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index ce69d66..7aa675b 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -1,5 +1,5 @@ //////////////////////////////////////////////////////////////////////////////////////////////////// - +#include #include #include #include @@ -705,12 +705,54 @@ void Worker::handleViewLogCommand(const std::string& command, int clientFD) { stopViewLog = false; showinshellflag = true; - sendStr(clientFD, "\r\x1B[KViewing logs for level: " + level + " (Press '`' to exit)\r\n> "); + sendStr(clientFD, std::string("\r\x1B[KViewing logs for level: ") + level + " (Press '`' to exit)\r\n> "); char inputBuf[16]; + // --- 新增 begin: 目录创建 + 唯一文件名生成 + 打开文件 --- + // 递归创建目录的小工具(最小实现,按‘/’逐级创建) + auto ensure_dir = [](const std::string& path) -> bool { + if (path.empty()) return false; + std::string cur; + cur.reserve(path.size()); + for (size_t i = 0; i < path.size(); ++i) { + cur.push_back(path[i]); + if (path[i] == '/' && cur.size() > 1) { + if (::access(cur.c_str(), F_OK) != 0) { + if (::mkdir(cur.c_str(), 0755) != 0 && errno != EEXIST) return false; + } + } + } + // 末级(若不以 / 结尾) + if (cur.back() != '/') { + if (::access(cur.c_str(), F_OK) != 0) { + if (::mkdir(cur.c_str(), 0755) != 0 && errno != EEXIST) return false; + } + } + return true; + }; + + const std::string logDir = "/FeProject/dat/log"; + if (!ensure_dir(logDir)) { + sendStr(clientFD, "\r\x1B[KFailed to create log directory: /FeProject/dat/log\r\n> "); + return; + } + + std::string filePath = logDir + "/temp.log"; + int index = 1; + while (::access(filePath.c_str(), F_OK) == 0) { + filePath = logDir + "/temp_" + std::to_string(index++) + ".log"; + } + + std::ofstream logFile(filePath.c_str(), std::ios::out | std::ios::trunc); + if (!logFile.is_open()) { + sendStr(clientFD, "\r\x1B[KFailed to open log file for writing.\r\n> "); + return; + } + // --- 新增 end --- + while (!stopViewLog) { - // 1. 监听 shell 输入退出符号 ` + // 1) 监听 shell 输入退出符号 ` fd_set read_fds; FD_ZERO(&read_fds); FD_SET(clientFD, &read_fds); @@ -722,31 +764,47 @@ void Worker::handleViewLogCommand(const std::string& command, int clientFD) { int activity = select(clientFD + 1, &read_fds, nullptr, nullptr, &timeout); if (activity > 0 && FD_ISSET(clientFD, &read_fds)) { int n = recv(clientFD, inputBuf, sizeof(inputBuf), 0); - if (n > 0 && strchr(inputBuf, '`')) { + if (n > 0 && std::memchr(inputBuf, '`', static_cast(n))) { stopViewLog = true; showinshellflag = false; break; } } - // 2. 输出日志 - std::string logEntry; + // --- 修改 begin: 批量获取日志(swap 全取,减少加锁时间) --- + std::list tempLogs; { std::lock_guard lock(*logMutex); if (!logList->empty()) { - logEntry = logList->front(); - logList->pop_front(); + tempLogs.swap(*logList); // 把 logList 中的内容全取出 } } + // --- 修改 end --- - if (!logEntry.empty()) { - sendStr(clientFD, "\r\x1B[K" + logEntry + "\r\n"); + if (!tempLogs.empty()) { + for (const auto& logEntry : tempLogs) { + if (!logEntry.empty()) { + sendStr(clientFD, std::string("\r\x1B[K") + logEntry + "\r\n"); + + // --- 新增 begin: 写入文件 + 及时落盘 --- + logFile << logEntry << '\n'; + // --- 新增 end --- + } + } + // --- 新增 begin: 刷新文件缓冲,保证实时可见 --- + logFile.flush(); + // --- 新增 end --- } else { std::this_thread::sleep_for(std::chrono::milliseconds(500)); } } - // 3. 打印退出提示 + // 3) 打印退出提示 sendStr(clientFD, "\r\x1B[K\nLog view stopped. Returning to shell.\r\n> "); + + // --- 新增 begin: 关闭文件 --- + logFile.close(); + // --- 新增 end --- } + diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 0075091..a2d6c02 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -431,14 +431,15 @@ void process_received_message(string mac, string id,const char* data, size_t len //} queue_data_t data; - data.monitor_no = 1; // + data.monitor_no = avg_data.name; // data.strTopic = TOPIC_STAT;//ͳtopic data.strText = js; data.mp_id = "test"; // std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); - + std::cout << "Successfully assembled tagPqData for line: " + << avg_data.name << std::endl; // //std::cout << "Base64 Encoded Data (" << max_data.CalculateFloatCount() // << " floats): " << base64Str << std::endl; diff --git a/LFtid1056/main_thread.cpp b/LFtid1056/main_thread.cpp index 930f0c8..ff5bf4c 100644 --- a/LFtid1056/main_thread.cpp +++ b/LFtid1056/main_thread.cpp @@ -154,24 +154,24 @@ void* client_manager_thread(void* arg) { printf("Started client connections\n"); // - std::vector points1 = { + /*std::vector points1 = { {"P001", "Main Voltage", "D001",1 ,1, 1, 1, 1,"0.38k",0}, {"P002", "Backup Voltage", "D001",2 ,1, 1, 1, 1,"0.38k",0} - }; + };*/ //00B78DA800D6 00-B7-8D-01-79-06 00-B7-8D-A8-00-D6 00-B7-8D-01-71-09 00-B7-8D-01-88-7f // װб - std::vector devices = { + /*std::vector devices = { { "D001", "Primary Device", "Model-X", "00-B7-8D-01-79-06", 1, points1,true } - }; + };*/ // 100װ //std::vector test_devices = generate_test_devices(100); //lnk̨˶ȡ豸 - //std::vector devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk + std::vector devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk //̨˴ӡ PrintDevices(devices); @@ -302,7 +302,7 @@ int main(int argc ,char** argv) {// std::cerr << "process param error,exit" << std::endl; return 1; } - //init_daemon(); + init_daemon(); srand(time(NULL)); // ʼ // ʼ߳