Compare commits

...

8 Commits

Author SHA1 Message Date
lnk
987dba34dc finish git rebase 2025-10-11 10:31:39 +08:00
lnk
a43afb15c5 fix bash 2025-10-11 10:26:23 +08:00
lnk
c494225b38 fix some error and add some fun 2025-10-11 10:26:23 +08:00
lnk
35231baae7 modify ledger update and rtdata 2025-10-11 10:26:22 +08:00
lnk
caf3c308c1 add ledger update 2025-10-11 10:26:22 +08:00
lnk
5616d9096a add rtdata idx 2025-10-11 10:26:22 +08:00
lnk
a6127f0d28 add rtdata idx 2025-10-11 10:26:18 +08:00
lnk
4fe8aee149 modify logs 2025-10-11 10:23:18 +08:00
13 changed files with 832 additions and 349 deletions

Binary file not shown.

View File

@@ -1,6 +1,6 @@
#!/bin/bash
export CLOUD_PATH=/home/pq/zwproject/LFtid1056
export CLOUD_PATH=/FeProject
if [ -z "$CLOUD_PATH" ]; then
echo "Error: CLOUD_PATH is not set. Please set it first."

View File

@@ -68,7 +68,14 @@ extern XmlConfig xmlcfg2;//角型接线xml节点解析的数据-默认映射文
extern std::list<CTopic*> topicList2; //角型接线发送主题链表
extern std::map<std::string, Xmldata*> xmlinfo_list2;//保存所有型号角形接线对应的icd映射文件解析数据
//////////////////////////////////////////////////////////////////////////////////////////////////
extern time_t ConvertToTimestamp(const tagTime& time);
////////////////////////////////////////////////////////////////////////////////////////////////////
//实时数据时间记录
std::mutex g_last_ts_mtx;
std::unordered_map<std::string, time_t> g_last_ts_by_devid;
static std::mutex g_filemenu_cache_mtx;
std::map<std::string, std::vector<tag_dir_info>> g_filemenu_cache;
@@ -1149,7 +1156,7 @@ int recall_json_handle_from_mq(const std::string& body)
return 10004;
}
std::string guid = messageBody["guid"].get<std::string>();
send_reply_to_queue(guid, "1", "收到补招指令");
//send_reply_to_queue(guid, static_cast<int>(ResponseCode::OK), "收到补招指令");
// 提取 data 数组
if (!messageBody.contains("data") || !messageBody["data"].is_array()) {
@@ -1452,7 +1459,7 @@ std::list<std::string> find_xml_belong_to_this_process()
// 根据 str_tag 将 terminal 添加到对应的数组
void add_terminal_to_trigger_update(trigger_update_xml_t& trigger_update_xml,
const std::string& str_tag,
const terminal_dev& work_terminal) {
const update_dev& work_terminal) {
if (str_tag == "add") {
std::cout << "new ledger!!!!" << std::endl;
trigger_update_xml.new_updates.push_back(work_terminal);
@@ -1469,7 +1476,7 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml,
const std::string& str_tag,
const std::string& data,
const std::string& guid_value) {
terminal_dev work_terminal;
update_dev work_terminal;
work_terminal.guid = guid_value;
tinyxml2::XMLDocument doc;
@@ -1487,31 +1494,32 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml,
work_terminal.terminal_id = get_value("id");
work_terminal.terminal_name = get_value("terminalCode");
work_terminal.org_name = get_value("orgName");
work_terminal.maint_name = get_value("maintName");
work_terminal.station_name = get_value("stationName");
work_terminal.tmnl_factory = get_value("manufacturer");
//work_terminal.org_name = get_value("orgName");
//work_terminal.maint_name = get_value("maintName");
//work_terminal.station_name = get_value("stationName");
//work_terminal.tmnl_factory = get_value("manufacturer");
work_terminal.tmnl_status = get_value("status");
work_terminal.dev_type = get_value("devType");
work_terminal.dev_key = get_value("devKey");
work_terminal.dev_series = get_value("series");
//work_terminal.dev_key = get_value("devKey");
//work_terminal.dev_series = get_value("series");
work_terminal.processNo = get_value("processNo");
work_terminal.addr_str = get_value("ip");
work_terminal.port = get_value("port");
work_terminal.timestamp = get_value("updateTime");
//work_terminal.addr_str = get_value("ip");
//work_terminal.port = get_value("port");
//work_terminal.timestamp = get_value("updateTime");
work_terminal.Righttime = get_value("Righttime");
work_terminal.mac = get_value("mac");
for (tinyxml2::XMLElement* monitor = root->FirstChildElement("monitorData");
monitor;
monitor = monitor->NextSiblingElement("monitorData")) {
ledger_monitor mon;
update_monitor mon;
mon.monitor_id = monitor->FirstChildElement("id") ? monitor->FirstChildElement("id")->GetText() : "N/A";
mon.monitor_name = monitor->FirstChildElement("name") ? monitor->FirstChildElement("name")->GetText() : "N/A";
mon.voltage_level = monitor->FirstChildElement("voltageLevel") ? monitor->FirstChildElement("voltageLevel")->GetText() : "N/A";
mon.terminal_connect = monitor->FirstChildElement("ptType") ? monitor->FirstChildElement("ptType")->GetText() : "N/A";
mon.logical_device_seq = monitor->FirstChildElement("lineNo") ? monitor->FirstChildElement("lineNo")->GetText() : "N/A";
mon.timestamp = monitor->FirstChildElement("timestamp") ? monitor->FirstChildElement("timestamp")->GetText() : "N/A";
//mon.timestamp = monitor->FirstChildElement("timestamp") ? monitor->FirstChildElement("timestamp")->GetText() : "N/A";
mon.terminal_id = monitor->FirstChildElement("terminal_id") ? monitor->FirstChildElement("terminal_name")->GetText() : "N/A";
mon.status = monitor->FirstChildElement("status") ? monitor->FirstChildElement("status")->GetText() : "N/A";
@@ -1540,7 +1548,7 @@ void parse_ledger_update(trigger_update_xml_t& trigger_update_xml,
if (strTag == "add" || strTag == "modify") {
parse_terminal_from_data(trigger_update_xml, strTag, data, guid_value);
} else if (strTag == "delete") {
terminal_dev delete_terminal;
update_dev delete_terminal;
tinyxml2::XMLDocument doc;
if (doc.Parse(data.c_str()) != tinyxml2::XML_SUCCESS) {
@@ -1670,7 +1678,7 @@ int parse_ledger_update_xml(trigger_update_xml_t& trigger_update_xml)
}
//更新单个台账
int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_dev) {
int update_one_terminal_ledger(const update_dev& update,terminal_dev& target_dev) {
// 更新基本信息
if (!update.terminal_id.empty()) {
target_dev.terminal_id = update.terminal_id;
@@ -1680,14 +1688,14 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d
target_dev.terminal_name = update.terminal_name;
std::cout << "terminal_name: " << target_dev.terminal_name << std::endl;
}
if (!update.tmnl_factory.empty()) {
/*if (!update.tmnl_factory.empty()) {
target_dev.tmnl_factory = update.tmnl_factory;
std::cout << "tmnl_factory: " << target_dev.tmnl_factory << std::endl;
}
if (!update.tmnl_status.empty()) {
target_dev.tmnl_status = update.tmnl_status;
std::cout << "tmnl_status: " << target_dev.tmnl_status << std::endl;
}
}*/
if (!update.dev_type.empty()) {
target_dev.dev_type = update.dev_type;
std::cout << "dev_type: " << target_dev.dev_type << std::endl;
@@ -1696,20 +1704,20 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d
target_dev.processNo = update.processNo;
std::cout << "processNo: " << target_dev.processNo << std::endl;
}
if (!update.dev_series.empty()) {
/*if (!update.dev_series.empty()) {
target_dev.dev_series = update.dev_series;
std::cout << "dev_series: " << target_dev.dev_series << std::endl;
}
if (!update.dev_key.empty()) {
target_dev.dev_key = update.dev_key;
std::cout << "dev_key: " << target_dev.dev_key << std::endl;
}
}*/
if (!update.addr_str.empty()) {
target_dev.addr_str = update.addr_str;
std::cout << "addr_str: " << target_dev.addr_str << std::endl;
}
if (!update.port.empty()) {
/*if (!update.port.empty()) {
target_dev.port = update.port;
std::cout << "port: " << target_dev.port << std::endl;
}
@@ -1739,7 +1747,7 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d
std::cerr << "Error: invalid timestamp format." << std::endl;
return -1;
}
}
}*/
// 清空旧监测点并重新填充
target_dev.line.clear();
@@ -1755,7 +1763,7 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d
m.terminal_connect = mon.terminal_connect;
m.status = mon.status;
m.terminal_id = mon.terminal_id;
m.timestamp = mon.timestamp;
//m.timestamp = mon.timestamp;
m.CT1 = mon.CT1;
m.CT2 = mon.CT2;
@@ -1767,7 +1775,7 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d
std::cout << "monitor_id " << m.monitor_id << " uses delta wiring." << std::endl;
}
if (!m.timestamp.empty()) {
/*if (!m.timestamp.empty()) {
struct tm timeinfo = {};
if (sscanf(m.timestamp.c_str(), "%4d-%2d-%2d %2d:%2d:%2d",
&timeinfo.tm_year, &timeinfo.tm_mon, &timeinfo.tm_mday,
@@ -1781,7 +1789,7 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d
std::cout << "monitor time (unix): " << m.timestamp << std::endl;
}
}
}
}*/
target_dev.line.push_back(m);
}
@@ -1796,7 +1804,7 @@ void process_ledger_update(trigger_update_xml_t& ledger_update_xml)
std::cout << "add ledger num: " << ledger_update_xml.new_updates.size() << std::endl;
for (auto it = ledger_update_xml.new_updates.begin(); it != ledger_update_xml.new_updates.end(); ) {
terminal_dev& new_dev = *it;
update_dev& new_dev = *it;
auto found = std::find_if(terminal_devlist.begin(), terminal_devlist.end(),
[&](const terminal_dev& d) { return d.terminal_id == new_dev.terminal_id; });
@@ -1812,7 +1820,7 @@ void process_ledger_update(trigger_update_xml_t& ledger_update_xml)
}
if (terminal_devlist.size() >= static_cast<size_t>(IED_COUNT)) {
send_reply_to_queue(new_dev.guid, "2",
send_reply_to_queue(new_dev.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + new_dev.terminal_id + " 台账更新失败,配置台账数量已满");
++it;
continue;
@@ -1820,24 +1828,19 @@ void process_ledger_update(trigger_update_xml_t& ledger_update_xml)
terminal_dev target_dev;
if (update_one_terminal_ledger(new_dev, target_dev) != 0) {
send_reply_to_queue(new_dev.guid, "2",
send_reply_to_queue(new_dev.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + new_dev.terminal_id + " 台账更新失败,无法写入台账");
++it;
continue;
}
if (parse_model_cfg_web_one(target_dev.dev_type).empty()) {
send_reply_to_queue(new_dev.guid, "2",
"终端 id: " + new_dev.terminal_id + " 台账更新失败,未找到装置型号");
++it;
continue;
}
Set_xml_nodeinfo_one(target_dev.dev_type);
init_loggers_bydevid(target_dev.terminal_id);
terminal_devlist.push_back(target_dev);
send_reply_to_queue(new_dev.guid, "2",
DeviceInfo device = make_device_from_terminal(target_dev);
ClientManager::instance().add_device(device);
send_reply_to_queue(new_dev.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + new_dev.terminal_id + " 台账添加成功");
it = ledger_update_xml.new_updates.erase(it);
@@ -1851,25 +1854,22 @@ void process_ledger_update(trigger_update_xml_t& ledger_update_xml)
[&](const terminal_dev& d) { return d.terminal_id == mod_dev.terminal_id; });
if (it != terminal_devlist.end()) {
remove_loggers_by_terminal_id(mod_dev.terminal_id);
erase_one_terminals_by_id(mod_dev.terminal_id);
if (update_one_terminal_ledger(mod_dev, *it) != 0) {
send_reply_to_queue(mod_dev.guid, "2",
send_reply_to_queue(mod_dev.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + mod_dev.terminal_id + " 台账更新失败,写入失败");
continue;
}
if (parse_model_cfg_web_one(it->dev_type).empty()) {
send_reply_to_queue(mod_dev.guid, "2",
"终端 id: " + mod_dev.terminal_id + " 台账更新失败,未找到装置型号");
continue;
}
Set_xml_nodeinfo_one(it->dev_type);
init_loggers_bydevid(mod_dev.terminal_id);
send_reply_to_queue(mod_dev.guid, "2",
DeviceInfo device = make_device_from_terminal(*it);
ClientManager::instance().add_device(device);
send_reply_to_queue(mod_dev.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + mod_dev.terminal_id + " 台账修改成功");
} else {
send_reply_to_queue(mod_dev.guid, "2",
send_reply_to_queue(mod_dev.guid, static_cast<int>(ResponseCode::NOT_FOUND),
"终端 id: " + mod_dev.terminal_id + " 台账修改失败,未找到终端");
}
}
@@ -1882,12 +1882,12 @@ void process_ledger_update(trigger_update_xml_t& ledger_update_xml)
[&](const terminal_dev& d) { return d.terminal_id == del_dev.terminal_id; });
if (it != terminal_devlist.end()) {
remove_loggers_by_terminal_id(del_dev.terminal_id);
terminal_devlist.erase(it);
send_reply_to_queue(del_dev.guid, "2",
erase_one_terminals_by_id(del_dev.terminal_id);
ClientManager::instance().remove_device(del_dev.terminal_id);
send_reply_to_queue(del_dev.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + del_dev.terminal_id + " 台账删除成功");
} else {
send_reply_to_queue(del_dev.guid, "2",
send_reply_to_queue(del_dev.guid, static_cast<int>(ResponseCode::NOT_FOUND),
"终端 id: " + del_dev.terminal_id + " 台账删除失败,未找到终端");
}
}
@@ -2233,14 +2233,14 @@ void execute_bash(string fun,int process_num,string type)
char p_num_str[20];
// 使用 sprintf 转换
std::sprintf(p_num_str, "%d", process_num);
const char* script = std::string(FRONT_PATH + "/bin/set_process.sh").c_str();//使用setsid防止端口占用
std::string script = FRONT_PATH + "/bin/set_process.sh";//使用setsid防止端口占用
const char* param1 = fun.c_str();
const char* param2 = p_num_str;
const char* param3 = type.c_str();
// 构造完整的命令
char command[256];
snprintf(command, sizeof(command), "%s %s %s %s &", script, param1, param2, param3);
snprintf(command, sizeof(command), "%s %s %s %s &", script.c_str(), param1, param2, param3);
std::cout << "command:" << command <<std::endl;
@@ -2250,50 +2250,6 @@ void execute_bash(string fun,int process_num,string type)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////打印台账更新
void print_monitor(const ledger_monitor& mon) {
auto safe = [](const std::string& s) { return s.empty() ? "N/A" : s; };
std::cout << "Monitor ID: " << safe(mon.monitor_id) << "\n";
std::cout << "Terminal ID: " << safe(mon.terminal_id) << "\n";
std::cout << "Monitor Name: " << safe(mon.monitor_name) << "\n";
std::cout << "Logical Device Sequence: " << safe(mon.logical_device_seq) << "\n";
std::cout << "Voltage Level: " << safe(mon.voltage_level) << "\n";
std::cout << "Terminal Connect: " << safe(mon.terminal_connect) << "\n";
std::cout << "Timestamp: " << safe(mon.timestamp) << "\n";
std::cout << "Status: " << safe(mon.status) << "\n";
std::cout << "CT1: " << mon.CT1 << "\n";
std::cout << "CT2: " << mon.CT2 << "\n";
std::cout << "PT1: " << mon.PT1 << "\n";
std::cout << "PT2: " << mon.PT2 << "\n";
}
void print_terminal(const terminal_dev& tmnl) {
auto safe = [](const std::string& s) { return s.empty() ? "N/A" : s; };
std::cout << "GUID: " << safe(tmnl.guid) << "\n";
std::cout << "Terminal ID: " << safe(tmnl.terminal_id) << "\n";
std::cout << "Terminal Code: " << safe(tmnl.terminal_name)<< "\n";
std::cout << "Organization Name: "<< safe(tmnl.org_name) << "\n";
std::cout << "Maintenance Name: " << safe(tmnl.maint_name) << "\n";
std::cout << "Station Name: " << safe(tmnl.station_name) << "\n";
std::cout << "Factory Name: " << safe(tmnl.tmnl_factory) << "\n";
std::cout << "Terminal Status: " << safe(tmnl.tmnl_status) << "\n";
std::cout << "Device Type: " << safe(tmnl.dev_type) << "\n";
std::cout << "Device Key: " << safe(tmnl.dev_key) << "\n";
std::cout << "Device Series: " << safe(tmnl.dev_series) << "\n";
std::cout << "Address: " << safe(tmnl.addr_str) << "\n";
std::cout << "Port: " << safe(tmnl.port) << "\n";
std::cout << "Timestamp: " << safe(tmnl.timestamp) << "\n";
std::cout << "mac: " << safe(tmnl.mac) << "\n";
for (size_t i = 0; i < 10 && !tmnl.line[i].monitor_id.empty(); ++i) {
std::cout << " Monitor " << (i + 1) << ":\n";
print_monitor(tmnl.line[i]);
}
}
void print_trigger_update_xml(const trigger_update_xml_t& trigger_update) {
std::cout << "Work Updates Count: " << trigger_update.work_updates.size() << "\n";
std::cout << "New Updates Count: " << trigger_update.new_updates.size() << "\n";
@@ -3362,7 +3318,7 @@ bool send_file_list(terminal_dev* dev, const std::vector<tag_dir_info>& FileList
// 构造 JSON 报文
nlohmann::json j;
j["guid"] = dev->guid;
j["FrontIP"] = FRONT_IP; // 这里填你的前置机 IP
j["FrontId"] = FRONT_INST; // 这里填你的前置机id
j["Node"] = g_front_seg_index; // 节点号
j["Dev_mac"] = normalize_mac(dev->addr_str); // addr_str 存的是 MAC
@@ -3736,7 +3692,7 @@ bool send_set_value_reply(const std::string &dev_id, unsigned char mp_index, con
// 顶层
j["guid"] = dev.guid;
j["FrontIP"] = FRONT_IP; // 你的前置机 IP项目已有常量/变量)
j["FrontId"] = FRONT_INST; // 你的前置机 IP项目已有常量/变量)
j["Node"] = g_front_seg_index; // 节点号(项目已有变量)
j["Dev_mac"] = normalize_mac(dev.addr_str);
@@ -3861,7 +3817,7 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector<DZ_k
// 3) 组包顶层
nlohmann::json j;
j["guid"] = dev.guid;
j["FrontIP"] = FRONT_IP;
j["FrontId"] = FRONT_INST;
j["Node"] = g_front_seg_index;
j["Dev_mac"] = normalize_mac(dev.addr_str);
@@ -4274,6 +4230,7 @@ void check_recall_file() {
// 1) 清理首条 DONE/FAILED
bool any_non_empty = false;
bool has_running = false;
for (auto& lm : dev.line) {
while (!lm.recall_list_static.empty()) {
const RecallFile& front = lm.recall_list_static.front();
@@ -4288,43 +4245,48 @@ void check_recall_file() {
<< " " << front.StartTime << " ~ " << front.EndTime << std::endl;
lm.recall_list_static.pop_front();
} else {
break;
break;//找到第一条不是成功或失败的记录就退出循环
}
}
if (!lm.recall_list_static.empty()) {
any_non_empty = true;//弹出后是否为空
if (lm.recall_list_static.front().recall_status == static_cast<int>(RecallStatus::RUNNING)) {
has_running = true; //存在测点正在补招
}
}
if (!lm.recall_list_static.empty()) any_non_empty = true;
}
// 无条目时的装置态收尾
if (!any_non_empty && dev.busytype == static_cast<int>(DeviceState::READING_EVENTLOG)) {
// (保持你原注释)处于“暂态补招”的状态且无条目 -> 清空运行态
if (!any_non_empty && dev.busytype == static_cast<int>(DeviceState::READING_STATSFILE)) {
// 处于“文件补招”的状态且无条目 -> 清空运行态
dev.guid.clear();
dev.busytype = 0;
dev.isbusy = 0;
dev.busytimecount = 0;
continue;
} else if (!any_non_empty && dev.busytype == static_cast<int>(DeviceState::IDLE)) {
} else if (!any_non_empty && dev.busytype != static_cast<int>(DeviceState::IDLE)) {//其他运行态不处理idle往下执行
continue;
}
// 2) 若任一 monitor 的首条为 RUNNING则该终端正在补招中 -> 不下发新的任务(但需要推进状态机!)
bool has_running = false;
/*bool has_running = false;
for (auto& lm : dev.line) {
if (!lm.recall_list_static.empty() &&
lm.recall_list_static.front().recall_status == static_cast<int>(RecallStatus::RUNNING)) {
has_running = true;
has_running = true; //存在测点正在补招
break;
}
}
}*/
// ★新增:当存在 RUNNING 时,推进“该终端的首条补招记录”的两步状态机
if (has_running) {
for (auto& lm : dev.line) {
if (lm.recall_list_static.empty()) continue;
RecallFile& front = lm.recall_list_static.front();
if (front.recall_status != static_cast<int>(RecallStatus::RUNNING)) continue;
if (lm.recall_list_static.empty()) continue;//跳过没有记录的测点
RecallFile& front = lm.recall_list_static.front();//有记录测点的第一个补招
if (front.recall_status != static_cast<int>(RecallStatus::RUNNING)) continue;//跳过不是正在补招的记录
// 初始化阶段:从 NOT_STARTED->RUNNING 已完成,此处保证 phase 就绪
if (front.phase == RecallPhase::IDLE) {
// 初始化阶段:补招分为两个阶段,读文件列表和下载文件,如果是刚进入 RUNNING 状态则初始化
if (front.phase == RecallPhase::IDLE) { //暂态补招
front.phase = RecallPhase::LISTING;
front.cur_dir_index = 0;
front.cur_dir.clear();
@@ -4511,31 +4473,31 @@ void check_recall_file() {
// 3) 没有 RUNNING挑选第一条 NOT_STARTED并发起“首个目录”的请求
for (auto& lm : dev.line) {
if (lm.recall_list_static.empty()) continue;
if (lm.recall_list_static.empty()) continue; //跳过补招列表为空的测点
RecallFile& front = lm.recall_list_static.front();
if (front.recall_status == static_cast<int>(RecallStatus::NOT_STARTED)) {
RecallFile& front = lm.recall_list_static.front(); //取测点第一条记录
if (front.recall_status == static_cast<int>(RecallStatus::NOT_STARTED)) { //补招未开始
// 标记为 RUNNING并设置终端忙状态
front.recall_status = static_cast<int>(RecallStatus::RUNNING);
dev.isbusy = 1;
dev.busytype = static_cast<int>(DeviceState::READING_STATSFILE);
dev.busytimecount = 0;
front.recall_status = static_cast<int>(RecallStatus::RUNNING); //该补招记录刷新为补招中
dev.isbusy = 1; //装置由idle标记为忙
dev.busytype = static_cast<int>(DeviceState::READING_STATSFILE);//装置状态刷新为正在补招文件
dev.busytimecount = 0; //清空业务超时计数
// 初始化状态机并发出第一个目录请求
front.reset_runtime(true);//保留直下文件信息
front.phase = RecallPhase::LISTING;
if (!front.dir_candidates.empty()) {
front.cur_dir_index = 0;
front.cur_dir = front.dir_candidates[0];
front.list_result = ActionResult::PENDING;
ClientManager::instance().add_file_menu_action_to_device(dev.terminal_id, front.cur_dir);
front.phase = RecallPhase::LISTING; //正在请求并等待“目录文件名列表”
if (!front.dir_candidates.empty()) {//目录列表非空
front.cur_dir_index = 0; //正在尝试的目录下标
front.cur_dir = front.dir_candidates[0]; //第一个目录
front.list_result = ActionResult::PENDING; //目录状态:等待返回
ClientManager::instance().add_file_menu_action_to_device(dev.terminal_id, front.cur_dir);//调用目录请求接口
std::cout << "[check_recall_stat] LIST start dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id
<< " dir=" << front.cur_dir
<< " start=" << front.StartTime
<< " end=" << front.EndTime << std::endl;
} else {
front.recall_status = static_cast<int>(RecallStatus::FAILED);
front.recall_status = static_cast<int>(RecallStatus::FAILED); //目录列表空,失败
std::cout << "[check_recall_stat] empty dir_candidates, FAIL dev=" << dev.terminal_id
<< " monitor=" << lm.monitor_id << std::endl;
}
@@ -4735,12 +4697,13 @@ void on_device_response_minimal(int response_code,
break;
}
std::vector<tag_dir_info> names;
// 2) 按该终端当前 busytype 分支处理
const int bt = dev->busytype;
if (bt == static_cast<int>(DeviceState::READING_FILEMENU)) {
// ====== 分支 A当前业务就是“读取文件目录” ======
if (ok) {
std::vector<tag_dir_info> names;
if (filemenu_cache_take(id, names)) {
//发送目录
@@ -4775,41 +4738,42 @@ void on_device_response_minimal(int response_code,
bt == static_cast<int>(DeviceState::READING_EVENTFILE)
|| bt == static_cast<int>(DeviceState::READING_STATSFILE)
) {
// ====== 分支 B当前业务为“下载事件文件/统计文件”(两者处理相同) ======
// 一个装置同一时刻只会有一个监测点在下载
// ====== 分支 B当前业务为“事件文件/统计文件”(两者处理相同) ======
// 一个装置同一时刻只会有一个监测点在补招
ledger_monitor* running_monitor = nullptr;
RecallFile* running_front = nullptr;
// 在该终端下,找到“首条 recall_list_static.front() 正在运行且处于 DOWNLOADING 的监测点”
// 在该终端下,找到“首条 recall_list_static.front() 正在运行且处于 LISTING 的监测点”
for (auto& lm : dev->line) {
if (lm.recall_list_static.empty()) continue;
RecallFile& f = lm.recall_list_static.front();
if (f.recall_status == static_cast<int>(RecallStatus::RUNNING)
&& f.phase == RecallPhase::DOWNLOADING) {
running_monitor = &lm;
running_front = &f;
if (f.recall_status == static_cast<int>(RecallStatus::RUNNING)//补招中
&& f.phase == RecallPhase::LISTING) {//正在请求目录
running_monitor = &lm; //补招的测点
running_front = &f; //补招记录
break;
}
}
if (!running_monitor || !running_front) {
std::cout << "[RESP][FILEDATA->(EVENT/STATS)FILE][WARN] dev=" << id
<< " no RUNNING/DOWNLOADING recall on this device, ignore resp"
if (!running_monitor || !running_front) { //该装置没有正在补招的测点和补招记录,退出处理
std::cout << "[RESP][FILEMENU->(EVENT/STATS)FILE][WARN] dev=" << id
<< " no RUNNING/LISTING recall on this device, ignore resp"
<< " rc=" << response_code << std::endl;
break;
}
// 根据回执结果,回写下载结果;状态机会在下一轮推进到下一个文件/结束
// 根据回执结果,回写目录结果;状态机会在下一轮推进到下一个目录/结束
if (ok) {
running_front->download_result = ActionResult::OK;
std::cout << "[RESP][FILEDATA->(EVENT/STATS)FILE][OK] dev=" << id
running_front->dir_files[running_front->cur_dir] = std::move(names);
running_front->list_result = ActionResult::OK;
std::cout << "[RESP][FILEMENU->(EVENT/STATS)FILE][OK] dev=" << id
<< " monitor=" << running_monitor->monitor_id
<< " file=" << running_front->downloading_file << std::endl;
<< " dir=" << running_front->cur_dir << std::endl;
} else {
running_front->download_result = ActionResult::FAIL;
std::cout << "[RESP][FILEDATA->(EVENT/STATS)FILE][FAIL] dev=" << id
running_front->list_result = ActionResult::FAIL;
std::cout << "[RESP][FILEMENU->(EVENT/STATS)FILE][FAIL] dev=" << id
<< " monitor=" << running_monitor->monitor_id
<< " file=" << running_front->downloading_file
<< " dir=" << running_front->cur_dir
<< " rc=" << response_code << std::endl;
}
break;
@@ -5110,8 +5074,234 @@ bool append_qvvr_event(const std::string& terminal_id,
return true;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////实时数据封装发送
void enqueue_realtime_pq(const RealtagPqDate_float& realdata,
int nPTType,
unsigned char cid,
const std::string& mac,
const std::string& devid)
{
// 先根据 devIdxMap 的配置决定编码分支:
// 2: 基础数据 3: 谐波电压含有率 4: 谐波电流有效值 5: 间谐波电压含有率
int idx = 0;
std::string base64;
// 这里尝试用 mac 作为 key 获取 idx如果项目里 devIdxMap 的 key 不是 mac
// 你可以把这里改成对应的设备 iddevid。未命中则再尝试用规范化后的 mac。
if (devidx_get(devid, idx)) {
switch (idx) {
case 2: // 基础数据(根据接线方式选择转换方法 数据全集解析)
base64 = realdata.ConvertToBase64(nPTType);
break;
case 3: // 谐波电压含有率
base64 = realdata.ConvertToBase64_RtHarmV(nPTType);
break;
case 4: // 谐波电流有效值(幅值)
base64 = realdata.ConvertToBase64_RtHarmI();
break;
case 5: // 间谐波电压含有率
base64 = realdata.ConvertToBase64_RtInHarmV();
break;
default:
// 未知 idx回退到基础数据
base64 = realdata.ConvertToBase64(nPTType);
break;
}
} else {
// 未配置 idx回退到基础数据
base64 = realdata.ConvertToBase64(nPTType);
}
//std::cout << base64 << std::endl;
//lnk实时数据使用接口发送20250711
time_t data_time = ConvertToTimestamp(realdata.time);
{
std::lock_guard<std::mutex> lk(g_last_ts_mtx);
auto it = g_last_ts_by_devid.find(devid);
if (it != g_last_ts_by_devid.end() && it->second == data_time) {
// 同一设备与上次时间相同 → 丢弃本次
return;
}
// 记录本次时间
g_last_ts_by_devid[devid] = data_time;
}
std::vector<DataArrayItem> arr;
arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95”
data_time, //数据转换出来的时间数据时标相对1970年的秒无效填入“-1”
0, //数据时标,微秒钟,无效填入“-1”
0, //数据标识1-标识数据异常
base64});
std::string js = generate_json(
normalize_mac(mac),
-1, //需应答的报文订阅者收到后需以此ID应答无需应答填入“-1”
1, //设备唯一标识Ldid填入0代表Ndid,后续根据商议决定填id还是数字
1, //报文处理的优先级1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文
0x1302, //设备数据主动上送的数据类型
cid, //逻辑子设备ID0-逻辑设备本身,无填-1
0x04, //数据类型固定为电能质量数据
1, //数据属性无“0”、实时“1”、统计“2”等
idx, //数据集序号(以数据集方式上送),无填-1
arr //数据数组
);
//std::cout << js << std::en
queue_data_t data;
data.monitor_no = 1; //上送的实时数据没有测点序号统一填1
data.strTopic = TOPIC_RTDATA; //实时topic
data.strText = js;
data.mp_id = ""; //监测点id暂时不需要
data.tag = G_RT_TAG; //实时tag
data.key = G_RT_KEY; //实时key
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
////////////////////////////////////////////////////////////////////////////////统计数据打包发送
// 封装:组装统计数据并入队发送
void enqueue_stat_pq(const std::string& max_base64Str,
const std::string& min_base64Str,
const std::string& avg_base64Str,
const std::string& cp95_base64Str,
time_t data_time,
const std::string& mac,
short cid)
{
std::vector<DataArrayItem> arr;
arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”4-“Cp95”
data_time, //数据转换出来的时间数据时标相对1970年的秒无效填入“-1”
0, //数据时标,微秒钟,无效填入“-1”
0, //数据标识1-标识数据异常
max_base64Str});
arr.push_back({2, data_time, 0, 0, min_base64Str});
arr.push_back({3, data_time, 0, 0, avg_base64Str});
arr.push_back({4, data_time, 0, 0, cp95_base64Str});
std::string js = generate_json(
normalize_mac(mac),
-1, //需应答的报文订阅者收到后需以此ID应答无需应答填入“-1”
1, //设备唯一标识Ldid填入0代表Ndid,后续根据商议决定填id还是数字
1, //报文处理的优先级1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文
0x1302, //设备数据主动上送的数据类型
cid, //逻辑子设备ID0-逻辑设备本身,无填-1avg_data.name
0x04, //数据类型固定为电能质量
2, //数据属性无“0”、实时“1”、统计“2”等
1, //数据集序号(以数据集方式上送),无填-1
arr //数据数组
);
//std::cout << js << std::endl;
queue_data_t data;
data.monitor_no = cid; //监测点序号avg_data.name
data.strTopic = TOPIC_STAT;//统计topic
data.strText = js;
data.mp_id = ""; //监测点id暂时不需要
data.tag = G_ROCKETMQ_TAG; //统计tag
data.key = G_ROCKETMQ_KEY; //统计key
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
std::cout << "Successfully assembled tagPqData for line: "
<< cid << std::endl;
}
/////////////////////////////////////////////////////////////////////////////////////清空一个装置运行数据
size_t erase_one_terminals_by_id(const std::string& terminal_id) {
// 先对所有匹配项做日志清理
for (const auto& d : terminal_devlist) {
if (d.terminal_id == terminal_id) {
remove_loggers_by_terminal_id(terminal_id);
}
}
// 再统一擦除
const auto old_size = terminal_devlist.size();
terminal_devlist.erase(
std::remove_if(terminal_devlist.begin(), terminal_devlist.end(),
[&](const terminal_dev& d){ return d.terminal_id == terminal_id; }),
terminal_devlist.end());
return old_size - terminal_devlist.size();
}
///////////////////////////////////////////////////////////////////////////////////////////////
DeviceInfo make_device_from_terminal(const terminal_dev& t) {
DeviceInfo d;
// 基本信息
d.device_id = t.terminal_id;
d.name = t.terminal_name;
d.model = t.dev_type;
d.mac = t.mac;
// status优先按数字解析其次按 online/true/yes/on 识别;默认 0
int status = 0;
if (!t.tmnl_status.empty()) {
bool parsed_num = false;
// 尝试解析为整数
char* endp = nullptr;
long v = std::strtol(t.tmnl_status.c_str(), &endp, 10);
if (endp && *endp == '\0') { parsed_num = true; status = (v != 0) ? 1 : 0; }
if (!parsed_num) {
std::string s = t.tmnl_status;
for (char& c : s) c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
status = (s == "online" || s == "true" || s == "yes" || s == "on" || s == "1") ? 1 : 0;
}
}
d.status = status;
// righttime支持 "1/true/yes/on";默认 false
bool rt = false;
if (!t.Righttime.empty()) {
std::string s = t.Righttime;
for (char& c : s) c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
rt = (s == "1" || s == "true" || s == "yes" || s == "on");
}
d.righttime = rt;
// points
d.points.clear();
d.points.reserve(t.line.size());
for (const auto& m : t.line) {
PointInfo p;
p.point_id = m.monitor_id;
p.name = m.monitor_name;
p.device_id = t.terminal_id;
// nCpuNo从 logical_device_seq 转 ushort非法则置 0
unsigned short cpu_no = 0;
if (!m.logical_device_seq.empty()) {
char* endp = nullptr;
long v = std::strtol(m.logical_device_seq.c_str(), &endp, 10);
if (endp && *endp == '\0' && v >= 0) {
if (v > 0xFFFF) v = 0xFFFF;
cpu_no = static_cast<unsigned short>(v);
}
}
p.nCpuNo = cpu_no;
// 变比与电压等级
p.PT1 = m.PT1;
p.PT2 = m.PT2;
p.CT1 = m.CT1;
p.CT2 = m.CT2;
p.strScale = m.voltage_level;
// 接线方式0-星型 1-角型;支持 "0/1"、包含“角/三角/delta/Δ”
int pttype = 0; // 默认星型
if (!m.terminal_connect.empty()) {
std::string s = m.terminal_connect;
for (char& c : s) c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
if (s == "1" || s.find("") != std::string::npos || s.find("三角") != std::string::npos
|| s.find("delta") != std::string::npos || s.find("") != std::string::npos || s.find("Δ") != std::string::npos) {
pttype = 1;
}
}
p.nPTType = pttype;
d.points.push_back(std::move(p));
}
return d;
}

View File

@@ -424,8 +424,8 @@ void download_xml_for_icd(const std::string& MODEL_ID,
//读最新本地台账
std::string read_latest_ledger_file() {
const char* dir = std::string(FRONT_PATH + "/dat/ledger").c_str();
DIR* dp = opendir(dir);
std::string dir = FRONT_PATH + "/dat/ledger";
DIR* dp = opendir(dir.c_str());
if (!dp) return "";
struct dirent* entry;
@@ -635,8 +635,9 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
//dev.dev_series = safe_str(item, "series");
//dev.port = safe_str(item, "port");
//dev.timestamp = safe_str(item, "updateTime");
dev.Righttime = safe_str(item, "Righttime");
dev.processNo = safe_str(item, "node");
//dev.maxProcessNum = safe_str(item, "maxProcessNum");
dev.maxProcessNum = safe_str(item, "maxProcessNum");
//dev.mac = safe_str(item, "mac");//添加mac
@@ -650,7 +651,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
m.logical_device_seq = safe_str(mon, "lineNo");
m.voltage_level = safe_str(mon, "voltageLevel");
m.terminal_connect = safe_str(mon, "ptType");
m.timestamp = safe_str(mon, "updateTime");
//m.timestamp = safe_str(mon, "updateTime");
m.status = safe_str(mon, "status");
m.CT1 = mon.value("ct1", 0.0);
@@ -703,7 +704,7 @@ int parse_device_cfg_web()
// 1. 构造入参 JSON
std::string input_jstr = "{";
input_jstr += "\"ip\":\"" + FRONT_IP + "\",";
input_jstr += "\"id\":\"" + FRONT_INST + "\",";
input_jstr += "\"runFlag\":" + TERMINAL_STATUS;
input_jstr += "}";
@@ -751,7 +752,7 @@ int parse_device_cfg_web()
DIY_DEBUGLOG("process", "【DEBUG】前置的%d号进程调用获取到的台账的数量为:%d", g_front_seg_index, count_cfg);
if (IED_COUNT < count_cfg) {
std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!" << std::endl;
std::cout << "!!!!!!!!!!single process has ledger count more than config!!!!!!!" << std::endl;
//DIY_WARNLOG("process","【WARN】前置的%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账", g_front_seg_index, IED_COUNT);
} else {
//DIY_INFOLOG("process","【NORMAL】前置的%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间", g_front_seg_index, IED_COUNT);
@@ -763,17 +764,22 @@ int parse_device_cfg_web()
terminal_devlist.clear();
int idx = 0;
for (const auto& kv : terminal_dev_map) {
terminal_dev dev = kv.second; // kv.second 是对象,不用判断指针
//dev.dev_index = static_cast<int>(idx++);
// ======= [新增] 对 terminal_dev 中 web 未返回/未设置字段做统一初始化,避免脏值 =======
dev.guid.clear(); // [新增] 业务 guid 初始为空
dev.busytype = 0; // [新增] 业务类型(状态机)默认 0
dev.isbusy = 0; // [新增] 未进行业务
dev.busytimecount = 0; // [新增] 业务计时清零
dev.internal_values.clear(); // [新增] 内部定值清空,等后续业务真实填充
dev.dz_internal_info_list.clear(); // [新增] 内部定值信息清空,等后续业务真实填充
dev.control_words.clear();
// ------------------------------------------------------------------------------------
// ======= [新增] 对每个监测点做必要的内部结构初始化 =======
@@ -785,6 +791,10 @@ int parse_device_cfg_web()
// 定值列表清理,等待后续配置/采集填充
mon.set_values.clear(); // [新增]
mon.dz_info_list.clear(); // [新增]
//补招列表清理
mon.recall_list.clear();
mon.recall_list_static.clear();
}
// ------------------------------------------------------------------------------------

View File

@@ -7,6 +7,8 @@
#include <array>
#include <map>
#include <mutex>
#include <sstream>
#include <iostream>
///////////////////////////////////////////////////////////////////////////////////////////
@@ -178,6 +180,55 @@ class qvvr_event
std::vector<qvvr_file> qvvrfile; //暂态文件组列表
};
////////////////////////////////////////////////////////////////台账更新用
class update_monitor
{
public:
std::string monitor_id; //监测点id
std::string terminal_id; //监测点的终端id
std::string monitor_name; //监测点名
std::string logical_device_seq; //监测点序号
std::string voltage_level; //监测点电压等级
std::string terminal_connect; //监测点接线方式
std::string timestamp; //更新时间
std::string status; //监测点状态
double PT1; // 电压变比1
double PT2; // 电压变比2
double CT1; // 电流变比1
double CT2; // 电流变比2
};
//终端台账
class update_dev
{
public:
std::string guid; // ★新增:供发送回复使用
std::string terminal_id;
std::string terminal_name;
std::string org_name;
std::string maint_name;
std::string station_name;
std::string tmnl_factory;
std::string tmnl_status;
std::string dev_type;
std::string dev_key;
std::string dev_series;
std::string addr_str; //装置ip
std::string port; //装置端口
std::string timestamp; //更新时间
std::string Righttime; //对时
std::string processNo;
std::string maxProcessNum;
std::string mac; // 装置MAC地址接口中从addr_str获取因为ip和mac放同一位置
std::vector<update_monitor> line;
};
////////////////////////////////////////////////////////////////////台账更新用
//监测点台账
class ledger_monitor
{
@@ -212,6 +263,8 @@ public:
class terminal_dev
{
public:
//int dev_index;
std::string guid; //正在进行的guid
int busytype; //业务类型,使用状态机
int isbusy; //业务进行标志
@@ -234,10 +287,10 @@ public:
std::string dev_series;
std::string addr_str; //装置ip
std::string port; //装置端口
std::string timestamp;
std::string timestamp; //更新时间
std::string Righttime; //对时
std::string processNo;
std::string maxProcessNum;
std::string mac; // 装置MAC地址接口中从addr_str获取因为ip和mac放同一位置
std::vector<ledger_monitor> line;
@@ -269,6 +322,13 @@ public:
std::string key; // 消息key
};
//台账更新回复
struct DeviceReply {
std::string deviceId;
int code;
std::string result;
};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// SOE 事件类
@@ -398,10 +458,10 @@ public:
#define MAX_UPDATEA_NUM 10
typedef struct trigger_update_xml_t trigger_update_xml_t;
struct trigger_update_xml_t {
std::vector<terminal_dev> new_updates;
std::vector<terminal_dev> modify_updates;
std::vector<terminal_dev> delete_updates;
std::vector<terminal_dev> work_updates;
std::vector<update_dev> new_updates;
std::vector<update_dev> modify_updates;
std::vector<update_dev> delete_updates;
std::vector<update_dev> work_updates;
trigger_update_xml_t() = default;
};
@@ -478,7 +538,6 @@ std::string get_current_time();
bool is_blank(const std::string& str);
void print_terminal(const terminal_dev& tmnl);
void printTerminalDevMap(const std::map<std::string, terminal_dev>& terminal_dev_map);
void upload_data_test();
@@ -624,6 +683,8 @@ void check_device_busy_timeout();
//业务上报
void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type, const std::string& guid, const std::string& mac);
void send_batch_reply_to_queue(const std::string& guid,
const std::vector<DeviceReply>& replies);
//内部定值响应
bool send_internal_value_reply(const std::string &dev_id, const std::vector<DZ_kzz_bit> &control_words);
@@ -671,8 +732,78 @@ void filemenu_cache_put(const std::string& dev_id,
//提取目录信息
bool filemenu_cache_take(const std::string& dev_id, std::vector<tag_dir_info>& out);
//清空装置台账
size_t erase_one_terminals_by_id(const std::string& terminal_id);
//小工具
//转换结构
DeviceInfo make_device_from_terminal(const terminal_dev& t);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////模板打印
// ===== 通用打印实现(模板) =====
template <typename Mon>
inline void print_monitor_common(const Mon& mon) {
auto safe = [](const std::string& s) -> const std::string& {
static const std::string NA = "N/A";
return s.empty() ? NA : s;
};
std::cout << "Monitor ID: " << safe(mon.monitor_id) << "\n";
std::cout << "Terminal ID: " << safe(mon.terminal_id) << "\n";
std::cout << "Monitor Name: " << safe(mon.monitor_name) << "\n";
std::cout << "Logical Device Sequence: " << safe(mon.logical_device_seq) << "\n";
std::cout << "Voltage Level: " << safe(mon.voltage_level) << "\n";
std::cout << "Terminal Connect: " << safe(mon.terminal_connect) << "\n";
std::cout << "Timestamp: " << safe(mon.timestamp) << "\n";
std::cout << "Status: " << safe(mon.status) << "\n";
std::cout << "CT1: " << mon.CT1 << "\n";
std::cout << "CT2: " << mon.CT2 << "\n";
std::cout << "PT1: " << mon.PT1 << "\n";
std::cout << "PT2: " << mon.PT2 << "\n";
}
template <typename Dev>
inline void print_terminal_common(const Dev& tmnl) {
auto safe = [](const std::string& s) -> const std::string& {
static const std::string NA = "N/A";
return s.empty() ? NA : s;
};
std::cout << "GUID: " << safe(tmnl.guid) << "\n";
std::cout << "Terminal ID: " << safe(tmnl.terminal_id) << "\n";
std::cout << "Terminal Code: " << safe(tmnl.terminal_name) << "\n";
std::cout << "Organization Name: "<< safe(tmnl.org_name) << "\n";
std::cout << "Maintenance Name: " << safe(tmnl.maint_name) << "\n";
std::cout << "Station Name: " << safe(tmnl.station_name) << "\n";
std::cout << "Factory Name: " << safe(tmnl.tmnl_factory) << "\n";
std::cout << "Terminal Status: " << safe(tmnl.tmnl_status) << "\n";
std::cout << "Device Type: " << safe(tmnl.dev_type) << "\n";
std::cout << "Device Key: " << safe(tmnl.dev_key) << "\n";
std::cout << "Device Series: " << safe(tmnl.dev_series) << "\n";
std::cout << "Address: " << safe(tmnl.addr_str) << "\n";
std::cout << "Port: " << safe(tmnl.port) << "\n";
std::cout << "Timestamp: " << safe(tmnl.timestamp) << "\n";
std::cout << "Righttime: " << safe(tmnl.Righttime) << "\n";
std::cout << "mac: " << safe(tmnl.mac) << "\n";
// 安全遍历最多前 10 个测点(不越界),且跳过空 monitor_id 的项
const size_t n = std::min<size_t>(tmnl.line.size(), 10);
for (size_t i = 0; i < n; ++i) {
if (tmnl.line[i].monitor_id.empty()) continue;
std::cout << " Monitor " << (i + 1) << ":\n";
print_monitor_common(tmnl.line[i]); // 模板会自动匹配 update/ledger 的 monitor
}
}
// ===== 薄包装重载(保持原有函数名不变)=====
inline void print_monitor(const update_monitor& mon) { print_monitor_common(mon); }
inline void print_monitor(const ledger_monitor& mon) { print_monitor_common(mon); }
inline void print_terminal(const update_dev& tmnl) { print_terminal_common(tmnl); }
inline void print_terminal(const terminal_dev& tmnl) { print_terminal_common(tmnl); }
///////////////////////////////////////////////////////////////////////////////////////////////////////////////小工具
inline std::string trim_cstr(const char* s, size_t n) {
if (!s) return {};
size_t end = 0;
@@ -695,14 +826,62 @@ inline std::string sanitize(std::string s) {
}
return s;
}
// 当前本地时间,格式 "YYYY-MM-DD HH:MM:SS"
inline std::string now_yyyy_mm_dd_hh_mm_ss() {
std::time_t t = std::time(nullptr);
std::tm tmv;
#if defined(_WIN32)
localtime_s(&tmv, &t); // Windows 线程安全
#else
localtime_r(&t, &tmv); // POSIX 线程安全
#endif
std::ostringstream oss;
oss << std::put_time(&tmv, "%Y-%m-%d %H:%M:%S");
return oss.str();
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////实时数据用
// === 专用锁 + 数据表(仅管实时 idx 映射) ===
extern std::mutex devidx_lock; // 新锁(不要用 ledgermtx
extern std::unordered_map<std::string,int> devIdxMap; // id -> idx一对一
// === 常用操作:全部用这把锁保护 ===
inline void devidx_set(const std::string& id, int idx) {
std::lock_guard<std::mutex> lk(devidx_lock);
devIdxMap[id] = idx; // 覆盖更新
}
inline bool devidx_get(const std::string& id, int& out_idx) {
std::lock_guard<std::mutex> lk(devidx_lock);
auto it = devIdxMap.find(id);
if (it == devIdxMap.end()) return false;
out_idx = it->second;
return true;
}
void enqueue_realtime_pq(const RealtagPqDate_float& realdata,
int nPTType,
unsigned char cid,
const std::string& mac,
const std::string& devid);
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////统计数据
void enqueue_stat_pq(const std::string& max_base64Str,
const std::string& min_base64Str,
const std::string& avg_base64Str,
const std::string& cp95_base64Str,
time_t data_time,
const std::string& mac,
short cid);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
extern int g_front_seg_index;
extern std::string FRONT_IP;
extern std::string FRONT_INST;
extern std::string FRONT_PATH;
extern std::string WEB_FILEUPLOAD;
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// 响应码枚举
// 云平台响应码枚举
enum class ResponseCode : int {
OK = 200, // 请求成功
ACCEPTED = 201, // 请求被接受,开始处理

View File

@@ -150,6 +150,7 @@ protected:
<< "\",\"nodeId\":\"" << FRONT_INST
<< "\",\"businessId\":\"" << extract_logger_id(logger_name)
<< "\",\"level\":\"" << level_str
<< "\",\"time\":\"" << now_yyyy_mm_dd_hh_mm_ss()
<< "\",\"grade\":\"" << get_level_str(level)
// ★新增:输出 code 字段(整型)
<< "\",\"code\":\"" << code
@@ -323,7 +324,7 @@ void init_loggers_bydevid(const std::string& dev_id)
Logger device_logger = init_logger(device_key, device_dir, dev_id, device_appender);
logger_map[device_key] = TypedLogger(device_logger, LOGTYPE_DATA);
DIY_WARNLOG(dev_id.c_str(), "【WARN】终端id:%s终端级日志初始化完毕", term.terminal_id.c_str());
//DIY_WARNLOG(dev_id.c_str(), "【WARN】终端id:%s终端级日志初始化完毕", term.terminal_id.c_str());
}
// 初始化监测点日志monitor.<mp_id>.COM / .DATA
@@ -348,7 +349,7 @@ void init_loggers_bydevid(const std::string& dev_id)
Logger mon_logger = init_logger(mon_key.str(), mon_path.str(), mon_name.str(), monitor_appender);
logger_map[mon_key.str()] = TypedLogger(mon_logger, LOGTYPE_DATA);
DIY_WARNLOG(monitor.monitor_id.c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str());
//DIY_WARNLOG(monitor.monitor_id.c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str());
}
}
}
@@ -388,7 +389,7 @@ void init_loggers()
logger_map[device_key] = TypedLogger(device_logger, LOGTYPE_DATA);
DIY_WARNLOG(term.terminal_id.c_str(), "【WARN】终端id:%s终端级日志初始化完毕", term.terminal_id.c_str());
//DIY_WARNLOG(term.terminal_id.c_str(), "【WARN】终端id:%s终端级日志初始化完毕", term.terminal_id.c_str());
// 初始化监测点日志
for (size_t i = 0; i < term.line.size(); ++i) {
@@ -411,8 +412,8 @@ void init_loggers()
logger_map[mon_key.str()] = TypedLogger(mon_logger, LOGTYPE_DATA);
DIY_WARNLOG(mon_key.str().c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕",
monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str());
//DIY_WARNLOG(mon_key.str().c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕",
//monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str());
}
}
}
@@ -480,22 +481,28 @@ extern "C" {
void log_warn (const char* key, const char* msg) { log4_log_with_level(key, msg, 2); }
void log_error(const char* key, const char* msg) { log4_log_with_level(key, msg, 3); }
void send_reply_to_queue_c(const char* guid, const char* step, const char* result) {
send_reply_to_queue(std::string(guid), std::string(step), std::string(result));
}
//标准化日志接口
// #define LOGMSG_WITH_TS // 需要时间时再打开
void format_log_msg(char* buf, size_t buf_size, const char* fmt, ...) {
if (!buf || buf_size == 0) return;
buf[0] = '\0';
if (!fmt) return;
va_list args;
va_start(args, fmt);
#ifdef LOGMSG_WITH_TS
// 写入时间
time_t now = time(NULL);
struct tm tm_info;
localtime_r(&now, &tm_info);
strftime(buf, buf_size, "%Y-%m-%d %H:%M:%S ", &tm_info); // 时间+空格
// 处理可变参数并写入剩余内容
va_list args;
va_start(args, fmt);
vsnprintf(buf + strlen(buf), buf_size - strlen(buf), fmt, args);
size_t n = strftime(buf, buf_size, "%Y-%m-%d %H:%M:%S ", &tm_info);
if (n < buf_size) {
vsnprintf(buf + n, buf_size - n, fmt, args);
}
#else
vsnprintf(buf, buf_size, fmt, args);
#endif
va_end(args);
}

View File

@@ -61,7 +61,7 @@ struct DebugSwitch {
extern std::map<std::string, TypedLogger> logger_map;
extern DebugSwitch g_debug_switch;
extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result);
extern void send_reply_to_queue(const std::string& guid, const int code, const std::string& result);
//std::string get_front_type_from_subdir();
@@ -94,8 +94,6 @@ void log_debug(const char* key, const char* msg);
void log_info(const char* key, const char* msg);
void log_warn(const char* key, const char* msg);
void log_error(const char* key, const char* msg);
void send_reply_to_queue_c(const char* guid, const char* step, const char* result);
void format_log_msg(char* buf, size_t buf_size, const char* fmt, ...);
// ====================== ★新增:线程局部变量透传 code ======================
@@ -122,17 +120,10 @@ typedef enum LogCode {
LOG_CODE_REPORT = 500, /* 报告处理 */
LOG_CODE_COMM = 600, /* 通讯状态 */
LOG_CODE_SPACE_ALARM = 700, /* 空间告警 */
LOG_CODE_DEV_ALARM = 800 /* 装置告警 */
LOG_CODE_DEV_ALARM = 800 /* 设备告警 */
} LogCode;
// ====================== 日志宏区域 ======================
// 原始不带 code 的实现(兼容/复用)
#define DIY_LOG(LEVEL_FUNC, KEY, ...) \
do { \
char buf[256]; \
format_log_msg(buf, sizeof(buf), __VA_ARGS__); \
LEVEL_FUNC(KEY, buf); \
} while (0)
// ★新增:带 code 的实现C/C++ 通用,使用 TLS 保存/恢复)
#define DIY_LOG_CODE(LEVEL_FUNC, KEY, LEVEL_INT, CODE_INT, ...) \

View File

@@ -320,7 +320,7 @@ void Front::FrontThread() {
check_recall_file(); //处理补招文件-稳态和暂态
check_recall_event(); // 处理补招事件从list中读取然后直接调用接口,每一条可能都不同测点,每个测点自己做好记录
check_ledger_update(); // 触发台账更新
//check_ledger_update(); // 触发台账更新
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

View File

@@ -58,6 +58,9 @@ std::list<queue_data_t> queue_data_list;
static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者
std::mutex devidx_lock;
std::unordered_map<std::string, int> devIdxMap;//实时数据用的idx
///////////////////////////////////////////////////////////////////////////////////////////////////////////
//前置进程
@@ -343,7 +346,7 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer)
/////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理
bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit){
bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit,int& idx){
json root;
try {
root = json::parse(body);
@@ -378,7 +381,8 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l
!messageBody.contains("line") ||
!messageBody.contains("realData") ||
!messageBody.contains("soeData") ||
!messageBody.contains("limit"))
!messageBody.contains("limit")||
!messageBody.contains("idx"))
{
std::cerr << "Missing expected fields in 'messageBody'." << std::endl;
return false;
@@ -390,6 +394,7 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l
realData = messageBody["realData"].get<bool>();
soeData = messageBody["soeData"].get<bool>();
limit = messageBody["limit"].get<int>();
idx = messageBody["idx"].get<int>();
} catch (const std::exception& e) {
std::cerr << "Type error while extracting fields: " << e.what() << std::endl;
return false;
@@ -403,7 +408,7 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l
// 回复执行结果直接看实时数据不需要再回复1是收到消息
if (!guid.empty()) {
send_reply_to_queue(guid, "1", "收到三秒数据指令");
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到三秒数据指令");
}
return true;
@@ -495,7 +500,7 @@ bool parseJsonMessageSET(const std::string& json_str) {
DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程", g_front_seg_index, fun.c_str());
send_reply_to_queue(guid, "1", "收到重置进程指令,重启所有进程!");
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到重置进程指令,重启所有进程!");
std::cout << "this msg should only execute once" << std::endl;
} else {
std::cout << "only cfg_stat_data index 1 can control process, this process not handle this msg" << std::endl;
@@ -503,7 +508,7 @@ bool parseJsonMessageSET(const std::string& json_str) {
}
else if (fun == "delete") {
send_reply_to_queue(guid, "1", "收到删除进程指令,这个进程将会重启 ");
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到删除进程指令,这个进程将会重启 ");
DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启", g_front_seg_index, fun.c_str());
@@ -597,7 +602,7 @@ bool parseJsonMessageLOG(const std::string& json_str) {
/*std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;*/
// 回复消息
send_reply_to_queue(guid, "1", "收到实时日志指令");
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到实时日志指令");
if (code_str == "set_log") {
// 校验数据合法性
@@ -668,32 +673,38 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的台账更新消息",
g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
send_reply_to_queue(guid, "1", "收到台账更新指令");
//send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到台账更新指令");
std::vector<DeviceReply> reply_list;
if (code_str == "add_terminal" || code_str == "ledger_modify") {
std::cout << "add or update ledger" << std::endl;
std::cout << "add or modify ledger" << std::endl;
if (messageBody.contains("data") && messageBody["data"].is_array()) {
for (const auto& item : messageBody["data"]) {
terminal_dev json_data;
json_data.terminal_id = item.value("id", "");
json_data.terminal_name = item.value("name", "");
json_data.org_name = item.value("org_name", "");
json_data.maint_name = item.value("maint_name", "");
json_data.station_name = item.value("stationName", "");
json_data.tmnl_factory = item.value("manufacturer", "");
json_data.tmnl_status = item.value("status", "");
//json_data.org_name = item.value("org_name", "");
//json_data.maint_name = item.value("maint_name", "");
//json_data.station_name = item.value("stationName", "");
//json_data.tmnl_factory = item.value("manufacturer", "");
//json_data.tmnl_status = item.value("status", "");
json_data.dev_type = item.value("devType", "");
json_data.dev_key = item.value("devKey", "");
json_data.dev_series = item.value("series", "");
//json_data.dev_key = item.value("devKey", "");
//json_data.dev_series = item.value("series", "");
int procNo = item.value("processNo", -1);
json_data.processNo = std::to_string(procNo);
json_data.addr_str = item.value("ip", "");
json_data.port = item.value("port", "");
json_data.timestamp = item.value("updateTime", "");
//int procNum = item.value("maxProcessNum", -1);
//json_data.maxProcessNum = std::to_string(procNum);
//json_data.addr_str = item.value("ip", "");
//json_data.port = item.value("port", "");
//json_data.timestamp = item.value("updateTime", "");
json_data.Righttime = item.value("Righttime", "");
if (item.contains("monitorData") && item["monitorData"].is_array()) {
int j = 0;
@@ -706,43 +717,178 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
m.status = monitor_item.value("status", "");
m.logical_device_seq = monitor_item.value("lineNo", "");
m.terminal_connect = monitor_item.value("ptType", "");
m.timestamp = json_data.timestamp;
m.terminal_id = json_data.terminal_id;
//m.timestamp = json_data.timestamp;
m.terminal_id = monitor_item.value("deviceId", json_data.terminal_id);
m.CT1 = monitor_item.value("CT1", 0.0);
m.CT2 = monitor_item.value("CT2", 0.0);
m.PT1 = monitor_item.value("PT1", 0.0);
m.PT2 = monitor_item.value("PT2", 0.0);
}
}
print_terminal(json_data);
std::string xmlContent = prepare_update(code_str, json_data, guid);
/*std::string xmlContent = prepare_update(code_str, json_data, guid);
if (!xmlContent.empty()) {
char nodeid[20];
std::sprintf(nodeid, "%u", g_node_id);
std::string file_name = output_dir + "/" + nodeid + "_" + std::to_string(g_front_seg_index) + "_" + json_data.terminal_id + "_" + code_str + ".xml";
writeToFile(file_name, xmlContent);
}
}*/
if(code_str == "add_terminal"){
DeviceReply one;
one.deviceId = json_data.terminal_id;
std::lock_guard<std::mutex> lock(ledgermtx);
// ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中
const std::string& tid = json_data.terminal_id;
auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(),
[&](const terminal_dev& d){ return d.terminal_id == tid; });
if (it == terminal_devlist.end()) {
init_loggers_bydevid(json_data.terminal_id);
terminal_devlist.push_back(json_data);
//调用接口添加到通讯列表
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账添加成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账添加成功";
}
else{
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 已存在该装置,修改这个装置的台账");*/
if(erase_one_terminals_by_id(json_data.terminal_id) == 1){
init_loggers_bydevid(json_data.terminal_id);
terminal_devlist.push_back(json_data);
//调用接口添加到通讯列表
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账修改成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账修改成功";
}
else{
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账修改失败");*/
one.code = static_cast<int>(ResponseCode::BAD_REQUEST);
one.result = "台账修改失败";
}
}
reply_list.push_back(std::move(one));
}
else if(code_str == "ledger_modify"){
DeviceReply one;
one.deviceId = json_data.terminal_id;
std::lock_guard<std::mutex> lock(ledgermtx);
// ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中
const std::string& tid = json_data.terminal_id;
auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(),
[&](const terminal_dev& d){ return d.terminal_id == tid; });
if (it == terminal_devlist.end()) {
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::NOT_FOUND),
"终端 id: " + tid + " 无法修改台账,未找到指定装置,改为添加这个装置");*/
DIY_WARNLOG("process", "【WARN】无法修改台账未找到指定装置: %s ,改为添加这个装置", tid.c_str());
init_loggers_bydevid(json_data.terminal_id);
terminal_devlist.push_back(json_data);
//调用接口添加到通讯列表
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账添加成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账添加成功";
reply_list.push_back(std::move(one));
continue;//修改下一个
}
if(erase_one_terminals_by_id(json_data.terminal_id) == 1){
init_loggers_bydevid(json_data.terminal_id);
terminal_devlist.push_back(json_data);
//调用接口添加到通讯列表
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账修改成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账修改成功";
}
else{
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账修改失败");*/
one.code = static_cast<int>(ResponseCode::BAD_REQUEST);
one.result = "台账修改失败";
}
reply_list.push_back(std::move(one));
}
}
}
} else if (code_str == "delete_terminal") {
std::cout << "delete ledger" << std::endl;
if (messageBody.contains("data") && messageBody["data"].is_array()) {
for (const auto& item : messageBody["data"]) {
terminal_dev json_data{};
auto id = item.value("id", "");
json_data.terminal_id = id;
std::string xmlContent = prepare_update(code_str, json_data, guid);
/*std::string xmlContent = prepare_update(code_str, json_data, guid);
if (!xmlContent.empty()) {
char nodeid[20];
std::sprintf(nodeid, "%u", g_node_id);
std::string file_name = output_dir + "/" + nodeid + "_" + std::to_string(g_front_seg_index) + "_" + json_data.terminal_id + "_delete_terminal.xml";
writeToFile(file_name, xmlContent);
}
}*/
DeviceReply one;
one.deviceId = json_data.terminal_id;
//直接加锁删除
std::lock_guard<std::mutex> lock(ledgermtx);
if(erase_one_terminals_by_id(json_data.terminal_id) == 1){
ClientManager::instance().remove_device(json_data.terminal_id);
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账删除成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账删除成功";
}
else{
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账删除失败");*/
one.code = static_cast<int>(ResponseCode::BAD_REQUEST);
one.result = "台账删除失败";
}
reply_list.push_back(std::move(one));
}
}
} else {
std::cout << "code_str error" << std::endl;
}
send_batch_reply_to_queue(guid, reply_list);
return true;
}
@@ -777,8 +923,9 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
ushort line;
bool realData = false, soeData = false;
int limit = 0;
int idx = 0;
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) {
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit,idx)) {
std::cerr << "Failed to parse the JSON message." << std::endl;
DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的实时触发消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
return rocketmq::RECONSUME_LATER;
@@ -794,10 +941,13 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
if (ClientManager::instance().get_dev_status(devid) != 1) {
std::cout << "devid对应装置不在线: " << devid << std::endl;
// 记录日志不响应 web 端
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,装置%s不在线", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str(),devid.c_str());
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的实时数据触发消息失败,装置%s不在线", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str(),devid.c_str());
return rocketmq::CONSUME_SUCCESS;
}
//记录idx
devidx_set(devid, idx);//每次下发都会更新,不加入运行用的结构体
ClientManager::instance().set_real_state_count(devid, 60, line);//一秒询问一次询问60次,下一次同一个测点调用的话就会刷新
}
else{
@@ -1071,48 +1221,51 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<ip>" << json_data.addr_str << "</ip>" << std::endl; // Assuming `addr_str` for IP
//add_indent(xmlStream, indentLevel);
//xmlStream << "<ip>" << json_data.addr_str << "</ip>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<devType>" << json_data.dev_type << "</devType>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<maintName>" << json_data.maint_name << "</maintName>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<maintName>" << json_data.maint_name << "</maintName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<orgName>" << json_data.org_name << "</orgName>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<orgName>" << json_data.org_name << "</orgName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<port>" << json_data.port << "</port>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<port>" << json_data.port << "</port>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<stationName>" << json_data.station_name << "</stationName>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<stationName>" << json_data.station_name << "</stationName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalCode>" << json_data.terminal_name << "</terminalCode>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<updateTime>" << json_data.timestamp << "</updateTime>" << std::endl; // Assuming `timestamp`
//add_indent(xmlStream, indentLevel);
//xmlStream << "<updateTime>" << json_data.timestamp << "</updateTime>" << std::endl; // Assuming `timestamp`
add_indent(xmlStream, indentLevel);
xmlStream << "<manufacturer>" << json_data.tmnl_factory << "</manufacturer>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<manufacturer>" << json_data.tmnl_factory << "</manufacturer>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<status>" << json_data.tmnl_status << "</status>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<status>" << json_data.tmnl_status << "</status>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<series>" << json_data.dev_series << "</series>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<series>" << json_data.dev_series << "</series>" << std::endl;
//lnk20250210
add_indent(xmlStream, indentLevel);
xmlStream << "<processNo>" << json_data.processNo << "</processNo>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<devKey>" << json_data.dev_key << "</devKey>" << std::endl;
xmlStream << "<Righttime>" << json_data.Righttime << "</Righttime>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<mac>" << json_data.mac << "</mac>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<devKey>" << json_data.dev_key << "</devKey>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<mac>" << json_data.mac << "</mac>" << std::endl;
// monitorData 部分
for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) {
@@ -1137,8 +1290,8 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json
add_indent(xmlStream, indentLevel);
xmlStream << "<ptType>" << monitor.terminal_connect << "</ptType>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<timestamp>" << monitor.timestamp << "</timestamp>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<timestamp>" << monitor.timestamp << "</timestamp>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminal_id>" << monitor.terminal_id << "</terminal_id>" << std::endl;
@@ -1245,15 +1398,14 @@ void connect_status_to_queue(const std::string& id, const std::string& datetime,
////////////////////////////////////////////////////////////////////////////////////////////////////////////////响应消息
void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result) {
void send_reply_to_queue(const std::string& guid, const int code, const std::string& result) {
try {
// 构造 JSON 对象
nlohmann::json obj;
obj["guid"] = guid;
obj["step"] = step;
obj["code"] = code;
obj["result"] = result;
obj["processNo"] = g_front_seg_index;
obj["frontType"] = "cloudfront";
obj["nodeId"] = FRONT_INST;
// 构造 queue 消息
@@ -1272,6 +1424,37 @@ void send_reply_to_queue(const std::string& guid, const std::string& step, const
}
}
//台账更新批量回复
void send_batch_reply_to_queue(const std::string& guid,
const std::vector<DeviceReply>& replies) {
try {
nlohmann::json root;
root["guid"] = guid;
nlohmann::json arr = nlohmann::json::array();
for (const auto& r : replies) {
nlohmann::json item;
item["deviceId"] = r.deviceId;
item["code"] = r.code;
item["result"] = r.result;
arr.push_back(std::move(item));
}
root["data"] = std::move(arr);
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = root.dump();
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
} catch (const std::exception& e) {
std::cerr << "send_batch_reply_to_queue exception: " << e.what() << std::endl;
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////心跳消息
void send_heartbeat_to_queue(const std::string& status) {
@@ -1849,7 +2032,7 @@ void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type, co
// ---- 构造根 JSON ----
nlohmann::json obj;
obj["guid"] = guid;
obj["FrontIP"] = FRONT_IP;
obj["FrontId"] = FRONT_INST;
obj["Node"] = g_front_seg_index;
// Dev_mac从台账取 addr_str 并规范化
@@ -1918,11 +2101,11 @@ rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg)
// 消息解析
std::string guid;
std::string devid;
std::string FrontIP;
std::string FrontId;
int Node;
nlohmann::json DetailObj;
if (!parseJsonMessageCLOUD(body, devid, guid, DetailObj,FrontIP,Node)) {
if (!parseJsonMessageCLOUD(body, devid, guid, DetailObj,FrontId,Node)) {
std::cerr << "Failed to parse the JSON message." << std::endl;
DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的云前置控制消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
return rocketmq::RECONSUME_LATER;
@@ -1932,11 +2115,11 @@ rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg)
std::cout << "[CLOUD Msg Parsed] "
<< "guid=" << guid
<< ", devid=" << devid
<< ", FrontIP=" << FrontIP
<< ", FrontId=" << FrontId
<< ", Node=" << Node
<< std::endl;
if(FrontIP != FRONT_IP || Node != g_front_seg_index){
if(FrontId != FRONT_INST || Node != g_front_seg_index){
std::cout << "当前进程不消费这个消息" << std::endl;
return rocketmq::CONSUME_SUCCESS;
}
@@ -2006,4 +2189,7 @@ void connect_status_update(const std::string& id, int status)
// 调试打印
std::cout << "[connect_status_update] queued JSON:\n" << j.dump(4) << std::endl;
}
}

View File

@@ -414,6 +414,7 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
std::ostringstream os;
os << "\r\x1B[K------------------------------------\n";
//os << "\r\x1B[K|-- dev_index : " << dev.dev_index << "\n";
os << "\r\x1B[K|-- terminal_id : " << dev.terminal_id << "\n";
os << "\r\x1B[K|-- terminal_name : " << dev.terminal_name << "\n";
os << "\r\x1B[K|-- dev_ip : " << dev.addr_str << "\n";
@@ -429,6 +430,7 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
os << "\r\x1B[K|-- tmnl_factory : " << dev.tmnl_factory << "\n";
os << "\r\x1B[K|-- tmnl_status : " << dev.tmnl_status << "\n";
os << "\r\x1B[K|-- timestamp : " << dev.timestamp << "\n";
os << "\r\x1B[K|-- Righttime : " << dev.Righttime << "\n";
os << "\r\x1B[K|-- mac : " << dev.mac << "\n";
// ========================= 终端级 · 内部定值 =========================

View File

@@ -13,6 +13,7 @@
#include "cloudfront/code/interface.h" //lnk20250708
#include "cloudfront/code/rocketmq.h" //lnk20250708
#include "cloudfront/code/log4.h" //lnk20250924
#include "client2.h"
#include "cloudfront/code/log4.h"
@@ -122,17 +123,13 @@ void process_received_message(string mac, string id,const char* data, size_t len
//end_time.tm_min = 1;
//end_time.tm_sec = 1;
//ClientManager::instance().read_eventlog_action_to_device(id, start_time, end_time,2,1);
//DIY_ERRORLOG_CODE("111", 0, static_cast<int>(LogCode::LOG_CODE_OTHER), "【ERROR】测试告警发送 前置");
//DIY_ERRORLOG_CODE(id, 1, static_cast<int>(LogCode::LOG_CODE_OTHER), "【ERROR】测试告警发送 设备");
/*std::string mpid;
DIY_ERRORLOG_CODE("111", 0, static_cast<int>(LogCode::LOG_CODE_OTHER), "【ERROR】测试告警发送 前置");
DIY_ERRORLOG_CODE(id, 1, static_cast<int>(LogCode::LOG_CODE_OTHER), "【ERROR】测试告警发送 设备");
std::string mpid;
get_monitor_id_by_dev_and_seq(id, 1, mpid);
if (!mpid.empty()) {
DIY_ERRORLOG_CODE(mpid, 2, static_cast<int>(LogCode::LOG_CODE_OTHER), "【ERROR】测试告警发送 测点");
}*/
}
}
if (udata[19] == 0x00) {
std::cout << "cloud login: " << mac << " state: fail!" << std::endl;
@@ -233,7 +230,6 @@ void process_received_message(string mac, string id,const char* data, size_t len
}
}
// 获取测点参数
std::string strScale;//电压等级
int nPTType;//接线方式
@@ -498,58 +494,13 @@ void process_received_message(string mac, string id,const char* data, size_t len
//lnk20250708使用接口发送
time_t data_time = ConvertToTimestamp(avg_data.time);
std::vector<DataArrayItem> arr;
arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95”
data_time, //数据转换出来的时间数据时标相对1970年的秒无效填入“-1”
0, //数据时标,微秒钟,无效填入“-1”
0, //数据标识1-标识数据异常
max_base64Str});
arr.push_back({2, data_time, 0, 0, min_base64Str});
arr.push_back({3, data_time, 0, 0, avg_base64Str});
arr.push_back({4, data_time, 0, 0, cp95_base64Str});
std::string js = generate_json(
normalize_mac(mac),
-1, //需应答的报文订阅者收到后需以此ID应答无需应答填入“-1”
1, //设备唯一标识Ldid填入0代表Ndid,后续根据商议决定填id还是数字
1, //报文处理的优先级1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文
0x1302, //设备数据主动上送的数据类型
avg_data.name, //逻辑子设备ID0-逻辑设备本身,无填-1
0x04, //数据类型固定为电能质量
2, //数据属性无“0”、实时“1”、统计“2”等
1, //数据集序号(以数据集方式上送),无填-1
arr //数据数组
);
//std::cout << js << std::endl;
//// 创建输出流并打开文件(覆盖模式)
//std::ofstream outFile("json.txt"); // 等价于 std::ofstream outFile(filePath, std::ios::out);
//if (outFile.is_open()) { // 检查文件是否成功打开
// outFile << js; // 写入字符串
// outFile.close(); // 关闭文件
// // 成功提示(实际应用中建议使用日志)
//}
//else {
// // 错误处理:文件打开失败(如路径不存在)
//}
queue_data_t data;
data.monitor_no = avg_data.name; //监测点序号
data.strTopic = TOPIC_STAT;//统计topic
data.strText = js;
data.mp_id = ""; //监测点id暂时不需要
data.tag = G_ROCKETMQ_TAG; //统计tag
data.key = G_ROCKETMQ_KEY; //统计key
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
std::cout << "Successfully assembled tagPqData for line: "
<< avg_data.name << std::endl;
// 输出结果
//std::cout << "Base64 Encoded Data (" << max_data.CalculateFloatCount()
// << " floats): " << base64Str << std::endl;
enqueue_stat_pq(max_base64Str,
min_base64Str,
avg_base64Str,
cp95_base64Str,
data_time,
mac,
avg_data.name);
}
}
@@ -685,41 +636,8 @@ void process_received_message(string mac, string id,const char* data, size_t len
strScale,
nPTType);
std::string base64 = realdata.ConvertToBase64(nPTType);
//std::cout << base64 << std::endl;
//lnk实时数据使用接口发送20250711
time_t data_time = ConvertToTimestamp(realdata.time);
std::vector<DataArrayItem> arr;
arr.push_back({1, //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95”
data_time, //数据转换出来的时间数据时标相对1970年的秒无效填入“-1”
0, //数据时标,微秒钟,无效填入“-1”
0, //数据标识1-标识数据异常
base64});
std::string js = generate_json(
normalize_mac(mac),
-1, //需应答的报文订阅者收到后需以此ID应答无需应答填入“-1”
1, //设备唯一标识Ldid填入0代表Ndid,后续根据商议决定填id还是数字
1, //报文处理的优先级1 I类紧急请求/响应 2 II类紧急请求/响应 3 普通请求/响应 4 广播报文
0x1302, //设备数据主动上送的数据类型
cid, //逻辑子设备ID0-逻辑设备本身,无填-1
0x04, //数据类型固定为电能质量数据
1, //数据属性无“0”、实时“1”、统计“2”等
2, //数据集序号(以数据集方式上送),无填-1
arr //数据数组
);
//std::cout << js << std::en
queue_data_t data;
data.monitor_no = 1; //上送的实时数据没有测点序号统一填1
data.strTopic = TOPIC_RTDATA; //实时topic
data.strText = js;
data.mp_id = ""; //监测点id暂时不需要
data.tag = G_RT_TAG; //实时tag
data.key = G_RT_KEY; //实时key
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
// 转换为Base64字符串并发送lnk20250924
enqueue_realtime_pq(realdata, nPTType, cid, mac, id);
// 处理完成后重置状态
ClientManager::instance().change_device_state(id, DeviceState::IDLE);

View File

@@ -1,6 +1,6 @@
# 文件名runtime.cf
# 执行程序路径 ^ 可执行程序名 启动参数 ^ 重启特有参数 ^ 首次启动特有参数 ^ 程序首次启动前的延时秒数 ^ 重启是否忽略 ^ 首次启动是否忽略
/home/pq/zwproject/LFtid1056/bin/ ^ cloud-front-test -s 3_3^ ^ ^ 1 ^ ^
/home/pq/zwproject/LFtid1056/bin/ ^ cloud-front-test -s 2_3^ ^ ^ 1 ^ ^
/home/pq/zwproject/LFtid1056/bin/ ^ cloud-front-test -s 1_3^ ^ ^ 1 ^ ^
/home/pq/zwproject/LFtid1056/bin/ ^ fe_watchdog -m 8192 ^ ^ ^ 1 ^ IGNORE_RESTART ^
/FeProject/bin/ ^ cloud-front-test -s 3_3^ ^ ^ 1 ^ ^
/FeProject/bin/ ^ cloud-front-test -s 2_3^ ^ ^ 1 ^ ^
/FeProject/bin/ ^ cloud-front-test -s 1_3^ ^ ^ 1 ^ ^
/FeProject/bin/ ^ fe_watchdog -m 18192 ^ ^ ^ 1 ^ IGNORE_RESTART ^

BIN
LFtid1056_bak.rar Normal file

Binary file not shown.