Merge branch '测试2' of http://192.168.1.22:3000/zhangwen/front_linux into 测试2
This commit is contained in:
BIN
LFtid1056.rar
BIN
LFtid1056.rar
Binary file not shown.
@@ -11,6 +11,8 @@
|
||||
#include <algorithm>
|
||||
#include <interface.h>
|
||||
|
||||
#include "cloudfront/code/log4.h"
|
||||
|
||||
// 配置参数
|
||||
constexpr int BASE_RECONNECT_DELAY = 20000; // 基础重连延迟(ms)
|
||||
constexpr int MAX_RECONNECT_DELAY = 60000; // 最大重连延迟(ms)
|
||||
|
||||
@@ -566,7 +566,7 @@ void init_config() {
|
||||
std::cout << "this is multiple process of index:" << g_front_seg_index << std::endl;
|
||||
|
||||
if(g_front_seg_index > g_front_seg_num){
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号应该为1到配置的最大进程号范围内的整数,退出当前进程");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号参数异常,当前进程退出");
|
||||
exit(-1039);
|
||||
}
|
||||
|
||||
@@ -576,22 +576,10 @@ void init_config() {
|
||||
std::cout << "this is single process" << std::endl;
|
||||
}
|
||||
else{
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号应该为1到配置的最大进程号范围内的整数,退出当前进程");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"进程号参数异常,当前进程退出");
|
||||
exit(-1039);
|
||||
}
|
||||
|
||||
//测试进程端口
|
||||
/*if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集
|
||||
TEST_PORT = TEST_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index;
|
||||
else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召
|
||||
TEST_PORT = TEST_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index;
|
||||
}
|
||||
else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {//3秒采集
|
||||
TEST_PORT = TEST_PORT + THREE_SECS_DATA_BASE_NODE_ID + g_front_seg_index;
|
||||
}
|
||||
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波
|
||||
TEST_PORT = TEST_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index;
|
||||
}*/
|
||||
//测试端口处理
|
||||
TEST_PORT = TEST_PORT + g_front_seg_index;
|
||||
}
|
||||
|
||||
@@ -1232,23 +1220,20 @@ int recall_json_handle_from_mq(const std::string& body)
|
||||
} catch (const std::exception& e) {
|
||||
std::cerr << "Error parsing JSON: " << e.what() << std::endl;
|
||||
// ★与原逻辑等价:无法解析,不再进入 recall_json_handle
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"主题:%s - tag:%s的补招触发消息失败",
|
||||
G_MQCONSUMER_TOPIC_RC.c_str(), FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"无法解析补招信息,补招触发失败");
|
||||
return 10004;
|
||||
}
|
||||
|
||||
// 提取 "messageBody"(字符串)
|
||||
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
|
||||
std::cerr << "'messageBody' is missing or is not a string" << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"主题:%s - tag:%s的补招触发消息失败",
|
||||
G_MQCONSUMER_TOPIC_RC.c_str(), FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"无法解析补招信息,补招触发失败");
|
||||
return 10004;
|
||||
}
|
||||
std::string messageBodyStr = root["messageBody"].get<std::string>();
|
||||
if (messageBodyStr.empty()) {
|
||||
std::cerr << "'messageBody' is empty" << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"主题:%s - tag:%s的补招触发消息失败",
|
||||
G_MQCONSUMER_TOPIC_RC.c_str(), FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"无法解析补招信息,补招触发失败");
|
||||
return 10004;
|
||||
}
|
||||
|
||||
@@ -1258,8 +1243,7 @@ int recall_json_handle_from_mq(const std::string& body)
|
||||
mb = nlohmann::json::parse(messageBodyStr);
|
||||
} catch (const std::exception& e) {
|
||||
std::cerr << "Failed to parse 'messageBody' JSON: " << e.what() << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"主题:%s - tag:%s的补招触发消息失败",
|
||||
G_MQCONSUMER_TOPIC_RC.c_str(), FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"无法解析补招信息,补招触发失败");
|
||||
return 10004;
|
||||
}
|
||||
|
||||
@@ -5823,7 +5807,7 @@ void on_device_response_minimal(int response_code,
|
||||
<< " rc=" << response_code << " recall_status=" << front.recall_status<< std::endl; //错误响应码
|
||||
|
||||
//记录日志
|
||||
DIY_ERRORLOG_CODE(matched_monitor->monitor_id.c_str(),2,static_cast<int>(LogCode::LOG_CODE_RECALL),"监测点:%s 补招数据失败 - 失败时间点:%s 至 %s",matched_monitor->monitor_name.c_str(),front.StartTime.c_str(),front.EndTime.c_str());
|
||||
DIY_ERRORLOG_CODE(matched_monitor->monitor_id.c_str(),2,LOG_CODE_RECALL,"补招数据失败,失败时间点:%s 至 %s",front.StartTime.c_str(),front.EndTime.c_str());
|
||||
}
|
||||
updated = true;
|
||||
} else { //首条不是 RUNNING 状态,不应该收到这条响应
|
||||
|
||||
@@ -147,14 +147,14 @@ void handleUploadResponse(const std::string& response, std::string& wavepath) {
|
||||
}
|
||||
catch (const json::parse_error& e) {
|
||||
std::cerr << "Error parsing response: " << e.what() << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "前置上传文件失败,web响应异常");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "文件上传接口响应异常,上传文件失败");
|
||||
return;
|
||||
}
|
||||
|
||||
// 提取字段
|
||||
if (!json_data.contains("code") || !json_data.contains("data")) {
|
||||
std::cerr << "Error: Missing expected fields in JSON response." << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "前置上传文件失败,web响应异常");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "文件上传接口响应异常,上传文件失败");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -167,7 +167,7 @@ void handleUploadResponse(const std::string& response, std::string& wavepath) {
|
||||
auto& data = json_data["data"];
|
||||
if (!data.contains("name") || !data.contains("fileName") || !data.contains("url")) {
|
||||
std::cerr << "Error: Missing expected fields in JSON data object." << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "前置上传文件失败,web响应异常");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "文件上传接口响应异常,上传文件失败");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -195,7 +195,7 @@ void handleUploadResponse(const std::string& response, std::string& wavepath) {
|
||||
wavepath = nameWithoutExt;
|
||||
|
||||
std::cout << "wavepath: " << wavepath << std::endl;
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_FILE, "前置上传文件成功,远端文件名:%s", wavepath.c_str());
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_FILE, "上传文件成功,远端文件名:%s", wavepath.c_str());
|
||||
}
|
||||
|
||||
//上传文件
|
||||
@@ -271,7 +271,7 @@ void SendFileWeb(const std::string& strUrl, const std::string& localpath, const
|
||||
if (res != CURLE_OK) {
|
||||
const char* em = errbuf[0] ? errbuf : curl_easy_strerror(res);
|
||||
std::cerr << "http web failed: " << em << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG, "前置上传文件 %s 失败,请检查文件上传接口配置",localpath.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG, "通过文件接口上传文件 %s 失败",localpath.c_str());
|
||||
} else {
|
||||
std::cout << "http web success, response: " << resPost0 << std::endl;
|
||||
handleUploadResponse(resPost0, wavepath); // 处理响应
|
||||
@@ -550,7 +550,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
|
||||
{
|
||||
if (inputstring.empty()) {
|
||||
std::cerr << "Error: inputstring is empty\n";
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"台账接口的入参为空");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG,"调用台账接口的入参为空,无法获取台账信息");
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -569,7 +569,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
|
||||
break;
|
||||
}
|
||||
std::cerr << "data 无效或为空数组,重试\n";
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"前置从web接口中获取的台账信息为空或者无效信息无法解析,请核对接口参数和前置使用的入参信息:%s",inputparm.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"从台账接口中获取到的台账信息为无效信息,台账初始化失败");
|
||||
} catch (const nlohmann::json::parse_error& e) {
|
||||
std::cerr << "parse error: " << e.what() << ", retrying...\n";
|
||||
}
|
||||
@@ -579,7 +579,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
|
||||
|
||||
if (++retry > 3) {
|
||||
std::cerr << "web error after 3 retry, fallback to local file\n";
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_CONFIG, "前置无法从接口获取台账,从本地读取上一次缓存的台账,请核对接口参数和前置使用的入参信息:%s", inputparm.c_str());
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_CONFIG, "无法从台账接口获取台账,将从本地读取上一次缓存的台账");
|
||||
std::string ledger = read_latest_ledger_file();
|
||||
if (!ledger.empty()) {
|
||||
try {
|
||||
@@ -587,7 +587,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
|
||||
if (json_data.contains("data") && json_data["data"].is_array() && !json_data["data"].empty()) {
|
||||
break;
|
||||
}
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "前置从本地台账中获取的台账信息为空或者无效信息无法解析,请核对接口参数和前置使用的入参信息:%s", inputparm.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"从本地台账中获取到的台账信息为无效信息,台账初始化失败");
|
||||
} catch (const nlohmann::json::parse_error& e) {
|
||||
std::cerr << "local parse error: " << e.what() << "\n";
|
||||
}
|
||||
@@ -601,7 +601,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
|
||||
|
||||
// 2. 安全读取 code/msg
|
||||
std::string code = json_data.value("code", "not found");
|
||||
std::string msg = json_data.value("msg", "not found");
|
||||
std::string msg = json_data.value("message", "not found");
|
||||
std::cout << "code: " << code << "\n";
|
||||
std::cout << "msg : " << msg << "\n";
|
||||
|
||||
@@ -631,12 +631,13 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
|
||||
//dev.station_name = safe_str(item, "stationName");
|
||||
//dev.tmnl_factory = safe_str(item, "manufacturer");
|
||||
//dev.tmnl_status = safe_str(item, "status");
|
||||
dev.dev_type = safe_str(item, "devType");
|
||||
dev.dev_type = safe_str(item, "devType");
|
||||
dev.DevLogLevel = safe_str(item, "devLogLevel");
|
||||
//dev.dev_key = safe_str(item, "devKey");
|
||||
//dev.dev_series = safe_str(item, "series");
|
||||
//dev.port = safe_str(item, "port");
|
||||
//dev.timestamp = safe_str(item, "updateTime");
|
||||
dev.Righttime = safe_str(item, "Righttime");
|
||||
dev.Righttime = safe_str(item, "rightTime");
|
||||
dev.processNo = safe_str(item, "node");
|
||||
dev.maxProcessNum = safe_str(item, "maxProcessNum");
|
||||
|
||||
@@ -652,6 +653,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
|
||||
m.logical_device_seq = safe_str(mon, "lineNo");
|
||||
m.voltage_level = safe_str(mon, "voltageLevel");
|
||||
m.terminal_connect = safe_str(mon, "ptType");
|
||||
m.LineLogLevel = safe_str(mon, "lineLogLevel");
|
||||
//m.timestamp = safe_str(mon, "updateTime");
|
||||
m.status = safe_str(mon, "status");
|
||||
|
||||
@@ -710,7 +712,7 @@ int parse_device_cfg_web()
|
||||
input_jstr += "}";
|
||||
|
||||
std::cout << "input_jstr: " << input_jstr << std::endl;
|
||||
DIY_DEBUGLOG_CODE("process",0,LOG_CODE_LEDGER,"前置获取台账使用的请求输入为:%s", input_jstr.c_str());
|
||||
DIY_DEBUGLOG_CODE("process",0,LOG_CODE_LEDGER,"台账接口输入:%s", input_jstr.c_str());
|
||||
|
||||
// 2. 调用接口
|
||||
std::map<std::string, terminal_dev> terminal_dev_map;
|
||||
@@ -739,10 +741,10 @@ int parse_device_cfg_web()
|
||||
|
||||
if (max_process_num != max_index) {
|
||||
if (max_process_num >= 1 && max_process_num <= 9) {
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_CONFIG, "前置比对台账获取的进程数:%d和本地配置的进程数:%d,不匹配,按照台账进程数重置前置的进程数量",max_process_num, max_index);
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_CONFIG, "从台账获取到的进程数:%d和原有的进程数:%d不一致,将按照从台账获取到的进程数重置进程",max_process_num, max_index);
|
||||
execute_bash("reset", max_process_num, "all");
|
||||
} else {
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG, "前置从台账获取的进程数:%d不符合范围1~9,按照本地配置进程数启动进程",max_process_num);
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_CONFIG, "从台账获取到的总进程数:%d不符合范围1~9,不会重置进程",max_process_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -750,7 +752,7 @@ int parse_device_cfg_web()
|
||||
// 5. 台账数量与配置比对
|
||||
int count_cfg = static_cast<int>(terminal_dev_map.size());
|
||||
std::cout << "terminal_ledger_num: " << count_cfg << std::endl;
|
||||
DIY_DEBUGLOG_CODE("process",0,LOG_CODE_LEDGER,"前置获取到的台账的数量为:%d",count_cfg);
|
||||
DIY_DEBUGLOG_CODE("process",0,LOG_CODE_LEDGER,"获取到台账数量:%d",count_cfg);
|
||||
|
||||
if (IED_COUNT < count_cfg) {
|
||||
std::cout << "!!!!!!!!!!single process has ledger count more than config!!!!!!!" << std::endl;
|
||||
@@ -802,13 +804,16 @@ int parse_device_cfg_web()
|
||||
terminal_devlist.push_back(dev);
|
||||
}
|
||||
|
||||
//记录所有logger等级
|
||||
refresh_log_level_cache_locked();
|
||||
|
||||
// 判断监测点接线类型
|
||||
for (auto& dev : terminal_devlist) {
|
||||
for (auto& mon : dev.line) {
|
||||
if (!mon.terminal_connect.empty() && mon.terminal_connect != "0") {
|
||||
isdelta_flag = 1;
|
||||
std::cout << "monitor_id " << mon.monitor_id<< " v_wiring_type: " << mon.terminal_connect << " is delta wiring: " << isdelta_flag << std::endl;
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_WIRETYPE,"装置:%s - 监测点: %s 是角形接线",dev.terminal_name.c_str(),mon.monitor_name.c_str());
|
||||
//DIY_WARNLOG_CODE("process",0,LOG_CODE_WIRETYPE,"装置:%s - 监测点: %s 是角形接线",dev.terminal_name.c_str(),mon.monitor_name.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1055,7 +1060,7 @@ static void writeJsonToFile(const std::string& filePath, const std::string& json
|
||||
{
|
||||
FILE* fp = fopen(filePath.c_str(), "w");
|
||||
if (!fp) {
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM, "前置无法将暂态事件写入本地缓存");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM, "无法将暂态事件写入本地缓存,暂态事件保存失败");
|
||||
std::cerr << "Failed to write in file : " << filePath << std::endl;
|
||||
return;
|
||||
}
|
||||
@@ -1167,7 +1172,7 @@ static void scanAndResendOfflineFiles(const std::string& dirPath)
|
||||
// 读取 JSON 文件内容
|
||||
std::ifstream inFile(file.fileName.c_str());
|
||||
if (!inFile) {
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM, "无法打开本地缓存的暂态事件");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM, "无法打开本地缓存的暂态事件,暂态事件重发失败");
|
||||
std::cerr << "fail to open existing file: " << file.fileName << std::endl;
|
||||
continue;
|
||||
}
|
||||
@@ -1186,14 +1191,14 @@ static void scanAndResendOfflineFiles(const std::string& dirPath)
|
||||
try {
|
||||
json j_r = json::parse(response);
|
||||
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM, "前置重发暂态事件成功");
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM, "重发暂态事件成功");
|
||||
std::cout << "old file send success, remove it" << std::endl;
|
||||
|
||||
std::remove(file.fileName.c_str());
|
||||
} catch (...) {
|
||||
std::cout << "old file send fail (response parse failed)" << std::endl;
|
||||
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM, "前置重发暂态事件失败");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM, "暂态接口响应异常,重发暂态事件失败");
|
||||
handleCommentResponse(response); // 仍然处理文本响应
|
||||
}
|
||||
} else {
|
||||
@@ -1289,10 +1294,28 @@ int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id,
|
||||
if (!response.empty()) {
|
||||
try {
|
||||
json j_r = json::parse(response);
|
||||
// 有效响应,略过
|
||||
// =====【新增:业务失败也当异常处理】=====
|
||||
if (j_r.contains("code")
|
||||
&& j_r["code"].is_string()
|
||||
&& j_r["code"].get<std::string>() != "A0000") {
|
||||
|
||||
DIY_ERRORLOG_CODE(mpid.c_str(),2,LOG_CODE_TRANSIENT_COMM,"暂态接口业务失败(code=%s),无法上送暂态事件",j_r["code"].get<std::string>().c_str());
|
||||
|
||||
std::cout << "qvvr send fail ,store in local" << std::endl;
|
||||
std::string qvvrDir = FRONT_PATH + "/dat/qvvr/";
|
||||
std::string fileName = qvvrDir + dev_id + "-" +
|
||||
std::to_string(monitor_id) + "-" +
|
||||
FormatTimeForFilename(start_time_str) + "-" +
|
||||
std::to_string(dis_kind) + ".txt";
|
||||
|
||||
writeJsonToFile(fileName, json_string);
|
||||
checkAndRemoveOldestIfNeeded(qvvrDir, 10LL * 1024 * 1024);
|
||||
}
|
||||
// =====【新增结束】=====
|
||||
|
||||
} catch (...) {
|
||||
// 响应异常,保存 json
|
||||
DIY_ERRORLOG_CODE(mpid.c_str(),2,LOG_CODE_TRANSIENT_COMM, "暂态接口响应异常,无法上送装置%s - 监测点%s的暂态事件",showName_d, showName_m);
|
||||
DIY_ERRORLOG_CODE(mpid.c_str(),2,LOG_CODE_JSON, "暂态接口响应异常,无法上送暂态事件");
|
||||
|
||||
std::cout << "qvvr send fail ,store in local" << std::endl;
|
||||
std::string qvvrDir = FRONT_PATH + "/dat/qvvr/";
|
||||
@@ -1302,7 +1325,7 @@ int transfer_json_qvvr_data(const std::string& dev_id, ushort monitor_id,
|
||||
}
|
||||
} else {
|
||||
// 无响应,保存 json
|
||||
DIY_ERRORLOG_CODE(mpid.c_str(),2,LOG_CODE_TRANSIENT_COMM,"暂态接口无响应,无法上送装置%s - 监测点%s的暂态事件",showName_d, showName_m);
|
||||
DIY_ERRORLOG_CODE(mpid.c_str(),2,LOG_CODE_TRANSIENT_COMM,"暂态接口无响应,无法上送暂态事件");
|
||||
|
||||
std::cout << "qvvr send fail ,store in local" << std::endl;
|
||||
std::string qvvrDir = FRONT_PATH + "/dat/qvvr/";
|
||||
|
||||
@@ -204,6 +204,7 @@ public:
|
||||
std::string terminal_connect; //监测点接线方式
|
||||
std::string timestamp; //更新时间
|
||||
std::string status; //监测点状态
|
||||
std::string LineLogLevel; //监测点日志级别
|
||||
double PT1; // 电压变比1
|
||||
double PT2; // 电压变比2
|
||||
double CT1; // 电流变比1
|
||||
@@ -217,6 +218,7 @@ class update_dev
|
||||
public:
|
||||
std::string guid; // ★新增:供发送回复使用
|
||||
|
||||
std::string DevLogLevel; //装置日志级别
|
||||
std::string terminal_id;
|
||||
std::string terminal_name;
|
||||
std::string org_name;
|
||||
@@ -250,6 +252,7 @@ public:
|
||||
std::string logical_device_seq; //监测点序号
|
||||
std::string voltage_level; //监测点电压等级
|
||||
std::string terminal_connect; //监测点接线方式
|
||||
std::string LineLogLevel; //监测点日志级别
|
||||
std::string timestamp; //更新时间
|
||||
std::string status; //监测点状态
|
||||
double PT1; // 电压变比1
|
||||
@@ -286,6 +289,8 @@ public:
|
||||
std::vector<NameFixValue> dz_internal_info_list; //内部定值信息列表
|
||||
std::vector<DZ_kzz_bit> control_words;
|
||||
|
||||
std::string DevLogLevel; //装置日志级别
|
||||
|
||||
std::string terminal_id;
|
||||
std::string terminal_name;
|
||||
std::string org_name;
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
#include <fnmatch.h>
|
||||
#include <unordered_map>
|
||||
#include <chrono>
|
||||
|
||||
#include <memory>
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
#include "log4cplus/logger.h"
|
||||
#include "log4cplus/configurator.h"
|
||||
@@ -58,9 +58,12 @@ static const char* ID_WILDCARD = "all"; // id 通配字段
|
||||
|
||||
std::map<std::string, TypedLogger> logger_map;
|
||||
DebugSwitch g_debug_switch;
|
||||
/////////////////////////////////////////////////
|
||||
|
||||
|
||||
|
||||
// 原子指针:append 线程只读它,不加锁
|
||||
std::shared_ptr<LogLevelCache> g_level_cache_sp;
|
||||
///////////////////////////////////////////////////////////////
|
||||
//用来控制日志上送的结构
|
||||
struct LOGEntry {
|
||||
std::string id; //测点和装置需要的id
|
||||
@@ -122,6 +125,26 @@ std::string get_level_str(int level) {
|
||||
default: return "UNKNOWN";
|
||||
}
|
||||
}
|
||||
|
||||
const char* loglevel_to_str(int lv) {
|
||||
switch (lv) {
|
||||
case DEBUG_LOG_LEVEL: return "DEBUG";
|
||||
case INFO_LOG_LEVEL: return "NORMAL";
|
||||
case WARN_LOG_LEVEL: return "WARN";
|
||||
case ERROR_LOG_LEVEL: return "ERROR";
|
||||
default: return "UNKNOWN";
|
||||
}
|
||||
}
|
||||
|
||||
static int str_to_loglevel(const std::string& s, int default_level = WARN_LOG_LEVEL)
|
||||
{
|
||||
if (s == "DEBUG") return DEBUG_LOG_LEVEL;
|
||||
if (s == "NORMAL") return INFO_LOG_LEVEL; // NORMAL 当 INFO
|
||||
if (s == "INFO") return INFO_LOG_LEVEL;
|
||||
if (s == "WARN") return WARN_LOG_LEVEL;
|
||||
if (s == "ERROR") return ERROR_LOG_LEVEL;
|
||||
return default_level; // 空/非法都兜底
|
||||
}
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
TypedLogger::TypedLogger() {}
|
||||
TypedLogger::TypedLogger(const Logger& l, int t) : logger(l), logtype(t) {}
|
||||
@@ -149,77 +172,36 @@ bool DebugSwitch::match(const std::string& logger_name, int level, int logtype)
|
||||
}
|
||||
return false;
|
||||
}
|
||||
////////////////////////////////////////////////////////////////////////////////////
|
||||
static LogLevelCache* build_cache_unlocked()
|
||||
{
|
||||
LogLevelCache* nc = new LogLevelCache;
|
||||
nc->term_min.reserve(terminal_devlist.size());
|
||||
|
||||
for (const terminal_dev& t : terminal_devlist) {
|
||||
const int t_lv = str_to_loglevel(t.DevLogLevel, WARN_LOG_LEVEL);
|
||||
|
||||
if (!t.terminal_id.empty())
|
||||
nc->term_min[t.terminal_id] = t_lv;
|
||||
|
||||
/*class SendAppender : public Appender {
|
||||
protected:
|
||||
void append(const spi::InternalLoggingEvent& event) {
|
||||
std::string logger_name = event.getLoggerName();
|
||||
int level = event.getLogLevel();
|
||||
std::string msg = event.getMessage();
|
||||
for (const ledger_monitor& m : t.line) {
|
||||
if (m.monitor_id.empty()) continue;
|
||||
|
||||
std::string level_str;
|
||||
if (logger_name.find("process") == 0)
|
||||
level_str = "process";
|
||||
else if (logger_name.find("monitor") != std::string::npos)
|
||||
level_str = "measurepoint";
|
||||
else
|
||||
level_str = "terminal";
|
||||
|
||||
// ★读取 TLS 中的 code(在打日志的线程里由宏设定)
|
||||
int code = g_log_code_tls; // 若未显式传入,则为 0
|
||||
|
||||
if (level == ERROR_LOG_LEVEL || level == WARN_LOG_LEVEL || g_debug_switch.match(logger_name, level, logtype)) {
|
||||
std::ostringstream oss;
|
||||
oss << "{\"processNo\":\"" << std::to_string(g_front_seg_index)
|
||||
<< "\",\"nodeId\":\"" << FRONT_INST
|
||||
<< "\",\"businessId\":\"" << extract_logger_id(logger_name)
|
||||
<< "\",\"level\":\"" << level_str
|
||||
<< "\",\"time\":\"" << now_yyyy_mm_dd_hh_mm_ss()
|
||||
<< "\",\"grade\":\"" << get_level_str(level)
|
||||
// ★新增:输出 code 字段(整型)
|
||||
<< "\",\"code\":\"" << code
|
||||
<< "\",\"log\":\"" << escape_json(msg) << "\"}";
|
||||
|
||||
std::string jsonString = oss.str();
|
||||
|
||||
queue_data_t connect_info;
|
||||
connect_info.strTopic = G_LOG_TOPIC;
|
||||
connect_info.strText = jsonString;
|
||||
connect_info.tag = G_LOG_TAG;
|
||||
connect_info.key = G_LOG_KEY;
|
||||
|
||||
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
|
||||
queue_data_list.push_back(connect_info);
|
||||
// 监测点优先;空/非法自动兜底到终端 t_lv
|
||||
const int m_lv = str_to_loglevel(m.LineLogLevel, t_lv);
|
||||
nc->mp_min[m.monitor_id] = m_lv;
|
||||
}
|
||||
}
|
||||
return nc;
|
||||
}
|
||||
|
||||
std::string escape_json(const std::string& input) {
|
||||
std::ostringstream ss;
|
||||
for (unsigned int i = 0; i < input.size(); ++i) {
|
||||
switch (input[i]) {
|
||||
case '\\': ss << "\\\\"; break;
|
||||
case '"': ss << "\\\""; break;
|
||||
case '\n': ss << "\\n"; break;
|
||||
case '\r': ss << "\\r"; break;
|
||||
case '\t': ss << "\\t"; break;
|
||||
default: ss << input[i]; break;
|
||||
}
|
||||
}
|
||||
return ss.str();
|
||||
}
|
||||
void refresh_log_level_cache_locked()
|
||||
{
|
||||
std::shared_ptr<LogLevelCache> nc(build_cache_unlocked());
|
||||
std::atomic_store_explicit(&g_level_cache_sp, nc, std::memory_order_release);
|
||||
}
|
||||
//////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual void close() {
|
||||
// 可空实现
|
||||
}
|
||||
|
||||
public:
|
||||
SendAppender() {}
|
||||
virtual ~SendAppender() {
|
||||
destructorImpl(); // 重要!释放 log4cplus 基类资源
|
||||
}
|
||||
};*/
|
||||
class SendAppender : public Appender {
|
||||
private:
|
||||
struct RateState {
|
||||
@@ -238,7 +220,7 @@ private:
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
// 前 5 次:1 秒一次;第 6 次起:300 秒一次
|
||||
// 前 3 次:1 秒一次;第 3 次起:300 秒一次,一小时恢复
|
||||
static bool should_emit(const std::string& key) {
|
||||
using namespace std::chrono;
|
||||
const auto now = steady_clock::now();
|
||||
@@ -285,25 +267,25 @@ private:
|
||||
int level_val) {//告警等级
|
||||
pthread_mutex_lock(&g_log_mutex);
|
||||
|
||||
// 1) 精确匹配:id + level + logtype
|
||||
// 1) 精确匹配:id + level + logtype //这个id指定日志种类指定级别的日志
|
||||
if (find_entry_allow(build_debug_key(id, level_str, logtype), level_val)) {
|
||||
pthread_mutex_unlock(&g_log_mutex);
|
||||
return true;
|
||||
}
|
||||
|
||||
// 2) logtype 通配:id + level + -1
|
||||
// 2) logtype 通配:id + level + -1 //这个id指定日志级别的所有日志
|
||||
if (find_entry_allow(build_debug_key(id, level_str, LOGTYPE_WILDCARD), level_val)) {
|
||||
pthread_mutex_unlock(&g_log_mutex);
|
||||
return true;
|
||||
}
|
||||
|
||||
// 3) id 通配:* + level + logtype
|
||||
// 3) id 通配:* + level + logtype //这个id的指定级别的日志
|
||||
if (find_entry_allow(build_debug_key(ID_WILDCARD, level_str, logtype), level_val)) {
|
||||
pthread_mutex_unlock(&g_log_mutex);
|
||||
return true;
|
||||
}
|
||||
|
||||
// 4) 双通配:* + level + -1
|
||||
// 4) 双通配:* + level + -1 //所有指定级别的日志,即根据台账的等级来上送所有日志
|
||||
if (find_entry_allow(build_debug_key(ID_WILDCARD, level_str, LOGTYPE_WILDCARD), level_val)) {
|
||||
pthread_mutex_unlock(&g_log_mutex);
|
||||
return true;
|
||||
@@ -313,7 +295,30 @@ private:
|
||||
return false;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////添加台账日志控制
|
||||
|
||||
static int get_min_send_level_cached(const std::string& level_str, const std::string& logger_name)
|
||||
{
|
||||
const int DEFAULT_LEVEL = WARN_LOG_LEVEL;
|
||||
if (level_str == "process") return DEFAULT_LEVEL;
|
||||
|
||||
const std::string id = extract_logger_id(logger_name);
|
||||
if (id.empty()) return DEFAULT_LEVEL;
|
||||
|
||||
std::shared_ptr<LogLevelCache> c =
|
||||
std::atomic_load_explicit(&g_level_cache_sp, std::memory_order_acquire);
|
||||
if (!c) return DEFAULT_LEVEL;
|
||||
|
||||
if (level_str == "terminal") {
|
||||
auto it = c->term_min.find(id);
|
||||
return (it != c->term_min.end()) ? it->second : DEFAULT_LEVEL;
|
||||
}
|
||||
if (level_str == "measurepoint") {
|
||||
auto it = c->mp_min.find(id);
|
||||
return (it != c->mp_min.end()) ? it->second : DEFAULT_LEVEL;
|
||||
}
|
||||
return DEFAULT_LEVEL;
|
||||
}
|
||||
|
||||
protected:
|
||||
void append(const spi::InternalLoggingEvent& event) override {
|
||||
@@ -336,7 +341,15 @@ protected:
|
||||
|
||||
bool allow_send = false;
|
||||
|
||||
if (level >= WARN_LOG_LEVEL) {
|
||||
int min_send_level = get_min_send_level_cached(level_str, logger_name);
|
||||
|
||||
std::cout << "[LOG] logger: " << logger_name
|
||||
<< ", level_str: " << level_str
|
||||
<< ", level: " << level
|
||||
<< ", min_send_level: " << min_send_level << std::endl;
|
||||
|
||||
// ① 高于“台账阈值”的日志:直接上送
|
||||
if (level >= min_send_level) {
|
||||
allow_send = true;
|
||||
} else {
|
||||
// NORMAL/DEBUG 默认不上送,必须命令打开
|
||||
@@ -526,7 +539,7 @@ void init_loggers_bydevid(const std::string& dev_id)
|
||||
// 添加判断:终端日志 logger 是否已存在
|
||||
if (logger_map.find(device_key) == logger_map.end()) {
|
||||
|
||||
// 所有终端日志(com 和 data)写到同一个 device 日志文件中
|
||||
// 所有终端日志写到同一个 device 日志文件中
|
||||
std::string file_path_t = device_dir + "/" + dev_id + ".log";
|
||||
|
||||
// 共用一个 appender 实例
|
||||
@@ -674,7 +687,11 @@ extern "C" {
|
||||
// 公共函数
|
||||
void log4_log_with_level(const char* key, const char* msg, int level) {
|
||||
std::map<std::string, TypedLogger>::iterator it = logger_map.find(key);
|
||||
if (it == logger_map.end()) return;
|
||||
if (it == logger_map.end()) {
|
||||
std::cout << "[LOG][MISS] logger not found, key="
|
||||
<< (key ? key : "NULL") << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
Logger logger = it->second.logger;
|
||||
switch (level) {
|
||||
|
||||
@@ -39,6 +39,15 @@ extern LOG_TLS_SPEC int g_log_code_tls;
|
||||
# define PRINTF_LIKE(fmt_index, first_arg)
|
||||
#endif
|
||||
/////////////////////////////////////////
|
||||
|
||||
struct LogLevelCache { //日志等级缓存
|
||||
// terminal_id -> min_level
|
||||
std::unordered_map<std::string, int> term_min;
|
||||
// monitor_id -> min_level
|
||||
std::unordered_map<std::string, int> mp_min;
|
||||
};
|
||||
|
||||
////////////////////////////////////
|
||||
struct TypedLogger {
|
||||
log4cplus::Logger logger;
|
||||
int logtype;
|
||||
@@ -67,9 +76,8 @@ extern DebugSwitch g_debug_switch;
|
||||
|
||||
extern void send_reply_to_queue(const std::string& guid, const int code, const std::string& result);
|
||||
|
||||
|
||||
//std::string get_front_type_from_subdir();
|
||||
|
||||
extern std::shared_ptr<LogLevelCache> g_level_cache_sp;
|
||||
const char* loglevel_to_str(int lv);
|
||||
|
||||
// 不带 Appender 的版本
|
||||
log4cplus::Logger init_logger(const std::string& full_name,
|
||||
@@ -87,6 +95,8 @@ void process_log_command(const std::string& id, const std::string& level, const
|
||||
|
||||
void update_log_entries_countdown();
|
||||
|
||||
void refresh_log_level_cache_locked();
|
||||
|
||||
extern "C" {
|
||||
#endif
|
||||
void remove_loggers_by_terminal_id(const std::string& terminal_id_cstr);
|
||||
|
||||
@@ -219,14 +219,14 @@ void RocketMQProducer::sendMessage(const std::string& body,
|
||||
<< std::endl;
|
||||
} catch (const MQClientException& e) {
|
||||
std::cerr << "[RocketMQProducer] 发送失败: " << e.what() << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ发送失败");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ客户端异常,mq消息发送失败");
|
||||
// 根据需要进行重试或日志记录
|
||||
} catch (const std::exception& e) {
|
||||
std::cerr << "[RocketMQProducer] 异常: " << e.what() << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ发送失败");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ异常,mq消息发送失败");
|
||||
} catch (...) {
|
||||
std::cerr << "[RocketMQProducer] 未知错误,消息发送失败。" << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ发送失败");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ未知错误,mq消息发送失败");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -247,13 +247,13 @@ void RocketMQProducer::sendMessageOrderly(const std::string& body,
|
||||
<< std::endl;
|
||||
} catch (const MQClientException& e) {
|
||||
std::cerr << "[RocketMQProducer] 顺序发送失败: " << e.what() << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ发送失败");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ客户端异常,mq消息发送失败");
|
||||
} catch (const std::exception& e) {
|
||||
std::cerr << "[RocketMQProducer] 异常: " << e.what() << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ发送失败");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ异常,mq消息发送失败");
|
||||
} catch (...) {
|
||||
std::cerr << "[RocketMQProducer] 未知错误,顺序消息发送失败。" << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ发送失败");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ未知错误,mq消息发送失败");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -310,7 +310,6 @@ void rocketmq_producer_send(rocketmq::RocketMQProducer* producer,
|
||||
producer->sendMessage(body, topic, tags, keys);
|
||||
} catch (const std::exception& e) {
|
||||
std::cerr << "[rocketmq_producer_send] 发送失败: " << e.what() << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ, "MQ发送失败");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -520,8 +519,7 @@ bool parseJsonMessageSET(const std::string& json_str) {
|
||||
|
||||
std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl;
|
||||
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_PROCESS_CONTROL, "收到主题:%s - tag:%s的进程控制消息",
|
||||
G_MQCONSUMER_TOPIC_SET.c_str(),FRONT_INST.c_str());
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_PROCESS_CONTROL, "收到进程控制消息,开始处理");
|
||||
|
||||
if (code_str == "set_process") {
|
||||
|
||||
@@ -553,8 +551,12 @@ bool parseJsonMessageSET(const std::string& json_str) {
|
||||
|
||||
execute_bash(fun, processNum, frontType);
|
||||
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_PROCESS_CONTROL, "执行指令:%s,reset表示重启所有进程,add表示添加进程",
|
||||
fun.c_str());
|
||||
if (fun == "reset") {
|
||||
DIY_WARNLOG_CODE("process", 0, LOG_CODE_PROCESS_CONTROL,"执行重置进程指令,更新多进程数:%d", processNum);
|
||||
}
|
||||
else if (fun == "add") {
|
||||
DIY_WARNLOG_CODE("process", 0, LOG_CODE_PROCESS_CONTROL,"执行添加进程指令,新增进程号:%d", processNum);
|
||||
}
|
||||
|
||||
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到重置进程指令,重启所有进程!");
|
||||
std::cout << "this msg should only execute once" << std::endl;
|
||||
@@ -570,7 +572,7 @@ bool parseJsonMessageSET(const std::string& json_str) {
|
||||
// delete 分支:不要求 frontType/processNum
|
||||
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到删除进程指令,这个进程将会重启 ");
|
||||
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_PROCESS_CONTROL, "执行指令:%s,即将重启", fun.c_str());
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_PROCESS_CONTROL, "执行删除进程指令,当前进程即将重启");
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
||||
::_exit(-1039); // 进程退出
|
||||
@@ -659,8 +661,7 @@ bool parseJsonMessageLOG(const std::string& json_str) {
|
||||
return true;
|
||||
}*/
|
||||
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_LOG_REQUEST,"收到主题:%s - tag:%s的日志控制消息",
|
||||
G_MQCONSUMER_TOPIC_LOG.c_str(),FRONT_INST.c_str());
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_LOG_REQUEST,"收到日志控制消息,开始处理");
|
||||
|
||||
std::cout << "msg index: " << processNo << " self index: " << g_front_seg_index << std::endl;
|
||||
/*std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;*/
|
||||
@@ -680,7 +681,7 @@ bool parseJsonMessageLOG(const std::string& json_str) {
|
||||
process_log_command(id, level, grade, logtype);
|
||||
} else {
|
||||
std::cout << "type doesn't match" << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "日志控制指令失败");
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "无法解析日志控制指令,日志控制失败");
|
||||
}
|
||||
|
||||
std::cout << "this msg should only execute once" << std::endl;
|
||||
@@ -734,8 +735,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
||||
|
||||
std::cout << "msg index: " << process_No << " self index: " << g_front_seg_index << std::endl;
|
||||
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_LEDGER_UPDATE,"收到主题:%s - tag:%s的台账更新消息",
|
||||
G_MQCONSUMER_TOPIC_UD.c_str(), FRONT_INST.c_str());
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_LEDGER_UPDATE,"收到台账更新消息,开始处理");
|
||||
|
||||
//send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到台账更新指令");
|
||||
std::vector<DeviceReply> reply_list;
|
||||
@@ -756,6 +756,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
||||
//json_data.tmnl_factory = item.value("manufacturer", "");
|
||||
//json_data.tmnl_status = item.value("status", "");
|
||||
json_data.dev_type = item.value("devType", "");
|
||||
json_data.DevLogLevel = item.value("devLogLevel", "WARN");
|
||||
//json_data.dev_key = item.value("devKey", "");
|
||||
//json_data.dev_series = item.value("series", "");
|
||||
|
||||
@@ -772,7 +773,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
||||
json_data.mac = item.value("ip", "");
|
||||
//json_data.port = item.value("port", "");
|
||||
//json_data.timestamp = item.value("updateTime", "");
|
||||
json_data.Righttime = item.value("Righttime", "");
|
||||
json_data.Righttime = item.value("rightTime", "");
|
||||
|
||||
if (item.contains("monitorData") && item["monitorData"].is_array()) {
|
||||
for (const auto& monitor_item : item["monitorData"]) {
|
||||
@@ -783,6 +784,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
||||
m.monitor_name = monitor_item.value("name", "");
|
||||
m.logical_device_seq = monitor_item.value("lineNo", "");
|
||||
m.voltage_level = monitor_item.value("voltageLevel", "");
|
||||
m.LineLogLevel = monitor_item.value("lineLogLevel", "WARN");
|
||||
// status 可能是数字,统一转成字符串存
|
||||
if (monitor_item.contains("status") && monitor_item["status"].is_number_integer())
|
||||
m.status = std::to_string(monitor_item["status"].get<int>());
|
||||
@@ -871,6 +873,8 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
||||
}
|
||||
}
|
||||
reply_list.push_back(std::move(one));
|
||||
|
||||
refresh_log_level_cache_locked();
|
||||
}
|
||||
else if(code_str == "ledger_modify"){
|
||||
|
||||
@@ -888,7 +892,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
||||
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::NOT_FOUND),
|
||||
"终端 id: " + tid + " 无法修改台账,未找到指定装置,改为添加这个装置");*/
|
||||
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_LEDGER_UPDATE,"无法修改台账,未找到指定装置: %s ,改为添加这个装置的台账", json_data.terminal_name.c_str());
|
||||
DIY_WARNLOG_CODE("process",0,LOG_CODE_LEDGER_UPDATE,"未找到需要修改的装置: %s ,添加这个装置的台账", json_data.terminal_name.c_str());
|
||||
|
||||
init_loggers_bydevid(json_data.terminal_id);
|
||||
terminal_devlist.push_back(json_data);
|
||||
@@ -929,6 +933,8 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
||||
|
||||
}
|
||||
reply_list.push_back(std::move(one));
|
||||
|
||||
refresh_log_level_cache_locked();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -984,6 +990,8 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
||||
}
|
||||
}
|
||||
reply_list.push_back(std::move(one));
|
||||
|
||||
refresh_log_level_cache_locked();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -1026,7 +1034,7 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
|
||||
}
|
||||
|
||||
// 日志记录
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_RT_DATA,"收到主题:%s - tag:%s的实时触发消息",G_MQCONSUMER_TOPIC_RT.c_str(),FRONT_INST.c_str());
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_RT_DATA,"收到实时数据触发消息,开始处理");
|
||||
|
||||
std::cout << "rtdata Callback received message: " << body << std::endl;
|
||||
if (!key.empty()) {
|
||||
@@ -1047,7 +1055,7 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
|
||||
|
||||
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit,idx)) {
|
||||
std::cerr << "Failed to parse the JSON message." << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "主题:%s - tag:%s的实时触发消息失败", G_MQCONSUMER_TOPIC_RT.c_str(), FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "无法解析实时数据触发消息,实时数据触发失败");
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
}
|
||||
|
||||
@@ -1080,7 +1088,7 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
|
||||
if (ClientManager::instance().get_dev_status(devid) != 1) {
|
||||
std::cout << "devid对应装置不在线: " << devid << std::endl;
|
||||
// 记录日志不响应 web 端
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_COMM,"主题:%s - tag:%s的实时数据触发消息失败,装置:%s 不在线", G_MQCONSUMER_TOPIC_RT.c_str(),FRONT_INST.c_str(),showName);
|
||||
DIY_WARNLOG_CODE(devid,1,LOG_CODE_COMM,"装置不在线,实时数据触发失败");
|
||||
return rocketmq::CONSUME_SUCCESS;
|
||||
}
|
||||
|
||||
@@ -1091,7 +1099,7 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
|
||||
}
|
||||
else{
|
||||
std::cerr << "rtdata is NULL." << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"主题:%s - tag:%s的补招触发消息失败",G_MQCONSUMER_TOPIC_RT.c_str(),FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "无法解析实时数据触发消息,实时数据触发失败");
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
}
|
||||
|
||||
@@ -1137,7 +1145,7 @@ rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& ms
|
||||
// 调用业务逻辑处理函数
|
||||
std::string updatefilepath = FRONT_PATH + "/etc/ledgerupdate";
|
||||
if (!parseJsonMessageUD(body, updatefilepath)) {
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"主题:%s - tag:%s的台账更新消息失败",G_MQCONSUMER_TOPIC_UD.c_str(),FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"无法解析台账更新指令,台账更新失败");
|
||||
}
|
||||
|
||||
return rocketmq::CONSUME_SUCCESS;
|
||||
@@ -1181,7 +1189,7 @@ rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg)
|
||||
|
||||
// 调用业务处理逻辑
|
||||
if (!parseJsonMessageSET(body)) {
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"主题:%s - tag:%s的进程控制消息失败", G_MQCONSUMER_TOPIC_SET.c_str(), FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"无法解析进程控制指令,进程控制失败");
|
||||
}
|
||||
|
||||
return rocketmq::CONSUME_SUCCESS;
|
||||
@@ -1225,7 +1233,7 @@ rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg)
|
||||
|
||||
// 执行日志上送处理
|
||||
if (!parseJsonMessageLOG(body)) {
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"主题:%s - tag:%s的日志上送消息失败",G_MQCONSUMER_TOPIC_LOG.c_str(),FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "无法解析日志控制指令,日志控制失败");
|
||||
}
|
||||
|
||||
return rocketmq::CONSUME_SUCCESS;
|
||||
@@ -1273,8 +1281,7 @@ rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& ms
|
||||
std::cout << "Message Key: N/A" << std::endl;
|
||||
}
|
||||
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_RECALL,"收到主题:%s - tag:%s的补招消息",
|
||||
G_MQCONSUMER_TOPIC_RC.c_str(), FRONT_INST.c_str());
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_RECALL,"收到补招消息,开始处理");
|
||||
|
||||
// 解析 JSON 字符串
|
||||
recall_json_handle_from_mq(body);//不再使用文件补招方式
|
||||
@@ -2424,7 +2431,7 @@ rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg)
|
||||
}
|
||||
|
||||
// 日志记录
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_CLOUD, "收到主题:%s - tag:%s的云前置控制消息",G_MQCONSUMER_TOPIC_CLOUD.c_str(),FRONT_INST.c_str());
|
||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_CLOUD, "收到云前置指令,开始处理");
|
||||
|
||||
std::cout << "cloud Callback received message: " << body << std::endl;
|
||||
if (!key.empty()) {
|
||||
@@ -2442,7 +2449,7 @@ rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg)
|
||||
|
||||
if (!parseJsonMessageCLOUD(body, devid, guid, DetailObj,FrontId,Node)) {
|
||||
std::cerr << "Failed to parse the JSON message." << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "主题:%s - tag:%s的云前置控制消息失败", G_MQCONSUMER_TOPIC_CLOUD.c_str(), FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "无法解析云前置指令,云前置指令执行失败");
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
}
|
||||
|
||||
@@ -2461,7 +2468,7 @@ rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg)
|
||||
|
||||
if(!parsemsg(devid,guid,DetailObj)){
|
||||
std::cerr << "clouddata is error." << std::endl;
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON,"主题:%s - tag:%s的云前置控制消息失败",G_MQCONSUMER_TOPIC_CLOUD.c_str(),FRONT_INST.c_str());
|
||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_JSON, "无法解析云前置指令,云前置指令执行失败");
|
||||
}
|
||||
|
||||
return rocketmq::CONSUME_SUCCESS;
|
||||
|
||||
@@ -301,7 +301,8 @@ extern bool normalOutputEnabled;
|
||||
"G_TEST_TYPE=<num> - Set the G_TEST_TYPE 0:use ledger,1:use number\r\n"
|
||||
"TESTLEDGER <processNo>,<start>,<count> - Batch send UD ledger updates (e.g. TESTLEDGER 3,1,50)\r\n"
|
||||
"LOG=<bool> - Set the LOG\r\n"
|
||||
"MAX=<int> - Set the MAX_ITEMS\r\n"
|
||||
"LOGLIST - List all registered loggers\r\n"
|
||||
"MAX=<int> - Set the MAX_ITEMS\r\n"
|
||||
"dir - Execute rocketmq_test_getdir\r\n"
|
||||
"rc - Execute rocketmq_test_rc\r\n"
|
||||
"rt - Execute rocketmq_test_rt\r\n"
|
||||
@@ -368,6 +369,40 @@ extern bool normalOutputEnabled;
|
||||
int flag = std::atoi(cmd.substr(4).c_str());
|
||||
setTestlog(flag);
|
||||
sendStr(clientFD, "\r\x1B[KLOG updated\r\n");
|
||||
}else if (cmd == "LOGLIST" || cmd == "loglist") {
|
||||
std::ostringstream oss;
|
||||
|
||||
// 1️⃣ 打印 logger_map
|
||||
oss << "\r\x1B[KRegistered loggers (" << logger_map.size() << "):\r\n";
|
||||
for (const auto& it : logger_map) {
|
||||
oss << " " << it.first << "\r\n";
|
||||
}
|
||||
|
||||
// 2️⃣ 打印 LogLevelCache(原子只读)
|
||||
std::shared_ptr<LogLevelCache> cache =
|
||||
std::atomic_load_explicit(&g_level_cache_sp, std::memory_order_acquire);
|
||||
|
||||
if (!cache) {
|
||||
oss << "\r\x1B[K[LogLevelCache] <EMPTY>\r\n";
|
||||
} else {
|
||||
oss << "\r\x1B[K[LogLevelCache] terminal levels ("
|
||||
<< cache->term_min.size() << "):\r\n";
|
||||
|
||||
for (const auto& kv : cache->term_min) {
|
||||
oss << " terminal." << kv.first
|
||||
<< " -> " << loglevel_to_str(kv.second) << "\r\n";
|
||||
}
|
||||
|
||||
oss << "\r\x1B[K[LogLevelCache] monitor levels ("
|
||||
<< cache->mp_min.size() << "):\r\n";
|
||||
|
||||
for (const auto& kv : cache->mp_min) {
|
||||
oss << " monitor." << kv.first
|
||||
<< " -> " << loglevel_to_str(kv.second) << "\r\n";
|
||||
}
|
||||
}
|
||||
|
||||
sendStr(clientFD, oss.str());
|
||||
}else if (cmd.find("MAX=") == 0) {
|
||||
int flag = std::atoi(cmd.substr(4).c_str());
|
||||
setMaxItems(flag);
|
||||
@@ -488,6 +523,7 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
|
||||
os << "\r\x1B[K|-- timestamp : " << dev.timestamp << "\n";
|
||||
os << "\r\x1B[K|-- Righttime : " << dev.Righttime << "\n";
|
||||
os << "\r\x1B[K|-- mac : " << dev.mac << "\n";
|
||||
os << "\r\x1B[K|-- loglevel : " << dev.DevLogLevel << "\n";
|
||||
|
||||
// ========================= 终端级 · 内部定值 =========================
|
||||
// internal_values(ushort 列表)与 dz_internal_info_list 一一对应,仅展示前 MAX_ITEMS 条
|
||||
@@ -553,6 +589,7 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
|
||||
os << "\r\x1B[K |-- terminal_connect : " << ld.terminal_connect << "\n";
|
||||
os << "\r\x1B[K |-- status : " << ld.status << "\n";
|
||||
os << "\r\x1B[K |-- timestamp : " << ld.timestamp << "\n";
|
||||
os << "\r\x1B[K |-- loglevel : " << ld.LineLogLevel << "\n";
|
||||
os << "\r\x1B[K |-- CT1=" << ld.CT1 << ", CT2=" << ld.CT2
|
||||
<< ", PT1=" << ld.PT1 << ", PT2=" << ld.PT2 << "\n";
|
||||
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user