Merge branch '测试2' of http://192.168.1.22:3000/zw/Linux_Front1056 into 测试2

This commit is contained in:
zw
2025-07-03 11:13:33 +08:00
13 changed files with 490 additions and 186 deletions

75
.vscode/settings.json vendored
View File

@@ -55,5 +55,78 @@
"C_Cpp_Runner.useLeakSanitizer": false,
"C_Cpp_Runner.showCompilationTime": false,
"C_Cpp_Runner.useLinkTimeOptimization": false,
"C_Cpp_Runner.msvcSecureNoWarnings": false
"C_Cpp_Runner.msvcSecureNoWarnings": false,
"files.associations": {
"any": "cpp",
"array": "cpp",
"atomic": "cpp",
"bit": "cpp",
"cctype": "cpp",
"charconv": "cpp",
"chrono": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"codecvt": "cpp",
"compare": "cpp",
"concepts": "cpp",
"condition_variable": "cpp",
"csignal": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdint": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"deque": "cpp",
"forward_list": "cpp",
"list": "cpp",
"map": "cpp",
"set": "cpp",
"string": "cpp",
"unordered_map": "cpp",
"vector": "cpp",
"exception": "cpp",
"algorithm": "cpp",
"functional": "cpp",
"iterator": "cpp",
"memory": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"optional": "cpp",
"random": "cpp",
"ratio": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"utility": "cpp",
"format": "cpp",
"fstream": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"mutex": "cpp",
"new": "cpp",
"numbers": "cpp",
"ostream": "cpp",
"ranges": "cpp",
"semaphore": "cpp",
"span": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"stop_token": "cpp",
"streambuf": "cpp",
"text_encoding": "cpp",
"thread": "cpp",
"cinttypes": "cpp",
"typeinfo": "cpp",
"valarray": "cpp",
"variant": "cpp"
}
}

View File

@@ -1,3 +1,6 @@
#ifndef CLIENT_H
#define CLIENT_H
#include <uv.h>
#include <string>
#include <vector>
@@ -220,3 +223,5 @@ void on_connect(uv_connect_t* req, int status);
void on_close(uv_handle_t* handle);
void init_clients(uv_loop_t* loop, const std::vector<DeviceInfo>& devices);
void stop_all_clients();
#endif

View File

@@ -11,7 +11,7 @@
QTDIR=/qt-4.8.4
export QTDIR
FEP_ENV=/FeProject
FEP_ENV=/home/pq/zwproject/LFtid1056
export FEP_ENV
PATH=$FEP_ENV/bin:$QTDIR/bin:$PATH

View File

@@ -31,6 +31,8 @@
#include "tinyxml2.h"
#include "rocketmq.h"
/////////////////////////////////////////////////////////////////////////////////////////////////
using namespace std;
@@ -58,8 +60,6 @@ extern std::list<queue_data_t> queue_data_list; //queue发送数据链表
extern int three_secs_enabled;
extern std::vector<terminal_dev> terminal_devlist;
extern std::map<std::string, Xmldata*> xmlinfo_list;//保存所有型号对应的icd映射文件解析数据
extern XmlConfig xmlcfg;//星形接线xml节点解析的数据-默认映射文件解析数据
extern std::list<CTopic *> topicList; //队列发送主题链表
@@ -527,7 +527,7 @@ void init_config() {
}
//测试进程端口
if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集
/*if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集
TEST_PORT = TEST_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index;
else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召
TEST_PORT = TEST_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index;
@@ -537,8 +537,8 @@ void init_config() {
}
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波
TEST_PORT = TEST_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index;
}
}*/
TEST_PORT = TEST_PORT + g_front_seg_index;
}
////////////////////////////////////////////////////////////////////////////////////////////获取当前时间
@@ -817,7 +817,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, const std::string& id) {
DIR* dir = opendir(cfg_dir.c_str());
if (!dir) {
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法解析补招文件补招文件路径FRONT_PATH + /etc/recall/不存在", get_front_msg_from_subdir(), g_front_seg_index);
DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法解析补招文件补招文件路径FRONT_PATH + /etc/recall/不存在", g_front_seg_index);
return false;
}
@@ -829,7 +829,7 @@ int parse_recall_xml(recall_xml_t* recall_xml, const std::string& id) {
std::string filepath = cfg_dir + "/" + filename;
tinyxml2::XMLDocument doc;
if (doc.LoadFile(filepath.c_str()) != tinyxml2::XML_SUCCESS) {
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法解析补招文件%s,补招内容无效", get_front_msg_from_subdir(), g_front_seg_index, filepath.c_str());
DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法解析补招文件%s,补招内容无效", g_front_seg_index, filepath.c_str());
continue;
}
@@ -871,20 +871,18 @@ void process_recall_config(recall_xml_t* recall_xml)
//根据监测点id来获取补招数据补招时调用这个
void Check_Recall_Config(const std::string& id) {
if (g_node_id == HIS_DATA_BASE_NODE_ID ||
/*if (g_node_id == HIS_DATA_BASE_NODE_ID ||
g_node_id == NEW_HIS_DATA_BASE_NODE_ID ||
g_node_id == RECALL_HIS_DATA_BASE_NODE_ID ||
g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) {
g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) {*/
recall_xml_t recall_xml;
std::memset(&recall_xml, 0, sizeof(recall_xml_t));
// 解析补招文件
parse_recall_xml(&recall_xml, id);
// 将补招数据赋值到全局变量
process_recall_config(&recall_xml);
}
recall_xml_t recall_xml;
std::memset(&recall_xml, 0, sizeof(recall_xml_t));
// 解析补招文件
parse_recall_xml(&recall_xml, id);
// 将补招数据赋值到全局变量
process_recall_config(&recall_xml);
//}
}
//补招成功后删除补招文件,补招后调用这个
@@ -925,7 +923,7 @@ void DeletcRecallXml() {
DIR* dir = opendir(cfg_dir.c_str());
if (!dir) {
std::cerr << "folder does not exist!" << std::endl;
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 删除旧的补招文件失败,补招文件路径FRONT_PATH + /etc/recall/不存在",get_front_msg_from_subdir(), g_front_seg_index);
DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 删除旧的补招文件失败,补招文件路径FRONT_PATH + /etc/recall/不存在", g_front_seg_index);
return;
}
@@ -944,7 +942,7 @@ void DeletcRecallXml() {
if (stat(fullpath.c_str(), &file_stat) == 0) {
if (file_stat.st_mtime < cutoff) {
if (remove(fullpath.c_str()) == 0) {
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程 删除超过两天的补招文件",get_front_msg_from_subdir(), g_front_seg_index);
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程 删除超过两天的补招文件", g_front_seg_index);
} else {
std::cerr << "Failed to remove file: " << fullpath << std::endl;
}
@@ -966,7 +964,7 @@ void CreateRecallXml() {
g_StatisticLackList_list_mutex.lock();
if (!g_StatisticLackList.empty()) {
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程 开始写入补招文件", get_front_msg_from_subdir(), g_front_seg_index);
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程 开始写入补招文件", g_front_seg_index);
std::map<std::string, std::list<JournalRecall>> id_map;
for (const auto& jr : g_StatisticLackList) {
@@ -1006,7 +1004,7 @@ void CreateRecallXml() {
tinyxml2::XMLError save_result = doc.SaveFile(path.str().c_str());
if (save_result != tinyxml2::XML_SUCCESS) {
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 无法将补招文件写入路径: %s",get_front_msg_from_subdir(), g_front_seg_index, path.str().c_str());
DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 无法将补招文件写入路径: %s", g_front_seg_index, path.str().c_str());
continue;
}
}
@@ -1019,10 +1017,10 @@ void CreateRecallXml() {
//生成待补招xml文件
void create_recall_xml()
{
if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)) {
DeletcRecallXml();
CreateRecallXml();
}
//if (g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_HIS_DATA_BASE_NODE_ID || (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)) {
DeletcRecallXml();
CreateRecallXml();
//}
}
// 工具函数:将时间字符串转为 time_t秒级
@@ -1244,7 +1242,7 @@ void printTerminalDevMap(const std::map<std::string, terminal_dev>& terminal_dev
std::cout << "Key: " << key
<< ", Terminal ID: " << dev.terminal_id
<< ", Terminal Code: " << dev.terminal_code
<< ", Terminal Code: " << dev.terminal_name
<< ", Organization Name: "<< dev.org_name
<< ", Maintenance Name: " << dev.maint_name
<< ", Station Name: " << dev.station_name
@@ -1258,6 +1256,9 @@ void printTerminalDevMap(const std::map<std::string, terminal_dev>& terminal_dev
<< ", Address: " << dev.addr_str
<< ", Port: " << dev.port
<< ", Timestamp: " << dev.timestamp
<< ", mac: " << dev.mac
<< std::endl;
// 打印监测点信息
@@ -1265,13 +1266,19 @@ void printTerminalDevMap(const std::map<std::string, terminal_dev>& terminal_dev
const auto& m = dev.line[i];
std::cout << " Monitor [" << i << "] "
<< "ID: " << m.monitor_id
<< ", Code: " << m.terminal_code
<< ", Code: " << m.terminal_id
<< ", Name: " << m.monitor_name
<< ", Seq: " << m.logical_device_seq
<< ", Voltage: "<< m.voltage_level
<< ", Connect: "<< m.terminal_connect
<< ", Timestamp:"<< m.timestamp
<< ", Status: " << m.status
<< ", CT1: " << m.CT1
<< ", CT2: " << m.CT2
<< ", PT1: " << m.PT1
<< ", PT2: " << m.PT2
<< std::endl;
}
}
@@ -1357,7 +1364,7 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml,
};
work_terminal.terminal_id = get_value("id");
work_terminal.terminal_code = get_value("terminalCode");
work_terminal.terminal_name = get_value("terminalCode");
work_terminal.org_name = get_value("orgName");
work_terminal.maint_name = get_value("maintName");
work_terminal.station_name = get_value("stationName");
@@ -1371,6 +1378,8 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml,
work_terminal.port = get_value("port");
work_terminal.timestamp = get_value("updateTime");
work_terminal.mac = get_value("mac");
for (tinyxml2::XMLElement* monitor = root->FirstChildElement("monitorData");
monitor;
monitor = monitor->NextSiblingElement("monitorData")) {
@@ -1381,9 +1390,18 @@ void parse_terminal_from_data(trigger_update_xml_t& trigger_update_xml,
mon.terminal_connect = monitor->FirstChildElement("ptType") ? monitor->FirstChildElement("ptType")->GetText() : "N/A";
mon.logical_device_seq = monitor->FirstChildElement("lineNo") ? monitor->FirstChildElement("lineNo")->GetText() : "N/A";
mon.timestamp = monitor->FirstChildElement("timestamp") ? monitor->FirstChildElement("timestamp")->GetText() : "N/A";
mon.terminal_code = monitor->FirstChildElement("terminal_code") ? monitor->FirstChildElement("terminal_code")->GetText() : "N/A";
mon.terminal_id = monitor->FirstChildElement("terminal_id") ? monitor->FirstChildElement("terminal_name")->GetText() : "N/A";
mon.status = monitor->FirstChildElement("status") ? monitor->FirstChildElement("status")->GetText() : "N/A";
mon.CT1 = monitor->FirstChildElement("CT1") && monitor->FirstChildElement("CT1")->GetText()
? atof(monitor->FirstChildElement("CT1")->GetText()) : 0.0;
mon.CT2 = monitor->FirstChildElement("CT2") && monitor->FirstChildElement("CT2")->GetText()
? atof(monitor->FirstChildElement("CT2")->GetText()) : 0.0;
mon.PT1 = monitor->FirstChildElement("PT1") && monitor->FirstChildElement("PT1")->GetText()
? atof(monitor->FirstChildElement("PT1")->GetText()) : 0.0;
mon.PT2 = monitor->FirstChildElement("PT2") && monitor->FirstChildElement("PT2")->GetText()
? atof(monitor->FirstChildElement("PT2")->GetText()) : 0.0;
work_terminal.line.push_back(mon);
}
@@ -1536,9 +1554,9 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d
target_dev.terminal_id = update.terminal_id;
std::cout << "terminal_id: " << target_dev.terminal_id << std::endl;
}
if (!update.terminal_code.empty()) {
target_dev.terminal_code = update.terminal_code;
std::cout << "terminal_code: " << target_dev.terminal_code << std::endl;
if (!update.terminal_name.empty()) {
target_dev.terminal_name = update.terminal_name;
std::cout << "terminal_name: " << target_dev.terminal_name << std::endl;
}
if (!update.tmnl_factory.empty()) {
target_dev.tmnl_factory = update.tmnl_factory;
@@ -1574,6 +1592,11 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d
std::cout << "port: " << target_dev.port << std::endl;
}
if (!update.mac.empty()) {
target_dev.mac = update.mac;
std::cout << "mac: " << target_dev.mac << std::endl;
}
if (!update.timestamp.empty()) {
struct tm timeinfo = {};
if (sscanf(update.timestamp.c_str(), "%4d-%2d-%2d %2d:%2d:%2d",
@@ -1609,9 +1632,14 @@ int update_one_terminal_ledger(const terminal_dev& update,terminal_dev& target_d
m.voltage_level = mon.voltage_level;
m.terminal_connect = mon.terminal_connect;
m.status = mon.status;
m.terminal_code = mon.terminal_code;
m.terminal_id = mon.terminal_id;
m.timestamp = mon.timestamp;
m.CT1 = mon.CT1;
m.CT2 = mon.CT2;
m.PT1 = mon.PT1;
m.PT2 = mon.PT2;
if (m.terminal_connect != "0") {
isdelta_flag = 1;
std::cout << "monitor_id " << m.monitor_id << " uses delta wiring." << std::endl;
@@ -2104,13 +2132,18 @@ void print_monitor(const ledger_monitor& mon) {
auto safe = [](const std::string& s) { return s.empty() ? "N/A" : s; };
std::cout << "Monitor ID: " << safe(mon.monitor_id) << "\n";
std::cout << "Terminal Code: " << safe(mon.terminal_code) << "\n";
std::cout << "Terminal ID: " << safe(mon.terminal_id) << "\n";
std::cout << "Monitor Name: " << safe(mon.monitor_name) << "\n";
std::cout << "Logical Device Sequence: " << safe(mon.logical_device_seq) << "\n";
std::cout << "Voltage Level: " << safe(mon.voltage_level) << "\n";
std::cout << "Terminal Connect: " << safe(mon.terminal_connect) << "\n";
std::cout << "Timestamp: " << safe(mon.timestamp) << "\n";
std::cout << "Status: " << safe(mon.status) << "\n";
std::cout << "CT1: " << mon.CT1 << "\n";
std::cout << "CT2: " << mon.CT2 << "\n";
std::cout << "PT1: " << mon.PT1 << "\n";
std::cout << "PT2: " << mon.PT2 << "\n";
}
void print_terminal(const terminal_dev& tmnl) {
@@ -2118,7 +2151,7 @@ void print_terminal(const terminal_dev& tmnl) {
std::cout << "GUID: " << safe(tmnl.guid) << "\n";
std::cout << "Terminal ID: " << safe(tmnl.terminal_id) << "\n";
std::cout << "Terminal Code: " << safe(tmnl.terminal_code)<< "\n";
std::cout << "Terminal Code: " << safe(tmnl.terminal_name)<< "\n";
std::cout << "Organization Name: "<< safe(tmnl.org_name) << "\n";
std::cout << "Maintenance Name: " << safe(tmnl.maint_name) << "\n";
std::cout << "Station Name: " << safe(tmnl.station_name) << "\n";
@@ -2131,6 +2164,8 @@ void print_terminal(const terminal_dev& tmnl) {
std::cout << "Port: " << safe(tmnl.port) << "\n";
std::cout << "Timestamp: " << safe(tmnl.timestamp) << "\n";
std::cout << "mac: " << safe(tmnl.mac) << "\n";
for (size_t i = 0; i < 10 && !tmnl.line[i].monitor_id.empty(); ++i) {
std::cout << " Monitor " << (i + 1) << ":\n";
print_monitor(tmnl.line[i]);
@@ -2168,7 +2203,7 @@ void print_trigger_update_xml(const trigger_update_xml_t& trigger_update) {
}
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////解析映射文件
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////解析模板文件
//解析映射文件
bool ParseXMLConfig2(int xml_flag, XmlConfig *cfg, std::list<CTopic*> *ctopiclist, const std::string& path)
@@ -2615,3 +2650,114 @@ void Set_xml_nodeinfo()
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////数据转换函数
// DataArrayItem to_json
void to_json(nlohmann::json& j, const DataArrayItem& d) {
j = nlohmann::json{
{"DataAttr", d.DataAttr},
{"DataTimeSec", d.DataTimeSec},
{"DataTimeUSec", d.DataTimeUSec},
{"DataTag", d.DataTag},
{"Data", d.Data}
};
}
// MsgObj to_json
void to_json(nlohmann::json& j, const MsgObj& m) {
j = nlohmann::json{
{"Cldid", m.Cldid},
{"DataType", m.DataType},
{"DataAttr", m.DataAttr},
{"DsNameIdx", m.DsNameIdx},
{"DataArray", m.DataArray}
};
}
// FullObj to_json
void to_json(nlohmann::json& j, const FullObj& f) {
j = nlohmann::json{
{"Mid", f.Mid},
{"Did", f.Did},
{"Pri", f.Pri},
{"Type", f.Type},
{"Msg", f.Msg}
};
}
std::string generate_json(
int Mid, //需应答的报文订阅者收到后需以此ID应答无需应答填入“-1”
int Did, //设备唯一标识Ldid填入0代表Ndid。
int Pri, //报文处理的优先级
int Type, //消息类型
int Cldid, //逻辑子设备ID0-逻辑设备本身,无填-1
int DataType, //数据类型0-表示以数据集方式上送
int DataAttr, //数据属性无“0”、实时“1”、统计“2”等。
int DsNameIdx, //数据集序号(以数据集方式上送),无填-1
const std::vector<DataArrayItem>& dataArray //数据数组。
) {
FullObj fobj;
fobj.Mid = Mid;
fobj.Did = Did;
fobj.Pri = Pri;
fobj.Type = Type;
fobj.Msg.Cldid = Cldid;
fobj.Msg.DataType = DataType;
fobj.Msg.DataAttr = DataAttr;
fobj.Msg.DsNameIdx = DsNameIdx;
fobj.Msg.DataArray = dataArray;
nlohmann::json j = fobj;
return j.dump(); // 输出标准 json 字符串
}
void upload_data_test(){
std::vector<DataArrayItem> arr;
arr.push_back({1, 1725477660, 0, 1, "xxxx"}); //数据属性 -1-无, 0-“Rt”,1-“Max”,2-“Min”,3-“Avg”,4-“Cp95”
//数据时标相对1970年的秒无效填入“-1”
//数据时标,微秒钟,无效填入“-1”
//数据标识1-标识数据异常
//数据序列数据集上送时将二进制数据流转换成Base64字符串其他数据为object
arr.push_back({2, 1691741340, 0, 1, "yyyy"});
std::string js = generate_json(
-1, 2, 1, 4866, 1, 0, 2, 1, arr
);
std::cout << js << std::endl;
queue_data_t data;
data.monitor_no = 1;
data.strTopic = TOPIC_ALARM;
data.strText = js;
data.mp_id = "test";
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
//////////////////////////////////////////////////////////////////////////////////////////////////台账赋值给通信
std::vector<DeviceInfo> GenerateDeviceInfoFromLedger(const std::vector<terminal_dev>& terminal_devlist) {
std::vector<DeviceInfo> devices;
for (const auto& terminal : terminal_devlist) {
DeviceInfo device;
device.device_id = terminal.terminal_id;
device.name = terminal.terminal_name;
device.model = terminal.dev_series;
device.mac = terminal.mac;
for (const auto& monitor : terminal.line) {
PointInfo point;
point.point_id = monitor.monitor_id;
point.name = monitor.monitor_name;
point.device_id = terminal.terminal_id;
point.PT1 = monitor.PT1;
point.PT2 = monitor.PT2;
point.CT1 = monitor.CT1;
point.CT2 = monitor.CT2;
device.points.push_back(point);
}
devices.push_back(device);
}
return devices;
}

View File

@@ -133,7 +133,6 @@ void SendJsonAPI_web(const std::string& strUrl, //接口路径
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////上传文件接口
//处理文件上传响应
@@ -535,7 +534,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
{
if (inputstring.empty()) {
std::cerr << "Error: inputstring is empty\n";
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程调用web台账接口的入参为空", get_front_msg_from_subdir(), g_front_seg_index);
DIY_ERRORLOG("process","【ERROR】前置的%d号进程调用web台账接口的入参为空", g_front_seg_index);
return 1;
}
@@ -609,7 +608,7 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
};
dev.terminal_id = safe_str(item, "id");
dev.addr_str = safe_str(item, "ip");
dev.terminal_code = safe_str(item, "name");
dev.terminal_name = safe_str(item, "name");
dev.org_name = safe_str(item, "org_name");
dev.maint_name = safe_str(item, "maint_name");
dev.station_name = safe_str(item, "stationName");
@@ -623,18 +622,26 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
dev.processNo = safe_str(item, "processNo");
dev.maxProcessNum = safe_str(item, "maxProcessNum");
dev.mac = safe_str(item, "mac");//添加mac
if (item.contains("monitorData") && item["monitorData"].is_array()) {
for (auto& mon : item["monitorData"]) {
if (dev.line.size() >= 10) break;
ledger_monitor m;
m.monitor_id = safe_str(mon, "id");
m.terminal_code = safe_str(mon, "terminal_code");
m.terminal_id = safe_str(mon, "terminal_id");
m.monitor_name = safe_str(mon, "name");
m.logical_device_seq = safe_str(mon, "lineNo");
m.voltage_level = safe_str(mon, "voltageLevel");
m.terminal_connect = safe_str(mon, "ptType");
m.timestamp = safe_str(mon, "updateTime");
m.status = safe_str(mon, "status");
m.CT1 = mon.value("CT1", 0.0);
m.CT2 = mon.value("CT2", 0.0);
m.PT1 = mon.value("PT1", 0.0);
m.PT2 = mon.value("PT2", 0.0);
dev.line.push_back(m);
}
}
@@ -665,7 +672,8 @@ int terminal_ledger_web(std::map<std::string, terminal_dev>& terminal_dev_map,
}
// 5. 主进程保存台账
if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
//if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
if (g_front_seg_index == 1) {
save_ledger_json(responseStr);
}
@@ -684,7 +692,7 @@ int parse_device_cfg_web()
input_jstr += "}";
std::cout << "input_jstr: " << input_jstr << std::endl;
DIY_DEBUGLOG("process","【DEBUG】前置的%s%d号进程调用web接口获取台账使用的请求输入为:%s",get_front_msg_from_subdir(), g_front_seg_index, input_jstr.c_str());
DIY_DEBUGLOG("process","【DEBUG】前置的%d号进程调用web接口获取台账使用的请求输入为:%s", g_front_seg_index, input_jstr.c_str());
// 2. 调用接口
std::map<std::string, terminal_dev> terminal_dev_map;
@@ -695,8 +703,9 @@ int parse_device_cfg_web()
// 3. 调试打印
printTerminalDevMap(terminal_dev_map);
// 4. 看门狗配置校验(仅主进程稳态
if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
// 4. 看门狗配置校验(仅主进程)
//if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
if (g_front_seg_index == 1) {
int max_index = get_max_stat_data_index(FRONT_PATH + "/etc/runtime.cf");
std::cout << "max_index = " << max_index << std::endl;
@@ -723,13 +732,13 @@ int parse_device_cfg_web()
// 5. 台账数量与配置比对
int count_cfg = static_cast<int>(terminal_dev_map.size());
std::cout << "terminal_ledger_num: " << count_cfg << std::endl;
DIY_DEBUGLOG("process", "【DEBUG】前置的%s%d号进程调用获取到的台账的数量为:%d",get_front_msg_from_subdir(), g_front_seg_index, count_cfg);
DIY_DEBUGLOG("process", "【DEBUG】前置的%d号进程调用获取到的台账的数量为:%d", g_front_seg_index, count_cfg);
if (IED_COUNT < count_cfg) {
std::cout << "!!!!!!!!!!single process can not add any ledger unless reboot!!!!!!!" << std::endl;
DIY_WARNLOG("process","【WARN】前置的%s%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账",get_front_msg_from_subdir(), g_front_seg_index, IED_COUNT);
DIY_WARNLOG("process","【WARN】前置的%d号进程获取到的台账的数量大于配置文件中给单个进程配置的台账数量:%d,这个进程将按照获取到的台账的数量来创建台账空间,这个进程不能直接通过台账添加来新增台账,只能通过重启进程或者先删除已有台账再添加台账的方式来添加新台账", g_front_seg_index, IED_COUNT);
} else {
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间",get_front_msg_from_subdir(), g_front_seg_index, IED_COUNT);
DIY_INFOLOG("process","【NORMAL】前置的%d号进程根据配置文件中给单个进程配置的台账数量:%d来创建台账空间", g_front_seg_index, IED_COUNT);
}
///////////////////////////////////////////////////////////////////////////////用例这里将局部的map拷贝到全局map后续根据协议台账修改
@@ -876,7 +885,7 @@ int parse_model_cfg_web()
// 3. 调用接口
std::map<std::string, icd_model*> icd_model_map;
if (parse_model_web(&icd_model_map, input_jstr)) {
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置",get_front_msg_from_subdir(), g_front_seg_index);
DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置", g_front_seg_index);
// 确保释放 map
for (auto& kv : icd_model_map) delete kv.second;
return 0;
@@ -930,7 +939,7 @@ std::string parse_model_cfg_web_one(const std::string& terminal_type)
// 2. 拉取并解析
if (parse_model_web(&icd_model_map, input_jstr) != 0) {
std::cerr << "parse_model_web failed for type: " << terminal_type << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置",get_front_msg_from_subdir(), g_front_seg_index);
DIY_ERRORLOG("process","【ERROR】前置的%d号进程 icd模型接口异常,将使用默认的icd模型,请检查接口配置", g_front_seg_index);
// 清理(即使 map 为空,也安全)
for (auto& kv : icd_model_map) delete kv.second;
return "";

View File

@@ -6,6 +6,13 @@
#include <list>
#include <array>
#include <map>
#include <mutex>
///////////////////////////////////////////////////////////////////////////////////////////
#include "nlohmann/json.hpp"
#include "../../client2.h"
///////////////////////////////////////////////////////////////////////////////////////////
@@ -13,13 +20,13 @@ class Front;
///////////////////////////////////////////////////////////////////////////////////////////
#define STAT_DATA_BASE_NODE_ID 100
/*#define STAT_DATA_BASE_NODE_ID 100
#define THREE_SECS_DATA_BASE_NODE_ID 200
#define SOE_COMTRADE_BASE_NODE_ID 300
#define HIS_DATA_BASE_NODE_ID 400
#define NEW_HIS_DATA_BASE_NODE_ID 500
#define RECALL_HIS_DATA_BASE_NODE_ID 600
#define RECALL_ALL_DATA_BASE_NODE_ID 700
#define RECALL_ALL_DATA_BASE_NODE_ID 700*/
///////////////////////////////////////////////////////////////////////////////////////////
@@ -47,23 +54,28 @@ class ledger_monitor
{
public:
std::string monitor_id; //监测点id
std::string terminal_code; //监测点
std::string terminal_id; //监测点
std::string monitor_name; //监测点名
std::string logical_device_seq; //监测点序号
std::string voltage_level; //监测点电压等级
std::string terminal_connect; //监测点接线方式
std::string timestamp; //更新时间
std::string status; //监测点状态
double PT1; // 电压变比1
double PT2; // 电压变比2
double CT1; // 电流变比1
double CT2; // 电流变比2
};
//终端台账
class terminal_dev
{
public:
std::string guid;
std::string guid; //台账更新回复用
std::string terminal_id;
std::string terminal_code;
std::string terminal_name;
std::string org_name;
std::string maint_name;
std::string station_name;
@@ -78,6 +90,8 @@ public:
std::string processNo;
std::string maxProcessNum;
std::string mac; // 装置MAC地址
std::vector<ledger_monitor> line;
};
@@ -296,6 +310,9 @@ int parse_model_cfg_web();
void qvvr_test();
void Fileupload_test();
extern std::vector<terminal_dev> terminal_devlist;
extern std::mutex ledgermtx;
//////////////////////////////////////////////////////////////////////////////////cfg_parse的函数声明
void init_config();
@@ -314,17 +331,23 @@ bool is_blank(const std::string& str);
void print_terminal(const terminal_dev& tmnl);
void printTerminalDevMap(const std::map<std::string, terminal_dev>& terminal_dev_map);
void upload_data_test();
////////////////////////////////////////////////////////////////////////////////mq
extern std::mutex queue_data_list_mutex;
extern std::list<queue_data_t> queue_data_list;
/////////////////////////////////////////////////////////////////////////////////主函数类声明
std::string get_front_msg_from_subdir();
//std::string get_front_msg_from_subdir();
extern std::string FRONT_PATH;
extern int g_front_seg_index;
extern int g_front_seg_num;
void* cloudfrontthread(void* arg);
bool parse_param(int argc, char* argv[]);
struct ThreadArgs {
int argc;
@@ -353,6 +376,45 @@ typedef struct {
pthread_mutex_t lock; // 线程专用互斥锁
} thread_info_t;
///////////////////////////////////////////////////////////////////////////////////////上送数据的json格式
// 单条 DataArray 数据
struct DataArrayItem {
int DataAttr;
int DataTimeSec;
int DataTimeUSec;
int DataTag;
std::string Data;
};
// Msg 对象
struct MsgObj {
int Cldid;
int DataType;
int DataAttr;
int DsNameIdx;
std::vector<DataArrayItem> DataArray;
};
// 整体
struct FullObj {
int Mid;
int Did;
int Pri;
int Type;
MsgObj Msg;
};
// nlohmann序列化接口
void to_json(nlohmann::json& j, const DataArrayItem& d);
void to_json(nlohmann::json& j, const MsgObj& m);
void to_json(nlohmann::json& j, const FullObj& f);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
std::vector<DeviceInfo> GenerateDeviceInfoFromLedger(const std::vector<terminal_dev>& terminal_devlist);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#endif

View File

@@ -47,17 +47,11 @@ extern int g_front_seg_index;
extern std::string FRONT_INST;
extern std::string subdir;
//mq
extern std::mutex queue_data_list_mutex; //queue发送数据锁
extern std::list<queue_data_t> queue_data_list; //queue发送数据链表
//日志主题
extern std::string G_LOG_TOPIC;
extern std::vector<terminal_dev> terminal_devlist;
////////////////////////////////////////////////////////辅助函数
std::string get_front_type_from_subdir() {
/*std::string get_front_type_from_subdir() {
if (subdir == "cfg_3s_data")
return "realTime";
else if (subdir == "cfg_soe_comtrade")
@@ -68,7 +62,7 @@ std::string get_front_type_from_subdir() {
return "stat";
else
return "unknown";
}
}*/
// 递归创建目录
bool create_directory_recursive(const std::string& path) {
@@ -159,7 +153,7 @@ protected:
<< "\",\"level\":\"" << level_str
<< "\",\"grade\":\"" << get_level_str(level)
<< "\",\"logtype\":\"" << (logtype == LOGTYPE_COM ? "com" : "data")
<< "\",\"frontType\":\"" << get_front_type_from_subdir()
<< "\",\"frontType\":\"" << "cloudfront"
<< "\",\"log\":\"" << escape_json(msg) << "\"}";
std::string jsonString = oss.str();

View File

@@ -54,7 +54,7 @@ extern DebugSwitch g_debug_switch;
extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result);
std::string get_front_type_from_subdir();
//std::string get_front_type_from_subdir();
// 不带 Appender 的版本

View File

@@ -54,7 +54,7 @@ extern DebugSwitch g_debug_switch;
extern void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result);
std::string get_front_type_from_subdir();
//std::string get_front_type_from_subdir();
// 不带 Appender 的版本

View File

@@ -52,7 +52,7 @@ std::string FRONT_PATH;
int INITFLAG = 0;
//前置标置
std::string subdir = "cfg_stat_data"; //默认稳态
std::string subdir = "cloudfrontproc"; //子目录
uint32_t g_node_id = 0;
int g_front_seg_index = 0; //默认单进程
int g_front_seg_num = 0; //默认单进程
@@ -78,10 +78,6 @@ extern int TEST_PORT; //测试端口号
extern std::string FRONT_INST;
extern std::mutex queue_data_list_mutex;
extern std::list<queue_data_t> queue_data_list;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 功能函数
template<typename T, typename... Args>
@@ -143,7 +139,7 @@ bool parse_param(int argc, char* argv[]) {
}
//获取前置类型
void init_global_function_enable() {
/*void init_global_function_enable() {
if (subdir == "cfg_stat_data") { // 历史稳态
g_node_id = STAT_DATA_BASE_NODE_ID;
auto_register_report_enabled = 1;
@@ -155,10 +151,10 @@ void init_global_function_enable() {
} else if (subdir == "cfg_recallhis_data") { // 补招
g_node_id = RECALL_HIS_DATA_BASE_NODE_ID;
}
}
}*/
//获取功能名称
std::string get_front_msg_from_subdir() {
/*std::string get_front_msg_from_subdir() {
if (subdir.find("cfg_3s_data") != std::string::npos)
return "实时数据进程";
else if (subdir.find("cfg_soe_comtrade") != std::string::npos)
@@ -169,7 +165,7 @@ std::string get_front_msg_from_subdir() {
return "稳态统计进程";
else
return "unknown";
}
}*/
//获取前置路径
std::string get_parent_directory() {
@@ -203,14 +199,14 @@ std::string get_parent_directory() {
{
//初始化g_node_id
init_global_function_enable();
//init_global_function_enable();
//配置初始化
init_config();
//启动进程日志
init_logger_process();
DIY_WARNLOG("process","【WARN】前置的%s%d号进程 进程级日志初始化完毕", get_front_msg_from_subdir(), g_front_seg_index);
DIY_WARNLOG("process","【WARN】前置的%d号进程 进程级日志初始化完毕", g_front_seg_index);
//读取台账
parse_device_cfg_web();
@@ -218,11 +214,11 @@ std::string get_parent_directory() {
//初始化日志
init_loggers();
//读取模型,下载文件
//读取模型,下载模板文件
parse_model_cfg_web();
//解析文件
Set_xml_nodeinfo();
//解析模板文件
//Set_xml_nodeinfo();
StartFrontThread(); //开启主线程
@@ -388,13 +384,13 @@ void Front::mqconsumerThread()
std::string nameServer = G_MQCONSUMER_IPPORT;
std::vector<rocketmq::Subscription> subscriptions;
if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata);
}
//if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata);
//}
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate);
if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall);
}
//if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall);
//}
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset);
subscriptions.emplace_back(FRONT_INST + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog);

View File

@@ -58,10 +58,6 @@ static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者
///////////////////////////////////////////////////////////////////////////////////////////////////////////
//台账
extern std::mutex ledgermtx;
extern std::vector<terminal_dev> terminal_devlist;
//前置进程
extern unsigned int g_node_id;
extern int g_front_seg_index;
@@ -282,7 +278,7 @@ void rocketmq_producer_send(rocketmq::RocketMQProducer* producer,
producer->sendMessage(body, topic, tags, keys);
} catch (const std::exception& e) {
std::cerr << "[rocketmq_producer_send] 发送失败: " << e.what() << std::endl;
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 MQ发送失败", get_front_msg_from_subdir(), g_front_seg_index);
DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 MQ发送失败", g_front_seg_index);
}
}
@@ -544,7 +540,7 @@ bool parseJsonMessageSET(const std::string& json_str) {
std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl;
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理topic:%s_%s的进程控制消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的进程控制消息", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
if (code_str == "set_process") {
if (!messageBody.contains("processNum")) {
@@ -563,13 +559,14 @@ bool parseJsonMessageSET(const std::string& json_str) {
// 校验参数并执行
if ((fun == "reset" || fun == "add") &&
(processNum >= 1 && processNum < 10) &&
(frontType == "stat" || frontType == "recall" || frontType == "all")) {
(frontType == "cloudfront" || frontType == "all")) {
if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
//if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
if (g_front_seg_index == 1) {
execute_bash(fun, processNum, frontType);
DIY_WARNLOG("process", "【WARN】前置的%s%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str());
DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程", g_front_seg_index, fun.c_str());
send_reply_to_queue(guid, "1", "收到重置进程指令,重启所有进程!");
std::cout << "this msg should only execute once" << std::endl;
@@ -581,7 +578,7 @@ bool parseJsonMessageSET(const std::string& json_str) {
send_reply_to_queue(guid, "1", "收到删除进程指令,这个进程将会重启 ");
DIY_WARNLOG("process", "【WARN】前置的%s%d号进程执行指令:%s,即将重启",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str());
DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启", g_front_seg_index, fun.c_str());
std::this_thread::sleep_for(std::chrono::seconds(10));
::_exit(-1039); // 进程退出
@@ -662,15 +659,15 @@ bool parseJsonMessageLOG(const std::string& json_str) {
}
// 判断 frontType 是否匹配
if (frontType != subdir) {
/*if (frontType != subdir) {
std::cout << "msg frontType: " << frontType << " doesn't match self frontType: " << subdir << std::endl;
return true;
}
}*/
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理日志上送消息", get_front_msg_from_subdir(), g_front_seg_index);
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理日志上送消息", g_front_seg_index);
std::cout << "msg index: " << processNo << " self index: " << g_front_seg_index << std::endl;
std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;
/*std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;*/
// 回复消息
send_reply_to_queue(guid, "1", "收到实时日志指令");
@@ -687,7 +684,7 @@ bool parseJsonMessageLOG(const std::string& json_str) {
process_log_command(id, level, grade, logtype);
} else {
std::cout << "type doesn't match" << std::endl;
DIY_WARNLOG("process", "【WARN】前置的%s%d号进程处理日志上送消息,格式不正确", get_front_msg_from_subdir(), g_front_seg_index);
DIY_WARNLOG("process", "【WARN】前置的%d号进程处理日志上送消息,格式不正确", g_front_seg_index);
}
std::cout << "this msg should only execute once" << std::endl;
@@ -741,8 +738,8 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
std::cout << "msg index: " << process_No << " self index: " << g_front_seg_index << std::endl;
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理topic:%s_%s的台账更新消息",
get_front_msg_from_subdir(), g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的台账更新消息",
g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
send_reply_to_queue(guid, "1", "收到台账更新指令");
@@ -754,7 +751,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
terminal_dev json_data;
json_data.terminal_id = item.value("id", "");
json_data.terminal_code = item.value("name", "");
json_data.terminal_name = item.value("name", "");
json_data.org_name = item.value("org_name", "");
json_data.maint_name = item.value("maint_name", "");
json_data.station_name = item.value("stationName", "");
@@ -783,7 +780,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
m.logical_device_seq = monitor_item.value("lineNo", "");
m.terminal_connect = monitor_item.value("ptType", "");
m.timestamp = json_data.timestamp;
m.terminal_code = json_data.terminal_code;
m.terminal_id = json_data.terminal_id;
}
}
@@ -869,7 +866,7 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
}
else{
std::cerr << "rtdata is NULL." << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
}
@@ -913,7 +910,7 @@ rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& ms
// 调用业务逻辑处理函数
std::string updatefilepath = FRONT_PATH + "/etc/ledgerupdate";
if (!parseJsonMessageUD(body, updatefilepath)) {
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
}
return rocketmq::CONSUME_SUCCESS;
@@ -943,7 +940,7 @@ rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg)
// 调用业务处理逻辑
if (!parseJsonMessageSET(body)) {
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
}
return rocketmq::CONSUME_SUCCESS;
@@ -973,7 +970,7 @@ rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg)
// 执行日志上送处理
if (!parseJsonMessageLOG(body)) {
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str());
DIY_ERRORLOG("process", "【ERROR】前置的%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str());
}
return rocketmq::CONSUME_SUCCESS;
@@ -1016,7 +1013,7 @@ rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& ms
} else {
std::cerr << "recall data is NULL." << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str());
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str());
}
return rocketmq::CONSUME_SUCCESS;
@@ -1180,7 +1177,7 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json
xmlStream << "<stationName>" << json_data.station_name << "</stationName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalCode>" << json_data.terminal_code << "</terminalCode>" << std::endl;
xmlStream << "<terminalCode>" << json_data.terminal_name << "</terminalCode>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<updateTime>" << json_data.timestamp << "</updateTime>" << std::endl; // Assuming `timestamp`
@@ -1201,6 +1198,9 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json
add_indent(xmlStream, indentLevel);
xmlStream << "<devKey>" << json_data.dev_key << "</devKey>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<mac>" << json_data.mac << "</mac>" << std::endl;
// monitorData 部分
for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) {
const ledger_monitor& monitor = json_data.line[i];
@@ -1228,9 +1228,21 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json
xmlStream << "<timestamp>" << monitor.timestamp << "</timestamp>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminal_code>" << monitor.terminal_code << "</terminal_code>" << std::endl;
xmlStream << "<terminal_id>" << monitor.terminal_id << "</terminal_id>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<CT1>" << monitor.CT1 << "</CT1>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<CT2>" << monitor.CT2 << "</CT2>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<PT1>" << monitor.PT1 << "</PT1>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<PT2>" << monitor.PT2 << "</PT2>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<status>" << monitor.status << "</status>" << std::endl;
indentLevel--;
@@ -1306,10 +1318,10 @@ void connect_status_to_queue(const std::string& id, const std::string& datetime,
data.strTopic = G_CONNECT_TOPIC;
data.strText = jsonObject.dump(); // 转换为字符串
if (g_node_id == STAT_DATA_BASE_NODE_ID) {
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
//if (g_node_id == STAT_DATA_BASE_NODE_ID) {
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
//}
}
catch (const std::exception& e) {
std::cerr << "connect_status_to_queue exception: " << e.what() << std::endl;
@@ -1326,7 +1338,7 @@ void send_reply_to_queue(const std::string& guid, const std::string& step, const
obj["step"] = step;
obj["result"] = result;
obj["processNo"] = g_front_seg_index;
obj["frontType"] = get_front_type_from_subdir();
obj["frontType"] = "cloudfront";
obj["nodeId"] = FRONT_INST;
// 构造 queue 消息
@@ -1349,7 +1361,7 @@ void send_heartbeat_to_queue(const std::string& status) {
try{
nlohmann::json obj;
obj["nodeId"] = FRONT_INST;
obj["frontType"] = get_front_type_from_subdir();
obj["frontType"] = "cloudfront";
obj["processNo"] = g_front_seg_index;
obj["status"] = status;
@@ -1376,9 +1388,8 @@ bool shouldSkipTerminal(const std::string& terminal_id) {
return false;
}
void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) {
if(!INITFLAG){
void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) {
if (!INITFLAG) {
std::cout << "前置未初始化完成\n";
return;
}
@@ -1422,8 +1433,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) {
if (type == 0) {
std::cout << "use ledger send msg" << std::endl;
for (size_t i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < terminal_devlist.size(); ++i) {
//根据台账模式下每个进程都会发送
for (size_t i = 0; total_messages > 0 && i < terminal_devlist.size(); ++i) {
const auto& dev = terminal_devlist[i];
if (shouldSkipTerminal(dev.terminal_id)) {
@@ -1436,85 +1447,71 @@ void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) {
data.mp_id = dev.line[j].monitor_id;
data.monitor_no = static_cast<int>(i + j);
std::string modified_time = std::to_string(current_time_ms);
std::string modified_time = std::to_string(current_time_ms / 1000);
std::string modified_strText = base_strText;
// 替换 Monitor
size_t monitor_pos = modified_strText.find("\"Monitor\"");
if (monitor_pos != std::string::npos) {
size_t colon_pos = modified_strText.find(":", monitor_pos);
size_t quote_pos = modified_strText.find("\"", colon_pos);
size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1);
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id);
}
}
// 替换 TIME
size_t time_pos = modified_strText.find("\"TIME\"");
if (time_pos != std::string::npos) {
size_t colon_pos = modified_strText.find(":", time_pos);
size_t quote_pos = colon_pos;
size_t end_quote_pos = modified_strText.find(",", quote_pos + 1);
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time);
try {
auto j = nlohmann::json::parse(modified_strText);
j["Did"] = i;
if (j.contains("Msg") && j["Msg"].is_object()) {
j["Msg"]["Cldid"] = j;
if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) {
for (auto& item : j["Msg"]["DataArray"]) {
if (item.is_object()) {
item["DataTimeSec"] = std::stoll(modified_time);
}
}
}
}
modified_strText = j.dump();
} catch (...) {
// 保持原始文本
}
data.strText = modified_strText;
//my_rocketmq_send(data,front->m_producer);
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
std::cout << "Sent message " << (i + 1)
<< " with Monitor " << data.monitor_no
<< " and TIME " << modified_time << std::endl;
}
}
} else {
std::cout << "use monitor + number send msg" << std::endl;
for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < total_messages; ++i) {
//根据虚构监测点模式下只有进程1发送
for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 ) && i < total_messages; ++i) {
std::string monitor_id = "testmonitor" + std::to_string(i);
data.mp_id = monitor_id;
data.monitor_no = i;
std::string modified_time = std::to_string(current_time_ms);
std::string modified_time = std::to_string(current_time_ms / 1000);
std::string modified_strText = base_strText;
// 替换 Monitor
size_t monitor_pos = modified_strText.find("\"Monitor\"");
if (monitor_pos != std::string::npos) {
size_t colon_pos = modified_strText.find(":", monitor_pos);
size_t quote_pos = modified_strText.find("\"", colon_pos);
size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1);
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id);
}
}
// 替换 TIME
size_t time_pos = modified_strText.find("\"TIME\"");
if (time_pos != std::string::npos) {
size_t colon_pos = modified_strText.find(":", time_pos);
size_t quote_pos = colon_pos;
size_t end_quote_pos = modified_strText.find(",", quote_pos + 1);
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time);
try {
auto j = nlohmann::json::parse(modified_strText);
j["Did"] = 0;
if (j.contains("Msg") && j["Msg"].is_object()) {
j["Msg"]["Cldid"] = data.mp_id;
if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) {
for (auto& item : j["Msg"]["DataArray"]) {
if (item.is_object()) {
item["DataTimeSec"] = std::stoll(modified_time);
}
}
}
}
modified_strText = j.dump();
} catch (...) {
// 保持原始文本
}
data.strText = modified_strText;
//my_rocketmq_send(data,front->m_producer);
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
std::cout << "Sent message " << (i + 1)
<< " with Monitor " << data.monitor_no
<< " and TIME " << modified_time << std::endl;
}
}

View File

@@ -48,11 +48,6 @@ bool showinshellflag =false;
extern std::list<std::string> errorList, warnList, normalList;
extern std::mutex errorListMutex, warnListMutex, normalListMutex;
extern std::vector<terminal_dev> terminal_devlist;
extern std::mutex ledgermtx;
extern std::list<queue_data_t> queue_data_list;
extern int IED_COUNT;
extern int INITFLAG;
extern int g_front_seg_index;
@@ -281,6 +276,7 @@ extern bool normalOutputEnabled;
if (G_TEST_NUM != 0) {
std::cout << "[PeriodicTask] Executing rocketmq_test_300()\n";
rocketmq_test_300(G_TEST_NUM, g_front_seg_index, G_TEST_TYPE,m_front);
//upload_data_test();
}
}
@@ -412,7 +408,7 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
std::ostringstream os;
os << "\r\x1B[K------------------------------------\n";
os << "\r\x1B[K|-- terminal_id: " << dev.terminal_id << "\n";
os << "\r\x1B[K|-- terminal_code: " << dev.terminal_code << "\n";
os << "\r\x1B[K|-- terminal_name: " << dev.terminal_name << "\n";
os << "\r\x1B[K|-- dev_ip: " << dev.addr_str << "\n";
os << "\r\x1B[K|-- dev_port: " << dev.port << "\n";
os << "\r\x1B[K|-- dev_type: " << dev.dev_type << "\n";
@@ -427,6 +423,8 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
os << "\r\x1B[K|-- tmnl_status: " << dev.tmnl_status << "\n";
os << "\r\x1B[K|-- timestamp: " << dev.timestamp << "\n";
os << "\r\x1B[K|-- mac: " << dev.mac << "\n";
for (size_t i = 0; i < dev.line.size(); ++i) {
const auto& ld = dev.line[i];
if (ld.monitor_id.empty()) continue;
@@ -434,11 +432,17 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) {
os << "\r\x1B[K |-- monitor_id: " << ld.monitor_id << "\n";
os << "\r\x1B[K |-- monitor_name: " << ld.monitor_name << "\n";
os << "\r\x1B[K |-- logical_device_seq: " << ld.logical_device_seq << "\n";
os << "\r\x1B[K |-- terminal_code: " << ld.terminal_code << "\n";
os << "\r\x1B[K |-- terminal_id: " << ld.terminal_id << "\n";
os << "\r\x1B[K |-- voltage_level: " << ld.voltage_level << "\n";
os << "\r\x1B[K |-- terminal_connect: " << ld.terminal_connect << "\n";
os << "\r\x1B[K |-- status: " << ld.status << "\n";
os << "\r\x1B[K |-- timestamp: " << ld.timestamp << "\n";
os << "\r\x1B[K |-- CT1: " << ld.CT1 << "\n";
os << "\r\x1B[K |-- CT2: " << ld.CT2 << "\n";
os << "\r\x1B[K |-- PT1: " << ld.PT1 << "\n";
os << "\r\x1B[K |-- PT2: " << ld.PT2 << "\n";
}
os << "\r\x1B[K------------------------------------\n";

View File

@@ -7,6 +7,7 @@
#include "client2.h"
#include "cloudfront/code/interface.h"
#include <iostream>
using namespace std;
#if 0
@@ -121,6 +122,8 @@ void* client_manager_thread(void* arg) {
// <20><><EFBFBD><EFBFBD>100<30><30><EFBFBD><EFBFBD><EFBFBD><EFBFBD>װ<EFBFBD><D7B0>
std::vector<DeviceInfo> test_devices = generate_test_devices(100);
//std::vector<DeviceInfo> devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk<6E><6B><EFBFBD><EFBFBD>
// <20><><EFBFBD><EFBFBD><EFBFBD>ͻ<EFBFBD><CDBB><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
start_client_connect(devices);
@@ -209,8 +212,8 @@ void restart_thread(int index) {
}
else if (false) {
// <20>ӿڣ<D3BF>mq
char* argv[] = { (char*)new_index ,(char*)"-dcfg_stat_data", (char*)"-s1_1" };
ThreadArgs* args = new ThreadArgs{3, argv};
char* argv[] = { (char*)new_index };//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҫ<EFBFBD><D2AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̺Ų<CCBA><C5B2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ThreadArgs* args = new ThreadArgs{1, argv};
if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart message processor thread %d\n", index);
@@ -232,7 +235,12 @@ int is_thread_alive(pthread_t tid) {
}
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
int main() {
int main(int argc ,char** argv) {//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ӳ<EFBFBD><D3B2><EFBFBD>
if(!parse_param(argc,argv)){
std::cerr << "process param error,exit" << std::endl;
return 1;
}
srand(time(NULL)); // <20><>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
// <20><>ʼ<EFBFBD><CABC><EFBFBD>߳<EFBFBD><DFB3><EFBFBD><EFBFBD><EFBFBD>
@@ -261,6 +269,16 @@ int main() {
free(index);
}
}
else if (i == 2){
//<2F>ӿں<D3BF>mq
char* argv[] = { (char*)index };//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҫ<EFBFBD><D2AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̺Ų<CCBA><C5B2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ThreadArgs* args = new ThreadArgs{1, argv};
if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) {
printf("Failed to create message processor thread %d\n", i);
delete args; // <20><><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>û<EFBFBD><C3BB><EFBFBD><EFBFBD><EFBFBD>ɹ<EFBFBD><C9B9><EFBFBD><EFBFBD>ֶ<EFBFBD><D6B6>ͷ<EFBFBD>
free(index);
}
}
else {
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>գ<EFBFBD>ʵ<EFBFBD><CAB5>Ӧ<EFBFBD><D3A6><EFBFBD>п<EFBFBD><D0BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>