recall modify

This commit is contained in:
lnk
2025-10-22 15:57:50 +08:00
parent c7e7cd9078
commit a755c6faab
6 changed files with 369 additions and 80 deletions

View File

@@ -127,7 +127,8 @@
"cinttypes": "cpp",
"typeinfo": "cpp",
"valarray": "cpp",
"variant": "cpp"
"variant": "cpp",
"unordered_set": "cpp"
},
"cmake.sourceDirectory": "D:/canneng/云前置移植项目/zw/Linux_Front1056/LFtid1056/lib/libuv-v1.51.0"
}

View File

@@ -21,6 +21,7 @@
#include <sys/stat.h>
#include <fnmatch.h>
#include <memory>
#include <unordered_set>
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -199,6 +200,7 @@ std::string G_ROCKETMQ_KEY_TEST = "";//key
int G_TEST_FLAG = 0;
int G_TEST_NUM = 0;
int G_TEST_TYPE = 0;
int LEDGER_MAX_ITEMS = 5; //台账打印最大项数限制
int TEST_PORT = 11000; //用于当前进程登录测试shell的端口
std::string G_TEST_LIST = ""; //测试用的发送实际数据的终端列表
std::vector<std::string> TESTARRAY; //解析的列表数组
@@ -208,7 +210,7 @@ std::vector<std::string> TESTARRAY; //解析的列表数组
bool enqueue_direct_download(const std::string& dev_id,
const std::string& monitor_id,
const std::string& filename,
const std::vector<std::string>& dir_candidates);
const std::string& guid);
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////当前文件函数声明
@@ -1064,35 +1066,68 @@ void create_recall_xml()
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////补招部分
// 工具函数:将时间字符串转为 time_t秒级
// ▲新增:从 monitorId 提取结尾数字(不含前导非数字部分),失败返回空串
static std::string extract_monitor_digits(const std::string& monitorId) {
// 例: "00B78D0171091" -> "171091"(按你的“对应的数字”的规则可定制)
// 这里实现:取 monitorId 中最后一段连续数字
std::string digits;
for (int i = static_cast<int>(monitorId.size()) - 1; i >= 0; --i) {
if (std::isdigit(static_cast<unsigned char>(monitorId[i]))) {
digits.push_back(monitorId[i]);
} else if (!digits.empty()) {
break;
static std::string get_monitor_digits_from_terminal_list(const std::string& dev_id,
const std::string& monitor_id)
{
std::lock_guard<std::mutex> lk(ledgermtx);
// 找终端
const terminal_dev* dev = NULL;
for (std::vector<terminal_dev>::const_iterator it = terminal_devlist.begin();
it != terminal_devlist.end(); ++it)
{
if (it->terminal_id == dev_id) { dev = &(*it); break; }
}
if (!dev) {
std::cout << "[digits] dev not found: " << dev_id << std::endl;
return std::string();
}
// 找监测点
for (std::vector<ledger_monitor>::const_iterator itLm = dev->line.begin();
itLm != dev->line.end(); ++itLm)
{
if (!itLm->monitor_id.empty() && itLm->monitor_id == monitor_id) {
// 常见就是 logical_device_seq比如 "1"、"02" 等
std::string seq = itLm->logical_device_seq;
// 可选:去掉前导 0与您生成“数字_时间.xxx”的命名规则保持一致
// 若不需要去零,注释以下 5 行即可。
size_t p = 0;
while (p < seq.size() && seq[p] == '0') ++p;
seq = (p >= seq.size()) ? "0" : seq.substr(p);
return seq;
}
}
std::reverse(digits.begin(), digits.end());
return digits;
std::cout << "[digits] monitor not found in dev: mon=" << monitor_id
<< " dev=" << dev_id << std::endl;
return std::string();
}
// ▲新增:把 "YYYY-MM-DD HH:MM:SS[.ffffff]" -> "YYYYMMDDHHMMSS"(忽略小数部分)
// ▲新增:把 "YYYY-MM-DD HH:MM:SS[.ffffff]" -> "YYYYMMDD_HHMMSS_mmm"
static std::string compact_ts_for_filename(const std::string& ts) {
// 允许 "2025-10-10 14:38:07.000000"
// 输出 "20251010143807"
std::string ymdhms;
ymdhms.reserve(14);
for (size_t i = 0; i < ts.size(); ++i) {
char c = ts[i];
if (std::isdigit(static_cast<unsigned char>(c))) {
ymdhms.push_back(c);
if (ymdhms.size() == 14) break;
// 允许输入 "2025-09-09 07:46:57.246000"
// 输出 "20250909_074657_246"
int year, mon, day, hour, min, sec, ms = 0;
char dotpart[16] = {0};
if (sscanf(ts.c_str(), "%d-%d-%d %d:%d:%d.%15s",
&year, &mon, &day, &hour, &min, &sec, dotpart) >= 6)
{
// 提取前三位毫秒
if (dotpart[0]) {
std::string frac(dotpart);
while (frac.size() < 3) frac.push_back('0'); // 不足3补0
ms = std::atoi(frac.substr(0, 3).c_str());
}
char buf[32];
snprintf(buf, sizeof(buf), "%04d%02d%02d_%02d%02d%02d_%03d",
year, mon, day, hour, min, sec, ms);
return std::string(buf);
}
return (ymdhms.size() == 14) ? ymdhms : std::string();
return "";
}
// ▲新增按“数字_时间后缀.后缀”拼直下文件名(返回{*.cfg, *.dat}两种)
@@ -1101,8 +1136,7 @@ static std::vector<std::string> build_direct_filenames(const std::string& monito
{
std::vector<std::string> out;
if (monitorDigits.empty() || ts_compact.empty()) return out;
out.push_back(monitorDigits + "_" + ts_compact + ".cfg");
out.push_back(monitorDigits + "_" + ts_compact + ".dat");
out.push_back(monitorDigits + "_" + ts_compact);
return out;
}
@@ -1164,7 +1198,7 @@ int recall_json_handle_from_mq(const std::string& body)
std::cerr << "Error parsing JSON: " << e.what() << std::endl;
// ★与原逻辑等价:无法解析,不再进入 recall_json_handle
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",
g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str());
g_front_seg_index, G_MQCONSUMER_TOPIC_RC.c_str(), FRONT_INST.c_str());
return 10004;
}
@@ -1172,14 +1206,14 @@ int recall_json_handle_from_mq(const std::string& body)
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
std::cerr << "'messageBody' is missing or is not a string" << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,没有messageBody字段",
g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str());
g_front_seg_index, G_MQCONSUMER_TOPIC_RC.c_str(), FRONT_INST.c_str());
return 10004;
}
std::string messageBodyStr = root["messageBody"].get<std::string>();
if (messageBodyStr.empty()) {
std::cerr << "'messageBody' is empty" << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,messageBody为空",
g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str());
g_front_seg_index, G_MQCONSUMER_TOPIC_RC.c_str(), FRONT_INST.c_str());
return 10004;
}
@@ -1190,7 +1224,7 @@ int recall_json_handle_from_mq(const std::string& body)
} catch (const std::exception& e) {
std::cerr << "Failed to parse 'messageBody' JSON: " << e.what() << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,messageBody的json结构不正确",
g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str());
g_front_seg_index, G_MQCONSUMER_TOPIC_RC.c_str(), FRONT_INST.c_str());
return 10004;
}
@@ -1204,25 +1238,44 @@ int recall_json_handle_from_mq(const std::string& body)
std::string terminalId = rec.value("terminalId", "");
if (terminalId.empty()) continue;
// ▲dataType 可能是字符串"0"/"1"或数字2
// ▲dataTypestring "0/1/2" 或 number 0/1/2先判断 contains
int dt = -1;
if (rec["dataType"].is_number_integer()) {
dt = rec["dataType"].get<int>();
} else if (rec["dataType"].is_string()) {
std::string s = rec["dataType"].get<std::string>();
if (s == "0") dt = 0; else if (s == "1") dt = 1;
if (rec.contains("dataType")) {
if (rec["dataType"].is_number_integer()) {
dt = rec["dataType"].get<int>();
} else if (rec["dataType"].is_string()) {
std::string s = rec["dataType"].get<std::string>();
if (s == "0") dt = 0;
else if (s == "1") dt = 1;
else if (s == "2") dt = 2;
}
}
if (dt == -1) {
std::cout << "[recall] skip: invalid dataType, guid=" << guid << "\n";
continue;
}
// 统一 monitorId 为数组形式
// === 统一收集监测点:支持 monitorIdList 或 monitorId ===
std::vector<std::string> monitors;
if (rec.contains("monitorId")) {
if (rec.contains("monitorIdList") && rec["monitorIdList"].is_array()) {
for (const auto& m : rec["monitorIdList"]) {
if (m.is_string()) monitors.push_back(m.get<std::string>());
}
}
if (monitors.empty() && rec.contains("monitorId")) {
if (rec["monitorId"].is_array()) {
for (auto& m : rec["monitorId"]) if (m.is_string()) monitors.push_back(m.get<std::string>());
for (const auto& m : rec["monitorId"]) {
if (m.is_string()) monitors.push_back(m.get<std::string>());
}
} else if (rec["monitorId"].is_string()) {
monitors.push_back(rec["monitorId"].get<std::string>());
}
}
if (monitors.empty()) continue;
if (monitors.empty()) {
std::cout << "[recall] skip: monitors empty (no monitorIdList/monitorId), guid=" << guid << "\n";
continue;
}
// ▲沿用:校验终端归属 + 在线性
{
@@ -1235,18 +1288,18 @@ int recall_json_handle_from_mq(const std::string& body)
if (ClientManager::instance().get_dev_status(targetDev->terminal_id) != 1) {
std::cout << "terminalId对应装置不在线: " << targetDev->terminal_id << std::endl;
std::string msg = std::string("装置:") + targetDev->terminal_name + " 不在线,无法补招";
send_reply_to_kafka_recall("12345", "2", static_cast<int>(ResponseCode::INTERNAL_ERROR), msg, targetDev->terminal_id, "", "", "");
send_reply_to_kafka_recall(guid, dt, static_cast<int>(ResponseCode::INTERNAL_ERROR), msg, targetDev->terminal_id, "", "", "");
continue;
}
}
if (dt == 2) {
if (dt == 2) { //一个测点一个guid对应多个文件
// ▲直下文件timeList -> fun1/fun2 -> enqueue_direct_download
if (!rec.contains("timeList") || !rec["timeList"].is_array()) continue;
for (const auto& monId : monitors) {
// fun1提取 monitor 数字
std::string digits = extract_monitor_digits(monId);
std::string digits = get_monitor_digits_from_terminal_list(terminalId, monId);//有锁
if (digits.empty()) { std::cout << "monitorId数字提取失败: " << monId << std::endl; continue; }
for (const auto& t : rec["timeList"]) {
@@ -1256,19 +1309,16 @@ int recall_json_handle_from_mq(const std::string& body)
// fun2生成 *.cfg/*.dat 两个文件名
std::vector<std::string> fns = build_direct_filenames(digits, ts_compact);
// 加入候选目录(使用 RecallFile 缺省的四个;如需定制可在此传自定义列表)
static const std::vector<std::string> DIRS {
"/cf/COMTRADE", "/bd0/COMTRADE", "/sd0/COMTRADE", "/sd0:1/COMTRADE"
};
for (const auto& fn : fns) {
bool ok = enqueue_direct_download(terminalId, monId, fn, DIRS);
bool ok = enqueue_direct_download(terminalId, monId, fn, guid);//有锁
std::cout << "[direct] enqueue " << (ok ? "ok " : "fail ")
<< "dev=" << terminalId << " mon=" << monId
<< " file=" << fn << std::endl;
}
}
}
} else if (dt == 0 || dt == 1) {
} else if (dt == 0 || dt == 1) { //一个装置对应一个guid对应多个监测点的多个时间段
// ▲保持老逻辑(与“对象+data”一致timeInterval 数组
if (!rec.contains("timeInterval") || !rec["timeInterval"].is_array()) continue;
@@ -1300,6 +1350,7 @@ int recall_json_handle_from_mq(const std::string& body)
std::string end = s.substr(pos + 1);
RecallFile rm_all;
rm_all.recall_guid = guid;
rm_all.recall_status = 0;
rm_all.StartTime = start;
rm_all.EndTime = end;
@@ -3952,16 +4003,16 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector<DZ_k
//////////////////////////////////////////////////////////////////////////////////////////////////////////////处理补招逻辑
//发送补招响应给web
void send_reply_to_kafka_recall(const std::string& guid, const std::string& step,int code, const std::string& result,const std::string& terminalId,const std::string& lineIndex,const std::string& recallStartDate,const std::string& recallEndDate){
void send_reply_to_kafka_recall(const std::string& guid, int dataType,int code, const std::string& result,const std::string& terminalId,const std::string& monitorId,const std::string& recallStartDate,const std::string& recallEndDate){
// 构造 JSON 字符串
std::ostringstream oss;
oss << "{"
<< "\"guid\":\"" << guid << "\","
<< "\"step\":\"" << step << "\","
<< "\"dataType\":\"" << dataType << "\","
<< "\"code\":" << code << ","
<< "\"result\":\"" << result << "\","
<< "\"terminalId\":\"" << terminalId << "\","
<< "\"lineIndex\":\"" << lineIndex << "\","
<< "\"monitorId\":\"" << monitorId << "\","
<< "\"recallStartDate\":\"" << recallStartDate << "\","
<< "\"recallEndDate\":\"" << recallEndDate << "\","
<< "\"processNo\":\"" << g_front_seg_index << "\","
@@ -4013,7 +4064,7 @@ void check_recall_event() {
+ " 补招时间范围:" + front.StartTime
+ " ~ " + front.EndTime
+ " 补招执行完成";
send_reply_to_kafka_recall("12345","2",static_cast<int>(ResponseCode::OK),msg,dev.terminal_id,lm.logical_device_seq,front.StartTime,front.EndTime);
send_reply_to_kafka_recall("12345",1,static_cast<int>(ResponseCode::OK),msg,dev.terminal_id,lm.logical_device_seq,front.StartTime,front.EndTime);
lm.recall_list.pop_front(); // 弹掉首条
} else if (front.recall_status == static_cast<int>(RecallStatus::FAILED)) {
@@ -4026,7 +4077,7 @@ void check_recall_event() {
+ " 补招时间范围:" + front.StartTime
+ " ~ " + front.EndTime
+ " 补招执行失败";
send_reply_to_kafka_recall("12345","2",static_cast<int>(ResponseCode::BAD_REQUEST),msg,dev.terminal_id,lm.logical_device_seq,front.StartTime,front.EndTime);
send_reply_to_kafka_recall("12345",1,static_cast<int>(ResponseCode::BAD_REQUEST),msg,dev.terminal_id,lm.logical_device_seq,front.StartTime,front.EndTime);
lm.recall_list.pop_front(); // 弹掉首条
} else {
@@ -4046,15 +4097,14 @@ void check_recall_event() {
dev.busytimecount = 0; // 计时归零
continue;
}
//如果是idle又没有待补招任务,应该跳过
else if(!any_non_empty && dev.busytype == static_cast<int>(DeviceState::IDLE)){
//如果没有待补招任务,或者正在进行其他业务,应该跳过
else if(!any_non_empty || (dev.busytype != static_cast<int>(DeviceState::IDLE) && dev.busytype != static_cast<int>(DeviceState::READING_EVENTLOG))){
continue;
}
else{//有待补招任务且处于补招状态或者idle状态
// 继续补招处理
}
// 2) 若任一 monitor 的首条为 RUNNING则该终端正在补招中 -> 跳过该终端,不会下发新的补招请求
//有待补招任务且处于补招事件状态或者idle状态继续补招处理
// 2) 若该装置任一 monitor 的首条为 RUNNING则该终端正在补招中 -> 跳过该终端,不会下发新的补招请求
bool has_running = false;
for (auto& lm : dev.line) {
if (!lm.recall_list.empty() &&
@@ -4063,7 +4113,9 @@ void check_recall_event() {
break;
}
}
if (has_running) continue;
if (has_running) continue;//跳过这个装置
// 若无 RUNNING则说明该终端空闲可以挑选新的补招任务
// 3) 选择该终端的“第一条 NOT_STARTED(0)”作为本终端本轮任务
bool picked = false;
@@ -4071,7 +4123,7 @@ void check_recall_event() {
if (lm.recall_list.empty()) continue; //跳过空的监测点
RecallMonitor& front = lm.recall_list.front(); //取非空测点的列表的第一条
if (front.recall_status == static_cast<int>(RecallStatus::NOT_STARTED)) {
if (front.recall_status == static_cast<int>(RecallStatus::NOT_STARTED)) {//未补招
// 标记为 RUNNING并设置终端忙状态
front.recall_status = static_cast<int>(RecallStatus::RUNNING);//该补招记录刷新为补招中
@@ -4079,12 +4131,12 @@ void check_recall_event() {
dev.busytype = static_cast<int>(DeviceState::READING_EVENTLOG);//装置状态正在补招和idle的都刷新为正在补招
dev.busytimecount = 0; //刷新业务超时计数
// 记录任务(每终端只取这一条)
// 记录任务(每终端只取这一条,多个装置可以同时进行
tasks.push_back(RecallTask{
dev.terminal_id,
front.StartTime,
front.EndTime,
lm.monitor_id
lm.logical_device_seq//记录测点号
});
picked = true; //该装置已取
break;
@@ -4197,6 +4249,57 @@ static bool extract_epoch_from_filename(const std::string& name,
return true;
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////
// 从文件名中解析出 "监测点号_YYYYMMDD_HHMMSS_mmm";成功返回 true
static bool make_target_key_from_filename(const std::string& fname, std::string& out_key) {
// 例PQMonitor_PQM1_000005_20250909_074657_246.dat
// 例PQ_PQLD1_000005_20250909_074656_435.dat
// 以 '_' 切分,应至少 6 段
std::vector<std::string> parts;
parts.reserve(8);
size_t start = 0, pos;
while ((pos = fname.find('_', start)) != std::string::npos) {
parts.emplace_back(fname.substr(start, pos - start));
start = pos + 1;
}
parts.emplace_back(fname.substr(start));
if (parts.size() < 6) return false;
// 索引意义:
// [0]=前缀(如 PQMonitor / PQ)
// [1]=监测点号(如 PQM1 / PQLD1)
// [2]=序号(如 000005)
// [3]=YYYYMMDD
// [4]=HHMMSS
// [5]=mmm(.dat)
const std::string& monitor = parts[1];
const std::string& ymd = parts[3];
const std::string& hms = parts[4];
// 去掉末段的扩展名(如 "246.dat" -> "246"
std::string mmm = parts[5];
size_t dot = mmm.rfind('.');
if (dot != std::string::npos) mmm.erase(dot);
// 基本合法性校验(长度与全数字)
auto all_digits = [](const std::string& s) {
return !s.empty() && std::find_if(s.begin(), s.end(),
[](unsigned char c){ return !std::isdigit(c); }) == s.end();
};
if (ymd.size() != 8 || !all_digits(ymd)) return false;
if (hms.size() != 6 || !all_digits(hms)) return false;
if (mmm.size() != 3 || !all_digits(mmm)) return false;
if (monitor.empty()) return false;
// 目标 key监测点号_YYYYMMDD_HHMMSS_mmm
out_key.reserve(monitor.size() + 1 + 8 + 1 + 6 + 1 + 3);
out_key.clear();
out_key.append(monitor).append("_").append(ymd).append("_").append(hms).append("_").append(mmm);
return true;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ====== ★修改check_recall_stat —— 加入“两步法”状态机 ======
void check_recall_file() {
@@ -4339,13 +4442,21 @@ void check_recall_file() {
auto it = front.dir_files.find(front.cur_dir);
if (it != front.dir_files.end()) {
if (front.direct_mode) {
// ▲直下:支持“目标名列表”
std::set<std::string> want(front.target_filenames.begin(), front.target_filenames.end());
// ▲直下:支持“目标名列表”元素形如监测点号_YYYYMMDD_HHMMSS_mmm
std::unordered_set<std::string> want(front.target_filetimes.begin(), front.target_filetimes.end());
for (const auto& ent : it->second) {
if (ent.flag != 1) continue; // 只要文件
size_t n = ::strnlen(ent.name, sizeof(ent.name));
std::string fname(ent.name, n);
if (want.find(fname) != want.end()) {
std::string key; // 解析得到的 "监测点号_YYYYMMDD_HHMMSS_mmm"
if (!make_target_key_from_filename(fname, key)) {
continue; // 不符合命名规范,跳过
}
if (want.find(key) != want.end()) {
front.download_queue.push_back(front.cur_dir + "/" + fname);
}
}
@@ -4507,8 +4618,8 @@ void check_recall_file() {
// 处理指令部分将文件名拼接出来调用这个函数
bool enqueue_direct_download(const std::string& dev_id,
const std::string& monitor_id,
const std::string& filename,
const std::vector<std::string>& dir_candidates)
const std::string& filetime,
const std::string& guid)
{
std::lock_guard<std::mutex> lk(ledgermtx);
@@ -4524,12 +4635,13 @@ bool enqueue_direct_download(const std::string& dev_id,
// 组装一条 RecallFile
RecallFile rf;
rf.recall_guid = guid;
rf.recall_status = static_cast<int>(RecallStatus::NOT_STARTED);
rf.StartTime = "1970-01-01 00:00:00"; // 仅占位,直下文件不会用到时间窗
rf.EndTime = "1970-01-01 00:00:01";
rf.dir_candidates = dir_candidates; // 传入要检索的目录列表
//rf.dir_candidates = dir_candidates; // 要检索的目录列表和默认的一致
rf.direct_mode = true; // ★关键:直下文件
rf.target_filenames.push_back(filename); // ▲单个文件入“列表”
rf.target_filetimes.push_back(filetime); // ▲单个文件时间入“列表”
lm_it->recall_list_static.push_back(std::move(rf));
@@ -4928,7 +5040,9 @@ bool append_qvvr_event(const std::string& terminal_id,
<< " phase=" << phase
<< std::endl;
{
std::lock_guard<std::mutex> lk(ledgermtx);
std::cout << "[append_qvvr_event] lock acquired. terminal_devlist.size="
<< terminal_devlist.size() << std::endl;
@@ -5003,10 +5117,12 @@ bool append_qvvr_event(const std::string& terminal_id,
std::cout << "[append_qvvr_event] done(update)."
<< std::endl;
return true;
return true; //更新完毕
}
}
//新的事件
// 4) 复用空槽used_status=false
for (size_t i = 0; i < qe.qvvrdata.size(); ++i) {
auto& q = qe.qvvrdata[i];
@@ -5028,7 +5144,7 @@ bool append_qvvr_event(const std::string& terminal_id,
std::cout << "[append_qvvr_event] done(reuse)."
<< std::endl;
return true;
return true; //复用完毕
}
}
@@ -5052,6 +5168,8 @@ bool append_qvvr_event(const std::string& terminal_id,
<< ", phase=" << phase
<< "}" << std::endl;
}
std::cout << "[append_qvvr_event] done(push_back)."
<< std::endl;
return true;

View File

@@ -68,6 +68,7 @@ enum class ActionResult {
class RecallFile
{
public:
std::string recall_guid; // 本次补招的唯一标识 GUID
int recall_status; // 补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败
std::string StartTime; // 数据补招起始时间yyyy-MM-dd HH:mm:ss
std::string EndTime; // 数据补招结束时间yyyy-MM-dd HH:mm:ss
@@ -76,7 +77,7 @@ public:
//暂态文件用
bool direct_mode = false; // 直下文件开关true 表示不按时间窗,仅按目标文件名
std::vector<std::string> target_filenames; // 直下文件名(不含目录)
std::vector<std::string> target_filetimes; // 直下文件匹配时间
std::list<std::string> file_paths; // 已下载/要上报的完整路径(用于最终结果)
@@ -118,7 +119,7 @@ public:
// ★新增:按需保留直下文件开关和目标名
if (!keep_direct) {
direct_mode = false;
target_filenames.clear(); // ▲列表清空
target_filetimes.clear(); // ▲列表清空
}
}
};
@@ -724,7 +725,7 @@ void check_recall_event();
void check_recall_file();
//补招响应
void send_reply_to_kafka_recall(const std::string& guid, const std::string& step,int code, const std::string& result,const std::string& terminalId,const std::string& lineIndex,const std::string& recallStartDate,const std::string& recallEndDate);
void send_reply_to_kafka_recall(const std::string& guid, int step,int code, const std::string& result,const std::string& terminalId,const std::string& lineIndex,const std::string& recallStartDate,const std::string& recallEndDate);
//缓存目录信息
void filemenu_cache_put(const std::string& dev_id,

View File

@@ -55,6 +55,7 @@ extern std::string subdir;
extern int G_TEST_NUM;
extern int G_TEST_TYPE;
extern int LEDGER_MAX_ITEMS;
extern bool errorOutputEnabled;
extern bool warnOutputEnabled;
@@ -259,6 +260,11 @@ extern bool normalOutputEnabled;
G_TEST_TYPE = type;
}
void Worker::setMaxItems(int items) {
std::lock_guard<std::mutex> locker(testMutex);
LEDGER_MAX_ITEMS = items;
}
// 日志控制
void Worker::setTestlog(bool flag) {
redirectErrorOutput(flag);
@@ -323,7 +329,12 @@ extern bool normalOutputEnabled;
int flag = std::atoi(cmd.substr(4).c_str());
setTestlog(flag);
sendStr(clientFD, "\r\x1B[KLOG updated\r\n");
} else if (cmd == "rc") {
}else if (cmd.find("MAX=") == 0) {
int flag = std::atoi(cmd.substr(4).c_str());
setMaxItems(flag);
sendStr(clientFD, "\r\x1B[KMAX_ITEMS updated\r\n");
}
else if (cmd == "rc") {
rocketmq_test_rc(m_front);
sendStr(clientFD, "\r\x1B[KExecuted rocketmq_test_rc\r\n");
} else if (cmd == "getdir") {
@@ -410,7 +421,7 @@ extern bool normalOutputEnabled;
void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
// —— 显示控制:最多打印的元素数量(防止过长)——
constexpr size_t MAX_ITEMS = 5; // 可按需调整或删除限制
const size_t MAX_ITEMS = static_cast<size_t>(LEDGER_MAX_ITEMS); // 非 constexpr
std::ostringstream os;
os << "\r\x1B[K------------------------------------\n";
@@ -602,6 +613,152 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
os << "\r\x1B[K |.. (+" << (n - MAX_ITEMS) << " more)\n";
}
}
// ========================= ★新增:补招打印 =========================
// ★新增:小工具—把状态/阶段枚举转成可读字符串
auto recallStatusStr = [](int st) -> const char* {
switch (st) {
case 0: return "NOT_STARTED(0)";
case 1: return "RUNNING(1)";
case 2: return "DONE(2)";
case 3: return "FAILED(3)";
default: return "UNKNOWN";
}
};
auto phaseStr = [](RecallPhase p) -> const char* {
switch (p) {
case RecallPhase::IDLE: return "IDLE";
case RecallPhase::LISTING: return "LISTING";
case RecallPhase::DOWNLOADING: return "DOWNLOADING";
}
return "UNKNOWN";
};
auto resultStr = [](ActionResult r) -> const char* {
switch (r) {
case ActionResult::PENDING: return "PENDING";
case ActionResult::FAIL: return "FAIL";
case ActionResult::OK: return "OK";
}
return "UNKNOWN";
};
// --- ★新增事件补招RecallMonitor ---
os << "\r\x1B[K |-- Recall(Event) (" << ld.recall_list.size() << "):\n";
{
size_t idx = 0;
for (const auto& r : ld.recall_list) {
if (idx++ >= MAX_ITEMS) break;
os << "\r\x1B[K |-- [" << (idx-1) << "] "
<< "status=" << recallStatusStr(r.recall_status)
<< ", StartTime=" << r.StartTime
<< ", EndTime=" << r.EndTime
<< ", STEADY=" << r.STEADY
<< ", VOLTAGE=" << r.VOLTAGE
<< "\n";
}
if (ld.recall_list.size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (ld.recall_list.size() - MAX_ITEMS) << " more)\n";
}
}
// --- ★新增稳态补招RecallFile+ 状态机信息 ---
os << "\r\x1B[K |-- Recall(Static Files) (" << ld.recall_list_static.size() << "):\n";
{
size_t idx = 0;
for (const auto& rf : ld.recall_list_static) {
if (idx++ >= MAX_ITEMS) break;
os << "\r\x1B[K |-- [" << (idx-1) << "] "
<< "status=" << recallStatusStr(rf.recall_status)
<< ", StartTime=" << rf.StartTime
<< ", EndTime=" << rf.EndTime
<< ", STEADY=" << rf.STEADY
<< ", VOLTAGE=" << rf.VOLTAGE
<< "\n";
// ★新增:直下模式与目标时间列表
os << "\r\x1B[K |-- direct_mode=" << (rf.direct_mode ? "true" : "false")
<< ", target_filetimes(" << rf.target_filetimes.size() << ")\n";
{
size_t c = 0;
for (const auto& t : rf.target_filetimes) {
if (c++ >= MAX_ITEMS) break;
os << "\r\x1B[K |-- " << t << "\n";
}
if (rf.target_filetimes.size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (rf.target_filetimes.size() - MAX_ITEMS) << " more)\n";
}
}
// ★新增:状态机运行态
os << "\r\x1B[K |-- phase=" << phaseStr(rf.phase)
<< ", cur_dir_index=" << rf.cur_dir_index
<< ", cur_dir=" << rf.cur_dir << "\n";
os << "\r\x1B[K |-- list_result=" << resultStr(rf.list_result)
<< ", download_result=" << resultStr(rf.download_result) << "\n";
// ★新增:候选目录
os << "\r\x1B[K |-- dir_candidates(" << rf.dir_candidates.size() << ")\n";
{
size_t c = 0;
for (const auto& d : rf.dir_candidates) {
if (c++ >= MAX_ITEMS) break;
os << "\r\x1B[K |-- " << d << "\n";
}
if (rf.dir_candidates.size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (rf.dir_candidates.size() - MAX_ITEMS) << " more)\n";
}
}
// ★新增:目录 -> 文件名列表(仅概要)
os << "\r\x1B[K |-- dir_files(" << rf.dir_files.size() << " dirs)\n";
{
size_t c = 0;
for (const auto& kv : rf.dir_files) {
if (c++ >= MAX_ITEMS) break;
os << "\r\x1B[K |-- [" << (c-1) << "] dir=" << kv.first
<< " files=" << kv.second.size() << "\n";
}
if (rf.dir_files.size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (rf.dir_files.size() - MAX_ITEMS) << " more)\n";
}
}
// ★新增:下载队列(概要)
os << "\r\x1B[K |-- download_queue(" << rf.download_queue.size() << ")\n";
{
size_t c = 0;
for (const auto& path : rf.download_queue) {
if (c++ >= MAX_ITEMS) break;
os << "\r\x1B[K |-- " << path << "\n";
}
if (rf.download_queue.size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (rf.download_queue.size() - MAX_ITEMS) << " more)\n";
}
}
// ★新增:当前下载中文件
if (!rf.downloading_file.empty()) {
os << "\r\x1B[K |-- downloading: " << rf.downloading_file << "\n";
}
// ★新增:已下载/待上报的完整路径file_paths
os << "\r\x1B[K |-- file_paths(" << rf.file_paths.size() << ")\n";
{
size_t c = 0;
for (const auto& p : rf.file_paths) {
if (c++ >= MAX_ITEMS) break;
os << "\r\x1B[K |-- " << p << "\n";
}
if (rf.file_paths.size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (rf.file_paths.size() - MAX_ITEMS) << " more)\n";
}
}
}
if (ld.recall_list_static.size() > MAX_ITEMS) {
os << "\r\x1B[K |.. (+" << (ld.recall_list_static.size() - MAX_ITEMS) << " more)\n";
}
}
// ======================= ★新增:补招打印结束 =======================
}
os << "\r\x1B[K------------------------------------\n";

View File

@@ -59,6 +59,7 @@ private:
void sendBytes(int fd, const char* buf, int len);
void setTestNum(int num);
void setTestType(int type);
void setMaxItems(int items);
void setTestlog(bool flag);
void doPeriodicTask();
void processCommand(const std::string &cmd, int clientFD);

View File

@@ -258,6 +258,7 @@ void process_received_message(string mac, string id,const char* data, size_t len
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,
"");
//事件主动上送处理完成,不需要通知状态机
}
else {
// 处理获取失败的情况
@@ -2077,6 +2078,16 @@ void process_received_message(string mac, string id,const char* data, size_t len
<< ", 特征幅值: " << record.fMagntitude << " pu"
<< ", 时间戳: " << record.triggerTimeMs << "ms" << std::endl;
//记录补招上来的暂态事件
append_qvvr_event(id,event.head.name,
record.nType,record.fPersisstime,record.fMagntitude,record.triggerTimeMs,record.phase);
//直接发走暂态事件
transfer_json_qvvr_data(id,event.head.name,
record.fMagntitude,record.fPersisstime,record.triggerTimeMs,record.nType,record.phase,"");
//通知状态机补招暂态事件成功
on_device_response_minimal(static_cast<int>(ResponseCode::OK), id, 0, static_cast<int>(DeviceState::READING_EVENTLOG));
recordlist.push_back(record);
}