runing with ledger interface

This commit is contained in:
lnk
2025-08-08 11:16:38 +08:00
parent 83986c35cb
commit 16ccb567d0
8 changed files with 236 additions and 71 deletions

View File

@@ -2750,6 +2750,7 @@ void to_json(nlohmann::json& j, const MsgObj& m) {
// FullObj to_json
void to_json(nlohmann::json& j, const FullObj& f) {
j = nlohmann::json{
{"Id", f.mac},
{"Mid", f.Mid},
{"Did", f.Did},
{"Pri", f.Pri},
@@ -2758,6 +2759,7 @@ void to_json(nlohmann::json& j, const FullObj& f) {
};
}
std::string generate_json(
const std::string mac,
int Mid, //需应答的报文订阅者收到后需以此ID应答无需应答填入“-1”
int Did, //设备唯一标识Ldid填入0代表Ndid。
int Pri, //报文处理的优先级
@@ -2769,6 +2771,7 @@ std::string generate_json(
const std::vector<DataArrayItem>& dataArray //数据数组。
) {
FullObj fobj;
fobj.mac = mac;
fobj.Mid = Mid;
fobj.Did = Did;
fobj.Pri = Pri;
@@ -2792,7 +2795,7 @@ void upload_data_test(){
arr.push_back({2, 1691741340, 0, 1, "yyyy"});
std::string js = generate_json(
-1, 2, 1, 4866, 1, 0, 2, 1, arr
"123",-1, 1, 1, 4866, 1, 0, 2, 1, arr
);
std::cout << js << std::endl;
@@ -2814,8 +2817,9 @@ std::vector<DeviceInfo> GenerateDeviceInfoFromLedger(const std::vector<terminal_
DeviceInfo device;
device.device_id = terminal.terminal_id;
device.name = terminal.terminal_name;
device.model = terminal.dev_series;
device.mac = terminal.mac;
device.model = terminal.dev_type;
device.mac = terminal.addr_str;
device.status = 1;
for (const auto& monitor : terminal.line) {
PointInfo point;
@@ -2826,6 +2830,9 @@ std::vector<DeviceInfo> GenerateDeviceInfoFromLedger(const std::vector<terminal_
point.PT2 = monitor.PT2;
point.CT1 = monitor.CT1;
point.CT2 = monitor.CT2;
point.strScale = monitor.voltage_level;
point.nCpuNo = std::stoi(monitor.logical_device_seq);
point.nPTType = std::stoi(monitor.terminal_connect);
device.points.push_back(point);
}
@@ -2882,7 +2889,7 @@ bool assign_qvvr_file_list(const std::string& id, ushort nCpuNo, const std::vect
////////////////////////////////////////////////////////////////////////////////////////////////////////////////下载成功通知
//提取下载路径的文件名
std::string extract_filename(const std::string& path) {
std::string extract_filename1(const std::string& path) {
size_t pos = path.find_last_of("/\\");
return (pos != std::string::npos) ? path.substr(pos + 1) : path;
}
@@ -2946,7 +2953,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std::
std::lock_guard<std::mutex> lock(ledgermtx);
// 去除 mac 路径前缀,仅保留文件名
std::string filename = extract_filename(filename_with_mac);
std::string filename = extract_filename1(filename_with_mac);
// 提取逻辑序号(如 PQM1 → 1
size_t under_pos = filename.find('_');
@@ -2992,7 +2999,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std::
std::set<std::string> s_name(qfile.file_name.begin(), qfile.file_name.end());
std::set<std::string> s_down;
for (const auto& path : qfile.file_download) {
s_down.insert(extract_filename(path)); // 提取每个路径中的文件名
s_down.insert(extract_filename1(path)); // 提取每个路径中的文件名
}
// 检查 file_download 是否与 file_name 完全一致(集合相同)
@@ -3001,7 +3008,7 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std::
// 找到其中的 .cfg 文件进行匹配
for (const auto& fpath : qfile.file_download) {
std::string fname = extract_filename(fpath);
std::string fname = extract_filename1(fpath);
if (fname.size() >= 4 && fname.substr(fname.size() - 4) == ".cfg") {
// 提取文件时标和监测点事件的时标匹配
qvvr_data matched;
@@ -3064,3 +3071,10 @@ bool update_qvvr_file_download(const std::string& filename_with_mac, const std::
}
return false; // 未匹配到终端ID或逻辑序号对应的监测点
}
////////////////////////////////////////////////////////////////////////////////////////提取mac
std::string normalize_mac(const std::string& mac) {
std::string result = mac;
result.erase(std::remove(result.begin(), result.end(), '-'), result.end());
return result;
}

View File

@@ -609,27 +609,27 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
dev.terminal_id = safe_str(item, "id");
dev.addr_str = safe_str(item, "ip");
dev.terminal_name = safe_str(item, "name");
dev.org_name = safe_str(item, "org_name");
dev.maint_name = safe_str(item, "maint_name");
dev.station_name = safe_str(item, "stationName");
dev.tmnl_factory = safe_str(item, "manufacturer");
dev.tmnl_status = safe_str(item, "status");
//dev.org_name = safe_str(item, "org_name");
//dev.maint_name = safe_str(item, "maint_name");
//dev.station_name = safe_str(item, "stationName");
//dev.tmnl_factory = safe_str(item, "manufacturer");
//dev.tmnl_status = safe_str(item, "status");
dev.dev_type = safe_str(item, "devType");
dev.dev_key = safe_str(item, "devKey");
dev.dev_series = safe_str(item, "series");
dev.port = safe_str(item, "port");
dev.timestamp = safe_str(item, "updateTime");
dev.processNo = safe_str(item, "processNo");
dev.maxProcessNum = safe_str(item, "maxProcessNum");
//dev.dev_key = safe_str(item, "devKey");
//dev.dev_series = safe_str(item, "series");
//dev.port = safe_str(item, "port");
//dev.timestamp = safe_str(item, "updateTime");
dev.processNo = safe_str(item, "node");
//dev.maxProcessNum = safe_str(item, "maxProcessNum");
dev.mac = safe_str(item, "mac");//添加mac
//dev.mac = safe_str(item, "mac");//添加mac
if (item.contains("monitorData") && item["monitorData"].is_array()) {
for (auto& mon : item["monitorData"]) {
if (dev.line.size() >= 10) break;
ledger_monitor m;
m.monitor_id = safe_str(mon, "id");
m.terminal_id = safe_str(mon, "terminal_id");
m.terminal_id = safe_str(mon, "deviceId");
m.monitor_name = safe_str(mon, "name");
m.logical_device_seq = safe_str(mon, "lineNo");
m.voltage_level = safe_str(mon, "voltageLevel");
@@ -637,10 +637,10 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
m.timestamp = safe_str(mon, "updateTime");
m.status = safe_str(mon, "status");
m.CT1 = mon.value("CT1", 0.0);
m.CT2 = mon.value("CT2", 0.0);
m.PT1 = mon.value("PT1", 0.0);
m.PT2 = mon.value("PT2", 0.0);
m.CT1 = mon.value("ct1", 0.0);
m.CT2 = mon.value("ct2", 0.0);
m.PT1 = mon.value("pt1", 0.0);
m.PT2 = mon.value("pt2", 0.0);
dev.line.push_back(m);
}
@@ -736,9 +736,9 @@ int parse_device_cfg_web()
if (IED_COUNT < count_cfg) {
std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!" << std::endl;
DIY_WARNLOG("process","【WARN】前置的%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账", g_front_seg_index, IED_COUNT);
//DIY_WARNLOG("process","【WARN】前置的%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账", g_front_seg_index, IED_COUNT);
} else {
DIY_INFOLOG("process","【NORMAL】前置的%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间", g_front_seg_index, IED_COUNT);
//DIY_INFOLOG("process","【NORMAL】前置的%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间", g_front_seg_index, IED_COUNT);
}
///////////////////////////////////////////////////////////////////////////////用例这里将局部的map拷贝到全局map后续根据协议台账修改
@@ -1199,21 +1199,21 @@ int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id,
// 有效响应,略过
} catch (...) {
// 响应异常,保存 json
DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口响应异常,无法上送监测点%s的暂态事件", monitor_id.c_str());
DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口响应异常,无法上送装置%s监测点%s的暂态事件",dev_id, monitor_id);
std::cout << "qvvr send fail ,store in local" << std::endl;
std::string qvvrDir = FRONT_PATH + "/dat/qvvr/";
std::string fileName = qvvrDir + monitor_id + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt";
std::string fileName = qvvrDir + dev_id + "-" + std::to_string(monitor_id) + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt";
writeJsonToFile(fileName, json_string);
checkAndRemoveOldestIfNeeded(qvvrDir, 10LL * 1024 * 1024);
}
} else {
// 无响应,保存 json
DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口无响应,无法上送监测点%s的暂态事件", monitor_id.c_str());
DIY_ERRORLOG(full_key_m_d.c_str(), "【ERROR】暂态接口无响应,无法上送装置%s监测点%s的暂态事件",dev_id, monitor_id);
std::cout << "qvvr send fail ,store in local" << std::endl;
std::string qvvrDir = FRONT_PATH + "/dat/qvvr/";
std::string fileName = qvvrDir + monitor_id + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt";
std::string fileName = qvvrDir + dev_id + "-" + std::to_string(monitor_id) + "-" + FormatTimeForFilename(start_time_str) + "-" + std::to_string(dis_kind) + ".txt";
writeJsonToFile(fileName, json_string);
checkAndRemoveOldestIfNeeded(qvvrDir, 10LL * 1024 * 1024);
return 1;
@@ -1234,7 +1234,9 @@ int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id,
void qvvr_test()
{
transfer_json_qvvr_data("qvvrtest123", 6,
10.98, 1234, 1754566628692, 1,1,
"testwavepath");
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////通用接口响应

View File

@@ -429,6 +429,7 @@ struct MsgObj {
// 整体
struct FullObj {
std::string mac;
int Mid;
int Did;
int Pri;
@@ -446,6 +447,7 @@ void to_json(nlohmann::json& j, const FullObj& f);
std::vector<DeviceInfo> GenerateDeviceInfoFromLedger(const std::vector<terminal_dev>& terminal_devlist);//接口读取台账后,再调用这个将台账拷贝过来
std::string generate_json( //构造装置主动上送数据的报文
const std::string mac,
int Mid, //需应答的报文订阅者收到后需以此ID应答无需应答填入“-1”
int Did, //设备唯一标识Ldid填入0代表Ndid。
int Pri, //报文处理的优先级
@@ -461,6 +463,8 @@ std::string generate_json( //构造装置主动上送数据的报文
int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id,
double mag, double dur, long long start_tm, int dis_kind,int phase,
const std::string& wavepath);
//录波文件上传接口
void SOEFileWeb(std::string& localpath,std::string& cloudpath, std::string& wavepath);
//录波文件目录接口
bool assign_qvvr_file_list(const std::string& id, ushort nCpuNo, const std::vector<std::string>& file_list_raw);
@@ -468,6 +472,9 @@ bool assign_qvvr_file_list(const std::string& id, ushort nCpuNo, const std::vect
//录波文件下载完成通知接口
bool update_qvvr_file_download(const std::string& filename_with_mac, const std::string& terminal_id);
//提取mac
std::string normalize_mac(const std::string& mac);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#endif

View File

@@ -334,8 +334,8 @@ void init_loggers_bydevid(const std::string& dev_id)
const ledger_monitor& monitor = term.line[j];
if (!monitor.monitor_id.empty()) {
std::ostringstream mon_key_c, mon_key_d, mon_path, mon_name;
mon_key_c << "monitor." << monitor.monitor_id << ".COM";
mon_key_d << "monitor." << monitor.monitor_id << ".DATA";
mon_key_c << "monitor." << term.terminal_id << "." << monitor.logical_device_seq << ".COM";
mon_key_d << "monitor." << term.terminal_id << "." << monitor.logical_device_seq << ".DATA";
mon_path << device_dir << "/monitor" << j;
mon_name << monitor.monitor_id;
@@ -355,7 +355,7 @@ void init_loggers_bydevid(const std::string& dev_id)
logger_map[mon_key_c.str()] = TypedLogger(mon_logger_c, LOGTYPE_COM);
logger_map[mon_key_d.str()] = TypedLogger(mon_logger_d, LOGTYPE_DATA);
DIY_WARNLOG(mon_key_d.str().c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", monitor.monitor_name.c_str(), monitor.monitor_id.c_str());
DIY_WARNLOG(mon_key_d.str().c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str());
}
}
}
@@ -406,8 +406,8 @@ void init_loggers()
if (!monitor.monitor_id.empty()) {
std::ostringstream mon_key_c, mon_key_d, mon_path, mon_name;
mon_key_c << "monitor." << monitor.monitor_id << ".COM";
mon_key_d << "monitor." << monitor.monitor_id << ".DATA";
mon_key_c << "monitor." << term.terminal_id << "." << monitor.logical_device_seq << ".COM";
mon_key_d << "monitor." << term.terminal_id << "." << monitor.logical_device_seq << ".DATA";
mon_path << device_dir << "/monitor" << i; // 用monitor+序号作为目录
mon_name << monitor.monitor_id;
@@ -425,7 +425,7 @@ void init_loggers()
logger_map[mon_key_d.str()] = TypedLogger(mon_logger_d, LOGTYPE_DATA);
DIY_WARNLOG(mon_key_d.str().c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕",
monitor.monitor_name.c_str(), monitor.monitor_id.c_str());
monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str());
}
}
}

View File

@@ -215,7 +215,7 @@ std::string get_parent_directory() {
init_loggers();
//读取模型,下载模板文件
parse_model_cfg_web();
//parse_model_cfg_web();
//解析模板文件
//Set_xml_nodeinfo();
@@ -518,16 +518,35 @@ void Front::mqproducerThread()
extern thread_info_t thread_info[THREAD_CONNECTIONS];
void cleanup_args(ThreadArgs* args) {
for (int i = 0; i < args->argc; ++i) {
free(args->argv[i]); // strdup 分配的
}
delete[] args->argv;
delete args;
}
void* cloudfrontthread(void* arg) {
///////////////////////////////////////
ThreadArgs* args = static_cast<ThreadArgs*>(arg);
int argc = args->argc;
char **argv = args->argv;
printf("argc = %d, argv[0] = %s\n", argc, argv[0]);
printf("[cloudfrontthread] argc = %d\n", argc);
for (int i = 0; i < argc; ++i) {
printf(" argv[%d] = %s\n", i, argv[i]);
}
//添加线程处理
int index = *(int*)argv[0];
// 动态解析线程 index
int index = 0;
if (argc > 0 && argv[0]) {
try {
index = std::stoi(argv[0]);
} catch (...) {
std::cerr << "[cloudfrontthread] Failed to parse index from argv[0]: " << argv[0] << "\n";
return nullptr;
}
}
// 更新线程状态为运行中
pthread_mutex_lock(&thread_info[index].lock);
@@ -539,11 +558,12 @@ void* cloudfrontthread(void* arg) {
// 解析命令行参数
if(!parse_param(argc,argv)){
std::cerr << "process param error,exit" << std::endl;
cleanup_args(args);
return nullptr;
}
// 线程使用完后清理参数
delete args;
cleanup_args(args);
//路径获取
FRONT_PATH = get_parent_directory();