1634 lines
58 KiB
C++
1634 lines
58 KiB
C++
///////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
#include <iostream>
|
||
#include <fstream>
|
||
#include <string>
|
||
#include <list>
|
||
#include <sstream>
|
||
#include <vector>
|
||
#include <deque>
|
||
#include <mutex>
|
||
#include <thread>
|
||
#include <chrono>
|
||
#include <ctime>
|
||
#include <cstdio>
|
||
#include <iomanip>
|
||
#include <sys/stat.h>
|
||
#include <fnmatch.h>
|
||
#include <atomic>
|
||
#include <map>
|
||
#include <vector>
|
||
#include <array>
|
||
|
||
//////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
#include "rocketmq/DefaultMQProducer.h"
|
||
#include "rocketmq/DefaultMQPushConsumer.h"
|
||
#include "rocketmq/MQMessageListener.h"
|
||
#include "rocketmq/MQMessageExt.h"
|
||
#include "rocketmq/MQMessageQueue.h"
|
||
#include "rocketmq/MQSelector.h"
|
||
#include "rocketmq/SendResult.h"
|
||
#include "rocketmq/SessionCredentials.h"
|
||
|
||
#include "rocketmq.h"
|
||
#include "nlohmann/json.hpp"
|
||
#include "log4cplus/log4.h"
|
||
#include "interface.h"
|
||
#include "front.h"
|
||
|
||
//////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
using namespace std;
|
||
|
||
using nlohmann::json;
|
||
|
||
//////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
// Compatibility shim for pre-C++11 toolchains only.
// From C++11 onward `nullptr` is a keyword, not a macro, so a bare
// `#ifndef nullptr` is always true and would replace the keyword with NULL
// (losing its std::nullptr_t type).  Restrict the fallback to old standards.
#if !defined(nullptr) && defined(__cplusplus) && (__cplusplus < 201103L)
#define nullptr NULL
#endif
|
||
|
||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
std::mutex queue_data_list_mutex;
|
||
std::list<queue_data_t> queue_data_list;
|
||
|
||
static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者
|
||
|
||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
//台账
|
||
extern std::mutex ledgermtx;
|
||
extern std::vector<terminal_dev> terminal_devlist;
|
||
|
||
//前置进程
|
||
extern unsigned int g_node_id;
|
||
extern int g_front_seg_index;
|
||
extern std::string subdir;
|
||
extern std::string FRONT_INST;
|
||
|
||
//生产者线程控制
|
||
extern uint32_t g_mqproducer_blocked_times;
|
||
|
||
//初始化标志
|
||
extern int INITFLAG;
|
||
|
||
//测试用的终端数组
|
||
extern std::vector<std::string> TESTARRAY;
|
||
|
||
////////////////////////////////////////////////////////////////////////////////////////////////////////外部文件函数声明
|
||
|
||
extern void execute_bash(std::string fun,int process_num,std::string type);
|
||
extern int recall_json_handle(const std::string& jstr);
|
||
|
||
//////////////////////////////////////////////////////////////////////////////////////////////////////本文件函数向前声明
|
||
|
||
bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type);
|
||
std::string prepare_update(const std::string& code_str, const terminal_dev& json_data,const std::string& guid);
|
||
bool writeToFile(const std::string& filePath, const std::string& xmlContent);
|
||
|
||
//////////////////////////////////////////////////////////////////////////////////////////////////////
|
||
|
||
namespace rocketmq {
|
||
|
||
//-----------------------------------------------------------------------------
|
||
// RocketMQConsumer 方法实现
|
||
//-----------------------------------------------------------------------------
|
||
|
||
/// Creates a consumer for `consumerGroup`, points it at `nameServer`, applies
/// the configured consumer credentials and allocates the internal listener
/// (registered with the client later, in start()).
RocketMQConsumer::RocketMQConsumer(const std::string& consumerGroup,
                                   const std::string& nameServer)
    : consumer_(consumerGroup),
      listener_(nullptr) {
    // Name server address first, then the default session credentials.
    consumer_.setNamesrvAddr(nameServer);
    consumer_.setSessionCredentials(G_MQCONSUMER_ACCESSKEY,
                                    G_MQCONSUMER_SECRETKEY,
                                    G_MQCONSUMER_CHANNEL);

    // The listener receives a pointer back to this consumer; it is deleted
    // in the destructor.
    listener_ = new InternalListener(this);
}
|
||
|
||
void RocketMQConsumer::subscribe(const std::string& topic,
|
||
const std::string& tag,
|
||
MessageCallback callback) {
|
||
// 调用 C++ 接口的订阅方法,subExpression 参数支持 Tag 过滤,如 "TagA || TagB"
|
||
consumer_.subscribe(topic, tag);
|
||
std::cout << "[RocketMQConsumer] 已订阅 Topic: " << topic << ", Tag 表达式: " << tag << std::endl;
|
||
|
||
// 将 topic:tag 作为键,保存回调函数
|
||
std::string key = topic + ":" + tag;
|
||
std::lock_guard<std::mutex> lock(callbackMutex_);
|
||
callbackMap_[key] = callback;
|
||
}
|
||
|
||
void RocketMQConsumer::start() {
|
||
// 注册消息监听器
|
||
consumer_.registerMessageListener(listener_);
|
||
|
||
// 启动消费者
|
||
try {
|
||
consumer_.start();
|
||
std::cout << "[RocketMQConsumer] Consumer 已启动,等待消息..." << std::endl;
|
||
} catch (const MQClientException& e) {
|
||
std::cerr << "[RocketMQConsumer] 启动失败: " << e.what() << std::endl;
|
||
throw;
|
||
}
|
||
}
|
||
|
||
void RocketMQConsumer::shutdown() {
|
||
try {
|
||
consumer_.shutdown();
|
||
std::cout << "[RocketMQConsumer] Consumer 已关闭。" << std::endl;
|
||
} catch (const MQClientException& e) {
|
||
std::cerr << "[RocketMQConsumer] 关闭失败: " << e.what() << std::endl;
|
||
}
|
||
}
|
||
|
||
/// Shuts the consumer down (ignoring every exception) and then frees the
/// internal listener allocated by the constructor.
RocketMQConsumer::~RocketMQConsumer() {
    // The consumer must be stopped before its listener goes away.
    try {
        consumer_.shutdown();
    } catch (...) {
        // Deliberately ignored: nothing useful can be done during destruction.
    }

    delete listener_;
    listener_ = nullptr;

    std::cout << "[RocketMQConsumer] Consumer 销毁完毕。" << std::endl;
}
|
||
|
||
//-----------------------------------------------------------------------------
|
||
// RocketMQProducer 方法实现
|
||
//-----------------------------------------------------------------------------
|
||
|
||
/// Creates a producer for `groupName`, configures the name server and the
/// producer credentials (with an empty channel string), and starts it right
/// away. An MQClientException raised while starting is logged and rethrown.
RocketMQProducer::RocketMQProducer(const std::string& groupName,
                                   const std::string& nameServer)
    : producer_(groupName) {
    producer_.setNamesrvAddr(nameServer);
    producer_.setSessionCredentials(G_MQPRODUCER_ACCESSKEY, G_MQPRODUCER_SECRETKEY, "");

    try {
        producer_.start();
        std::cout << "[RocketMQProducer] Producer 已启动。" << std::endl;
    } catch (const MQClientException& ex) {
        std::cerr << "[RocketMQProducer] 启动失败: " << ex.what() << std::endl;
        throw;
    }
}
|
||
|
||
void RocketMQProducer::sendMessage(const std::string& body,
|
||
const std::string& topic,
|
||
const std::string& tags,
|
||
const std::string& keys) {
|
||
try {
|
||
// 创建消息对象
|
||
MQMessage msg(topic, tags, keys, body);
|
||
|
||
// 同步发送
|
||
SendResult result = producer_.send(msg);
|
||
std::cout << "[RocketMQProducer] 消息发送成功. "
|
||
<< "Topic=" << topic
|
||
<< ", MsgID=" << result.getMsgId()
|
||
<< ", Status=" << result.getSendStatus()
|
||
<< std::endl;
|
||
} catch (const MQClientException& e) {
|
||
std::cerr << "[RocketMQProducer] 发送失败: " << e.what() << std::endl;
|
||
// 根据需要进行重试或日志记录
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "[RocketMQProducer] 异常: " << e.what() << std::endl;
|
||
} catch (...) {
|
||
std::cerr << "[RocketMQProducer] 未知错误,消息发送失败。" << std::endl;
|
||
}
|
||
}
|
||
|
||
void RocketMQProducer::sendMessageOrderly(const std::string& body,
|
||
const std::string& topic,
|
||
const std::string& tags,
|
||
const std::string& keys) {
|
||
try {
|
||
// 创建消息对象
|
||
MQMessage msg(topic, tags, keys, body);
|
||
|
||
// 使用轮询队列选择器进行顺序发送
|
||
SendResult result = producer_.send(msg, &selector_, nullptr);
|
||
std::cout << "[RocketMQProducer] 顺序消息发送成功. "
|
||
<< "Topic=" << topic
|
||
<< ", MsgID=" << result.getMsgId()
|
||
<< ", Status=" << result.getSendStatus()
|
||
<< std::endl;
|
||
} catch (const MQClientException& e) {
|
||
std::cerr << "[RocketMQProducer] 顺序发送失败: " << e.what() << std::endl;
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "[RocketMQProducer] 异常: " << e.what() << std::endl;
|
||
} catch (...) {
|
||
std::cerr << "[RocketMQProducer] 未知错误,顺序消息发送失败。" << std::endl;
|
||
}
|
||
}
|
||
|
||
/// Shuts the producer down.
///
/// Destructors are implicitly noexcept, so any exception escaping from here
/// would call std::terminate. Every exception type is therefore caught and
/// logged, not only MQClientException.
RocketMQProducer::~RocketMQProducer() {
    try {
        producer_.shutdown();
        std::cout << "[RocketMQProducer] Producer 已关闭。" << std::endl;
    } catch (const MQClientException& e) {
        std::cerr << "[RocketMQProducer] 关闭失败: " << e.what() << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "[RocketMQProducer] 关闭失败: " << e.what() << std::endl;
    } catch (...) {
        std::cerr << "[RocketMQProducer] 关闭失败: 未知错误" << std::endl;
    }
}
|
||
|
||
|
||
|
||
} // namespace rocketmq
|
||
|
||
///////////////////////////////////////////////////////////////////////////////////////////////////生产者接口
|
||
// 初始化生产者
|
||
void InitializeProducer(rocketmq::RocketMQProducer*& producer) {
|
||
if (producer == nullptr) {
|
||
try {
|
||
producer = new rocketmq::RocketMQProducer(G_ROCKETMQ_PRODUCER, G_MQPRODUCER_IPPORT);
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "[InitializeProducer] Failed to initialize producer: " << e.what() << std::endl;
|
||
throw;
|
||
}
|
||
}
|
||
}
|
||
|
||
// 关闭并销毁生产者(在程序结束时调用一次)
|
||
void ShutdownAndDestroyProducer()
|
||
{
|
||
if (g_producer != NULL) {
|
||
delete g_producer;
|
||
g_producer = NULL;
|
||
}
|
||
}
|
||
|
||
// 使用 C++ 接口封装的 RocketMQProducer 类
|
||
void rocketmq_producer_send(rocketmq::RocketMQProducer* producer,
|
||
const std::string& body,
|
||
const std::string& topic) {
|
||
if (!producer) {
|
||
std::cerr << "[rocketmq_producer_send] producer 不可用,未初始化\n";
|
||
return;
|
||
}
|
||
|
||
const std::string& tags = G_ROCKETMQ_TAG;
|
||
const std::string& keys = G_ROCKETMQ_KEY;
|
||
|
||
try {
|
||
producer->sendMessage(body, topic, tags, keys);
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "[rocketmq_producer_send] 发送失败: " << e.what() << std::endl;
|
||
DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程 MQ发送失败", get_front_msg_from_subdir(), g_front_seg_index);
|
||
}
|
||
}
|
||
|
||
//mq发送接口
|
||
void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer)
|
||
{
|
||
static std::string topic;
|
||
static std::string cfg_His_tp;
|
||
static std::string cfg_PLT_tp;
|
||
static std::string cfg_PST_tp;
|
||
static std::string cfg_Evt_tp;
|
||
static std::string cfg_Alm_tp;
|
||
static std::string cfg_Rt_tp;
|
||
static bool init = false;
|
||
if (!init) {
|
||
|
||
cfg_His_tp = TOPIC_STAT;
|
||
cfg_PLT_tp = TOPIC_PLT;
|
||
cfg_PST_tp = TOPIC_PST;
|
||
cfg_Evt_tp = TOPIC_EVENT;
|
||
cfg_Alm_tp = TOPIC_ALARM;
|
||
cfg_Rt_tp = TOPIC_RTDATA;
|
||
|
||
init = true;
|
||
}
|
||
|
||
std::string key = data.mp_id;
|
||
std::string senddata = data.strText;
|
||
if (data.strTopic == "HISDATA")
|
||
{
|
||
topic = cfg_His_tp;
|
||
}
|
||
else if (data.strTopic == "PLT")
|
||
{
|
||
topic = cfg_PLT_tp;
|
||
}
|
||
else if (data.strTopic == "PST")
|
||
{
|
||
topic = cfg_PST_tp;
|
||
}
|
||
else if (data.strTopic == "Alm")
|
||
{
|
||
topic = cfg_Alm_tp;
|
||
}
|
||
else if (data.strTopic == "RTDATA")
|
||
{
|
||
topic = cfg_Rt_tp;
|
||
}
|
||
else
|
||
{
|
||
topic = data.strTopic;
|
||
|
||
}
|
||
|
||
rocketmq_producer_send(producer,senddata,topic);
|
||
}
|
||
|
||
/////////////////////////////////////////////////////////////////////////////////////////////////查找台账下标
|
||
// 根据终端 ID 查找 terminal_devlist 中的索引,找不到返回 -1
|
||
int find_dev_index_from_dev_id(const std::string& dev_id) {
|
||
for (size_t i = 0; i < terminal_devlist.size(); ++i) {
|
||
if (terminal_devlist[i].terminal_id == dev_id) {
|
||
return static_cast<int>(i);
|
||
}
|
||
}
|
||
return -1; // 未找到
|
||
}
|
||
|
||
int find_mp_index_from_mp_id(const std::string& mp_id) {
|
||
for (const auto& dev : terminal_devlist) {
|
||
for (size_t j = 0; j < dev.line.size(); ++j) {
|
||
if (dev.line[j].monitor_id == mp_id) {
|
||
return static_cast<int>(j); // 返回 line[] 的下标
|
||
}
|
||
}
|
||
}
|
||
return -1; // 未找到
|
||
}
|
||
|
||
/////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理
|
||
|
||
std::string parseJsonMessageRC(const std::string& inputJson) {
|
||
// 解析输入 JSON 字符串
|
||
json root;
|
||
try {
|
||
root = json::parse(inputJson);
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "Error parsing JSON: " << e.what() << std::endl;
|
||
return "";
|
||
}
|
||
|
||
// 提取 "messageBody" 部分(它是一个字符串)
|
||
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
|
||
std::cerr << "'messageBody' is missing or is not a string" << std::endl;
|
||
return "";
|
||
}
|
||
|
||
std::string messageBodyStr = root["messageBody"].get<std::string>();
|
||
if (messageBodyStr.empty()) {
|
||
std::cerr << "'messageBody' is empty" << std::endl;
|
||
return "";
|
||
}
|
||
|
||
// 解析 messageBody 中的 JSON 字符串
|
||
json messageBody;
|
||
try {
|
||
messageBody = json::parse(messageBodyStr);
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "Failed to parse 'messageBody' JSON: " << e.what() << std::endl;
|
||
return "";
|
||
}
|
||
|
||
// 提取 "guid" 部分
|
||
if (!messageBody.contains("guid") || !messageBody["guid"].is_string()) {
|
||
std::cerr << "'guid' is missing or is not a string" << std::endl;
|
||
return "";
|
||
}
|
||
std::string guid = messageBody["guid"].get<std::string>();
|
||
|
||
// 发送 guid 回复
|
||
send_reply_to_queue(guid, "1", "收到补招指令");
|
||
|
||
// 提取 "data" 部分
|
||
if (!messageBody.contains("data") || !messageBody["data"].is_array()) {
|
||
std::cerr << "'data' is missing or is not an array" << std::endl;
|
||
return "";
|
||
}
|
||
|
||
// 返回 "data" 数组的字符串形式
|
||
try {
|
||
return messageBody["data"].dump(); // 默认带缩进;如需去除缩进:dump(-1)
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "Error converting 'data' to string: " << e.what() << std::endl;
|
||
return "";
|
||
}
|
||
}
|
||
|
||
bool parseJsonMessageRT(const std::string& body,std::string& devSeries,std::string& line,bool& realData,bool& soeData,int& limit){
|
||
json root;
|
||
try {
|
||
root = json::parse(body);
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "Failed to parse JSON message: " << e.what() << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 提取 "messageBody" 字符串
|
||
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
|
||
std::cerr << "'messageBody' is missing or not a string." << std::endl;
|
||
return false;
|
||
}
|
||
|
||
std::string messageBodyStr = root["messageBody"];
|
||
if (messageBodyStr.empty()) {
|
||
std::cerr << "'messageBody' is empty." << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 解析 "messageBody" 中的 JSON 字符串
|
||
json messageBody;
|
||
try {
|
||
messageBody = json::parse(messageBodyStr);
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 检查并提取字段
|
||
if (!messageBody.contains("devSeries") ||
|
||
!messageBody.contains("line") ||
|
||
!messageBody.contains("realData") ||
|
||
!messageBody.contains("soeData") ||
|
||
!messageBody.contains("limit"))
|
||
{
|
||
std::cerr << "Missing expected fields in 'messageBody'." << std::endl;
|
||
return false;
|
||
}
|
||
|
||
try {
|
||
devSeries = messageBody["devSeries"].get<std::string>();
|
||
line = messageBody["line"].get<std::string>();
|
||
realData = messageBody["realData"].get<bool>();
|
||
soeData = messageBody["soeData"].get<bool>();
|
||
limit = messageBody["limit"].get<int>();
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "Type error while extracting fields: " << e.what() << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 提取 guid
|
||
std::string guid;
|
||
if (messageBody.contains("guid") && messageBody["guid"].is_string()) {
|
||
guid = messageBody["guid"].get<std::string>();
|
||
}
|
||
|
||
// 回复:执行结果直接看实时数据,不需要再回复,1是收到消息
|
||
if (!guid.empty()) {
|
||
send_reply_to_queue(guid, "1", "收到三秒数据指令");
|
||
}
|
||
|
||
return true;
|
||
}
|
||
|
||
bool parseJsonMessageSET(const std::string& json_str) {
|
||
json root;
|
||
try {
|
||
root = json::parse(json_str);
|
||
} catch (const std::exception& e) {
|
||
std::cout << "Error parsing JSON: " << e.what() << std::endl;
|
||
return false;
|
||
}
|
||
|
||
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
|
||
std::cerr << "'messageBody' is missing or is not a string" << std::endl;
|
||
return false;
|
||
}
|
||
|
||
std::string messageBodyStr = root["messageBody"];
|
||
if (messageBodyStr.empty()) {
|
||
std::cerr << "'messageBody' is empty" << std::endl;
|
||
return false;
|
||
}
|
||
|
||
json messageBody;
|
||
try {
|
||
messageBody = json::parse(messageBodyStr);
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 获取字段
|
||
if (!messageBody.contains("guid") || !messageBody.contains("code") ||
|
||
!messageBody.contains("processNo") || !messageBody.contains("fun") ||
|
||
!messageBody.contains("frontType")) {
|
||
std::cout << "Missing one or more required fields in messageBody." << std::endl;
|
||
return false;
|
||
}
|
||
|
||
std::string guid, code_str, fun, frontType;
|
||
int index_value = 0;
|
||
|
||
try {
|
||
guid = messageBody["guid"].get<std::string>();
|
||
code_str = messageBody["code"].get<std::string>();
|
||
index_value = messageBody["processNo"].get<int>();
|
||
fun = messageBody["fun"].get<std::string>();
|
||
frontType = messageBody["frontType"].get<std::string>();
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "Field parsing error: " << e.what() << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 判断进程号是否匹配
|
||
if (index_value != g_front_seg_index && g_front_seg_index != 0) {
|
||
std::cout << "msg index: " << index_value << " doesn't match self index: " << g_front_seg_index << std::endl;
|
||
return true;
|
||
}
|
||
|
||
std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl;
|
||
|
||
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理topic:%s_%s的进程控制消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
|
||
|
||
if (code_str == "set_process") {
|
||
if (!messageBody.contains("processNum")) {
|
||
std::cout << "Missing 'processNum' in JSON." << std::endl;
|
||
return false;
|
||
}
|
||
|
||
int processNum = 0;
|
||
try {
|
||
processNum = messageBody["processNum"].get<int>();
|
||
} catch (...) {
|
||
std::cout << "'processNum' parsing failed." << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 校验参数并执行
|
||
if ((fun == "reset" || fun == "add") &&
|
||
(processNum >= 1 && processNum < 10) &&
|
||
(frontType == "stat" || frontType == "recall" || frontType == "all")) {
|
||
|
||
if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
|
||
|
||
execute_bash(fun, processNum, frontType);
|
||
|
||
DIY_WARNLOG("process", "【WARN】前置的%s%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str());
|
||
|
||
send_reply_to_queue(guid, "1", "收到重置进程指令,重启所有进程!");
|
||
std::cout << "this msg should only execute once" << std::endl;
|
||
} else {
|
||
std::cout << "only cfg_stat_data index 1 can control process, this process not handle this msg" << std::endl;
|
||
}
|
||
}
|
||
else if (fun == "delete") {
|
||
|
||
send_reply_to_queue(guid, "1", "收到删除进程指令,这个进程将会重启 ");
|
||
|
||
DIY_WARNLOG("process", "【WARN】前置的%s%d号进程执行指令:%s,即将重启",get_front_msg_from_subdir(), g_front_seg_index, fun.c_str());
|
||
|
||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||
::_exit(-1039); // 进程退出
|
||
}
|
||
else {
|
||
std::cout << "param is not executable" << std::endl;
|
||
}
|
||
} else {
|
||
std::cout << "set process code str error" << std::endl;
|
||
}
|
||
|
||
return true;
|
||
}
|
||
|
||
bool parseJsonMessageLOG(const std::string& json_str) {
|
||
json root;
|
||
try {
|
||
root = json::parse(json_str);
|
||
} catch (const std::exception& e) {
|
||
std::cout << "Error parsing JSON: " << e.what() << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 提取 messageBody(JSON 字符串)
|
||
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
|
||
std::cerr << "'messageBody' is missing or not a string" << std::endl;
|
||
return false;
|
||
}
|
||
|
||
std::string messageBodyStr = root["messageBody"];
|
||
if (messageBodyStr.empty()) {
|
||
std::cerr << "'messageBody' is empty." << std::endl;
|
||
return false;
|
||
}
|
||
|
||
json messageBody;
|
||
try {
|
||
messageBody = json::parse(messageBodyStr);
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 校验字段是否存在
|
||
static const std::array<std::string, 8> required_fields = {
|
||
"guid", "code", "processNo", "id", "level", "grade", "logtype", "frontType"
|
||
};
|
||
|
||
for (const auto& field : required_fields) {
|
||
if (!messageBody.contains(field)) {
|
||
std::cout << "Missing '" << field << "' in messageBody." << std::endl;
|
||
return false;
|
||
}
|
||
}
|
||
|
||
// 提取字段
|
||
std::string guid, code_str, id, level, grade, logtype, frontType;
|
||
int processNo = 0;
|
||
|
||
try {
|
||
guid = messageBody["guid"].get<std::string>();
|
||
code_str = messageBody["code"].get<std::string>();
|
||
processNo = messageBody["processNo"].get<int>();
|
||
id = messageBody["id"].get<std::string>();
|
||
level = messageBody["level"].get<std::string>();
|
||
grade = messageBody["grade"].get<std::string>();
|
||
logtype = messageBody["logtype"].get<std::string>();
|
||
frontType = messageBody["frontType"].get<std::string>();
|
||
} catch (const std::exception& e) {
|
||
std::cerr << "Error extracting fields: " << e.what() << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 判断进程号是否匹配
|
||
if (processNo != g_front_seg_index) {
|
||
std::cout << "msg index: " << processNo << " doesn't match self index: " << g_front_seg_index << std::endl;
|
||
return true;
|
||
}
|
||
|
||
// 判断 frontType 是否匹配
|
||
if (frontType != subdir) {
|
||
std::cout << "msg frontType: " << frontType << " doesn't match self frontType: " << subdir << std::endl;
|
||
return true;
|
||
}
|
||
|
||
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理日志上送消息", get_front_msg_from_subdir(), g_front_seg_index);
|
||
|
||
std::cout << "msg index: " << processNo << " self index: " << g_front_seg_index << std::endl;
|
||
std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;
|
||
|
||
// 回复消息
|
||
send_reply_to_queue(guid, "1", "收到实时日志指令");
|
||
|
||
if (code_str == "set_log") {
|
||
// 校验数据合法性
|
||
bool valid =
|
||
(level == "terminal" || level == "measurepoint") &&
|
||
(grade == "NORMAL" || grade == "DEBUG") &&
|
||
(logtype == "com" || logtype == "data") &&
|
||
(!id.empty() && !is_blank(id));
|
||
|
||
if (valid) {
|
||
process_log_command(id, level, grade, logtype);
|
||
} else {
|
||
std::cout << "type doesn't match" << std::endl;
|
||
DIY_WARNLOG("process", "【WARN】前置的%s%d号进程处理日志上送消息,格式不正确", get_front_msg_from_subdir(), g_front_seg_index);
|
||
}
|
||
|
||
std::cout << "this msg should only execute once" << std::endl;
|
||
}
|
||
|
||
return true;
|
||
}
|
||
|
||
bool parseJsonMessageUD(const std::string& json_str, const std::string& output_dir) {
|
||
json root;
|
||
try {
|
||
root = json::parse(json_str);
|
||
} catch (...) {
|
||
std::cout << "Error parsing JSON." << std::endl;
|
||
return false;
|
||
}
|
||
|
||
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
|
||
std::cerr << "'messageBody' is missing or is not a string" << std::endl;
|
||
return false;
|
||
}
|
||
|
||
std::string messageBodyStr = root["messageBody"];
|
||
if (messageBodyStr.empty()) {
|
||
std::cerr << "'messageBody' is empty." << std::endl;
|
||
return false;
|
||
}
|
||
|
||
json messageBody;
|
||
try {
|
||
messageBody = json::parse(messageBodyStr);
|
||
} catch (...) {
|
||
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 提取字段
|
||
std::string code_str = messageBody.value("code", "");
|
||
int process_No = messageBody.value("processNo", -1);
|
||
std::string guid = messageBody.value("guid", "");
|
||
|
||
if (guid.empty() || code_str.empty() || process_No == -1) {
|
||
std::cout << "Missing required fields: guid/code/processNo" << std::endl;
|
||
return false;
|
||
}
|
||
|
||
if (process_No != g_front_seg_index && g_front_seg_index != 0) {
|
||
std::cout << "msg index: " << process_No << " doesn't match self index: " << g_front_seg_index << std::endl;
|
||
return true;
|
||
}
|
||
|
||
std::cout << "msg index: " << process_No << " self index: " << g_front_seg_index << std::endl;
|
||
|
||
DIY_INFOLOG("process", "【NORMAL】前置的%s%d号进程处理topic:%s_%s的台账更新消息",
|
||
get_front_msg_from_subdir(), g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
|
||
|
||
send_reply_to_queue(guid, "1", "收到台账更新指令");
|
||
|
||
if (code_str == "add_terminal" || code_str == "ledger_modify") {
|
||
std::cout << "add or update ledger" << std::endl;
|
||
|
||
if (messageBody.contains("data") && messageBody["data"].is_array()) {
|
||
for (const auto& item : messageBody["data"]) {
|
||
terminal_dev json_data;
|
||
|
||
json_data.terminal_id = item.value("id", "");
|
||
json_data.terminal_name = item.value("name", "");
|
||
json_data.org_name = item.value("org_name", "");
|
||
json_data.maint_name = item.value("maint_name", "");
|
||
json_data.station_name = item.value("stationName", "");
|
||
json_data.tmnl_factory = item.value("manufacturer", "");
|
||
json_data.tmnl_status = item.value("status", "");
|
||
json_data.dev_type = item.value("devType", "");
|
||
json_data.dev_key = item.value("devKey", "");
|
||
json_data.dev_series = item.value("series", "");
|
||
|
||
int procNo = item.value("processNo", -1);
|
||
json_data.processNo = std::to_string(procNo);
|
||
|
||
json_data.addr_str = item.value("ip", "");
|
||
json_data.port = item.value("port", "");
|
||
json_data.timestamp = item.value("updateTime", "");
|
||
|
||
if (item.contains("monitorData") && item["monitorData"].is_array()) {
|
||
int j = 0;
|
||
for (const auto& monitor_item : item["monitorData"]) {
|
||
if (j >= 10) break;
|
||
auto& m = json_data.line[j++];
|
||
m.monitor_id = monitor_item.value("id", "");
|
||
m.monitor_name = monitor_item.value("name", "");
|
||
m.voltage_level = monitor_item.value("voltageLevel", "");
|
||
m.status = monitor_item.value("status", "");
|
||
m.logical_device_seq = monitor_item.value("lineNo", "");
|
||
m.terminal_connect = monitor_item.value("ptType", "");
|
||
m.timestamp = json_data.timestamp;
|
||
m.terminal_id = json_data.terminal_id;
|
||
}
|
||
}
|
||
|
||
print_terminal(json_data);
|
||
|
||
std::string xmlContent = prepare_update(code_str, json_data, guid);
|
||
if (!xmlContent.empty()) {
|
||
char nodeid[20];
|
||
std::sprintf(nodeid, "%u", g_node_id);
|
||
std::string file_name = output_dir + "/" + nodeid + "_" + std::to_string(g_front_seg_index) + "_" + json_data.terminal_id + "_" + code_str + ".xml";
|
||
writeToFile(file_name, xmlContent);
|
||
}
|
||
}
|
||
}
|
||
} else if (code_str == "delete_terminal") {
|
||
std::cout << "delete ledger" << std::endl;
|
||
if (messageBody.contains("data") && messageBody["data"].is_array()) {
|
||
for (const auto& item : messageBody["data"]) {
|
||
terminal_dev json_data{};
|
||
auto id = item.value("id", "");
|
||
json_data.terminal_id = id;
|
||
|
||
std::string xmlContent = prepare_update(code_str, json_data, guid);
|
||
if (!xmlContent.empty()) {
|
||
char nodeid[20];
|
||
std::sprintf(nodeid, "%u", g_node_id);
|
||
std::string file_name = output_dir + "/" + nodeid + "_" + std::to_string(g_front_seg_index) + "_" + json_data.terminal_id + "_delete_terminal.xml";
|
||
writeToFile(file_name, xmlContent);
|
||
}
|
||
}
|
||
}
|
||
} else {
|
||
std::cout << "code_str error" << std::endl;
|
||
}
|
||
|
||
return true;
|
||
}
|
||
|
||
/////////////////////////////////////////////////////////////////////////////////////////////////回调函数
|
||
|
||
/// Consumer callback for real-time data trigger messages.
///
/// Parses the message with parseJsonMessageRT(), looks up the device and
/// monitoring-point indices in the ledger under ledgermtx, and creates the
/// trigger XML file through createXmlFile().
///
/// @return CONSUME_SUCCESS once the XML file was created; RECONSUME_LATER
///         while the process is not initialised (INITFLAG != 1) and on every
///         failure path.
rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& msg) {
    // Do not consume anything before initialisation has finished.
    if (INITFLAG != 1) {
        return rocketmq::RECONSUME_LATER;
    }

    std::string body = msg.getBody();
    std::string key = msg.getKeys();

    if (body.empty()) {
        std::cerr << "Message body is NULL or empty." << std::endl;
        return rocketmq::RECONSUME_LATER;
    }

    DIY_INFOLOG("process", "【NORMAL】前置消费topic:%s_%s的实时触发消息",FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());

    std::cout << "rtdata Callback received message: " << body << std::endl;
    if (!key.empty()) {
        std::cout << "Message Key: " << key << std::endl;
    } else {
        std::cout << "Message Key: N/A" << std::endl;
    }

    // Parse the message.
    std::string devid, line;
    bool realData = false, soeData = false;
    int limit = 0;

    if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) {
        std::cerr << "Failed to parse the JSON message." << std::endl;
        DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的实时触发消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
        return rocketmq::RECONSUME_LATER;
    }

    // Both indices start as "not found". The original left them
    // uninitialised and read them even when the lookup below was skipped.
    int dev_index = -1;
    int mp_index = -1;
    if (!devid.empty() && !line.empty()) {
        // The ledger is only read while holding its mutex.
        std::lock_guard<std::mutex> lock(ledgermtx);
        dev_index = find_dev_index_from_dev_id(devid);
        mp_index = find_mp_index_from_mp_id(line);
    } else {
        std::cerr << "rtdata is NULL." << std::endl;
        // NOTE(review): this log text mentions the recall trigger although
        // this is the real-time topic -- looks copy-pasted; confirm wording.
        DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
        return rocketmq::RECONSUME_LATER;
    }

    if (dev_index == -1 || mp_index == -1) {
        std::cerr << "dev index or mp index is not found" << std::endl;
        return rocketmq::RECONSUME_LATER;
    }

    // NOTE(review): the indices are used after ledgermtx was released;
    // confirm createXmlFile() tolerates concurrent ledger changes.
    if (!createXmlFile(dev_index, mp_index, realData, soeData, limit, "new")) {
        DIY_ERRORLOG("process", "【ERROR】前置无法创建实时数据触发文件");
        std::cerr << "Failed to create the XML file." << std::endl;
        return rocketmq::RECONSUME_LATER;
    }

    return rocketmq::CONSUME_SUCCESS;
}
|
||
|
||
/// Consumer callback for ledger-update messages.
///
/// Hands the body to parseJsonMessageUD() together with the output directory
/// FRONT_PATH + "/etc/ledgerupdate". A handler failure is written to the
/// error log but the message is still acknowledged.
///
/// @return RECONSUME_LATER while INITFLAG != 1 or when the body is empty;
///         CONSUME_SUCCESS otherwise.
rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& msg) {
    // Do not consume anything before initialisation has finished.
    if (INITFLAG != 1) {
        return rocketmq::RECONSUME_LATER;
    }

    const std::string payload = msg.getBody();
    const std::string msgKey = msg.getKeys();

    if (payload.empty()) {
        std::cerr << "Message body is NULL or empty." << std::endl;
        return rocketmq::RECONSUME_LATER;
    }

    // Trace the message and its key.
    std::cout << "ledger update Callback received message: " << payload << std::endl;
    std::cout << "Message Key: " << (msgKey.empty() ? std::string("N/A") : msgKey) << std::endl;

    // Business handling.
    const std::string updatefilepath = FRONT_PATH + "/etc/ledgerupdate";
    const bool handled = parseJsonMessageUD(payload, updatefilepath);
    if (!handled) {
        DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
    }

    return rocketmq::CONSUME_SUCCESS;
}
|
||
|
||
/// Consumer callback for process-control messages.
///
/// Hands the body to parseJsonMessageSET(). A handler failure is written to
/// the error log but the message is still acknowledged.
///
/// @return RECONSUME_LATER while INITFLAG != 1 or when the body is empty;
///         CONSUME_SUCCESS otherwise.
rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg) {
    // Do not consume anything before initialisation has finished.
    if (INITFLAG != 1) {
        return rocketmq::RECONSUME_LATER;
    }

    const std::string payload = msg.getBody();
    const std::string msgKey = msg.getKeys();

    if (payload.empty()) {
        std::cerr << "Message body is NULL or empty." << std::endl;
        return rocketmq::RECONSUME_LATER;
    }

    // Trace the message and its key.
    std::cout << "process set Callback received message: " << payload << std::endl;
    std::cout << "Message Key: " << (msgKey.empty() ? std::string("N/A") : msgKey) << std::endl;

    // Business handling.
    const bool handled = parseJsonMessageSET(payload);
    if (!handled) {
        DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
    }

    return rocketmq::CONSUME_SUCCESS;
}
|
||
|
||
/// Consumer callback for log-upload control messages.
///
/// Hands the body to parseJsonMessageLOG(). A handler failure is written to
/// the error log but the message is still acknowledged.
///
/// @return RECONSUME_LATER while INITFLAG != 1 or when the body is empty;
///         CONSUME_SUCCESS otherwise.
rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg) {
    // Do not consume anything before initialisation has finished.
    if (INITFLAG != 1) {
        return rocketmq::RECONSUME_LATER;
    }

    const std::string payload = msg.getBody();
    const std::string msgKey = msg.getKeys();

    if (payload.empty()) {
        std::cerr << "Message body is NULL or empty." << std::endl;
        return rocketmq::RECONSUME_LATER;
    }

    // Trace the message and its key.
    std::cout << "log Callback received message: " << payload << std::endl;
    std::cout << "Message Key: " << (msgKey.empty() ? std::string("N/A") : msgKey) << std::endl;

    // Business handling.
    const bool handled = parseJsonMessageLOG(payload);
    if (!handled) {
        DIY_ERRORLOG("process", "【ERROR】前置的%s%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str());
    }

    return rocketmq::CONSUME_SUCCESS;
}
|
||
|
||
// Consumer callback for recall (data re-collection) trigger messages.
//
// Returns RECONSUME_LATER while INITFLAG is not 1 or when the message body is
// empty. Otherwise the body is parsed with parseJsonMessageRC(); a non-empty
// parse result is passed to recall_json_handle() while ledgermtx is held, an
// empty one is reported on stderr and through DIY_ERRORLOG. Both outcomes
// return CONSUME_SUCCESS.
rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& msg) {
    // Nothing is consumed before initialisation has finished.
    if (INITFLAG != 1) {
        return rocketmq::RECONSUME_LATER;
    }

    // Debug trace.
    std::cout << "myMessageCallbackrecall" << std::endl;

    std::string payload = msg.getBody();
    std::string msgKey = msg.getKeys();

    if (payload.empty()) {
        std::cerr << "Message body is NULL or empty." << std::endl;
        return rocketmq::RECONSUME_LATER;
    }

    // Echo the message content and its key ("N/A" when there is no key).
    std::cout << "recall Callback received message: " << payload << std::endl;
    std::cout << "Message Key: " << (msgKey.empty() ? std::string("N/A") : msgKey) << std::endl;

    // Parse the JSON text; the parser's result is received as a std::string.
    std::string recallData = parseJsonMessageRC(payload);
    std::cout << "parseJsonMessageRC: " << recallData << std::endl;

    if (recallData.empty()) {
        std::cerr << "recall data is NULL." << std::endl;
        DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RC.c_str());
    } else {
        // The handler runs with the ledger mutex held.
        std::lock_guard<std::mutex> ledgerGuard(ledgermtx);
        recall_json_handle(recallData);
    }

    return rocketmq::CONSUME_SUCCESS;
}
|
||
|
||
//////////////////////////////////////////////////////////////////////////////////////////////////生成实时触发和停止文件
|
||
|
||
//根据监测点序号和终端序号来生成触发文件,后续可修改
|
||
// Builds the text of a "new trigger" XML document.
//
// devindex -> DevSeries attribute, mpindex -> Line attribute,
// realData / soeData -> RealData / SOEData attributes ("true" or "false"),
// limit -> Limit attribute. Count is always written as "0".
// Returns the complete document, including the XML declaration.
std::string createnewXmlContent(int devindex, int mpindex, bool realData, bool soeData, int limit)
{
    const char* const realFlag = realData ? "true" : "false";
    const char* const soeFlag = soeData ? "true" : "false";

    std::ostringstream doc;

    // Declaration and opening elements.
    doc << "<?xml version=\"1.0\" encoding=\"gb2312\"?>\n";
    doc << "<Trigger3S>\n";
    doc << " <New>\n";

    // The single <Trigger/> element with its attributes.
    doc << " <Trigger Line=\"" << mpindex << "\" ";
    doc << "RealData=\"" << realFlag << "\" ";
    doc << "DevSeries=\"" << devindex << "\" ";
    doc << "Limit=\"" << limit << "\" ";
    doc << "Count=\"0\" ";
    doc << "SOEData=\"" << soeFlag << "\"/>\n";

    // Closing elements.
    doc << " </New>\n";
    doc << "</Trigger3S>\n";

    return doc.str();
}
|
||
|
||
//根据监测点序号和终端序号来生成触发文件,后续可修改
|
||
// Builds the text of a "delete trigger" XML document.
//
// devindex -> DevSeries attribute, mpindex -> Line attribute. RealData and
// SOEData are always "false"; Limit and Count are always "0".
// Returns the complete document, including the XML declaration.
std::string createdeleteXmlContent(int devindex, int mpindex)
{
    std::ostringstream doc;

    // Declaration and opening elements.
    doc << "<?xml version=\"1.0\" encoding=\"gb2312\"?>\n";
    doc << "<Trigger3S>\n";
    doc << " <Delete>\n";

    // The single <Trigger/> element; only Line and DevSeries vary.
    doc << " <Trigger Line=\"" << mpindex << "\" ";
    doc << "RealData=\"false\" ";
    doc << "DevSeries=\"" << devindex << "\" ";
    doc << "Limit=\"0\" ";
    doc << "Count=\"0\" ";
    doc << "SOEData=\"false\"/>\n";

    // Closing elements.
    doc << " </Delete>\n";
    doc << "</Trigger3S>\n";

    return doc.str();
}
|
||
|
||
// 写入 XML 内容到文件的函数
|
||
// Writes xmlContent to filePath, replacing any existing file content.
//
// Returns true only when the file could be opened AND the content was written
// and flushed without a stream error; returns false (with a message on
// stderr) otherwise. On success the path is reported on stdout.
bool writeToFile(const std::string& filePath, const std::string& xmlContent)
{
    std::ofstream outFile(filePath.c_str());
    if (!outFile.is_open()) {
        std::cerr << "Failed to open file for writing: " << filePath << std::endl;
        return false;
    }

    outFile << xmlContent;

    // close() flushes the buffer and sets failbit when that fails, so checking
    // the stream afterwards also catches errors that only surface on flush
    // (for example a full disk).
    outFile.close();
    if (outFile.fail()) {
        std::cerr << "Failed to write file: " << filePath << std::endl;
        return false;
    }

    std::cout << "XML file created at: " << filePath << std::endl;
    return true;
}
|
||
|
||
// 创建并写入新的 XML 文件的主函数
|
||
bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type)
|
||
{
|
||
std::string xmlContent = "";
|
||
std::string directory = "";
|
||
std::string filePath = "";
|
||
|
||
if(type == "new"){
|
||
// 构造 XML 内容
|
||
xmlContent = createnewXmlContent(devindex, mpindex, realData, soeData, limit);
|
||
|
||
// 设置文件路径
|
||
directory = FRONT_PATH + "/etc/trigger3s/";
|
||
filePath = directory + "newtrigger.xml";
|
||
}
|
||
else if(type == "delete"){
|
||
// 构造 XML 内容
|
||
xmlContent = createdeleteXmlContent(devindex, mpindex);
|
||
|
||
// 设置文件路径
|
||
directory = FRONT_PATH + "/etc/trigger3s/";
|
||
filePath = directory + "deletetrigger.xml";
|
||
}
|
||
else{
|
||
std::cerr << "Failed to create xmlfile,type error: " << std::endl;
|
||
return false;
|
||
}
|
||
|
||
|
||
// 创建目录(如果不存在)
|
||
if (system(("mkdir -p " + directory).c_str()) != 0) {
|
||
std::cerr << "Failed to create directory: " << directory << std::endl;
|
||
return false;
|
||
}
|
||
|
||
// 将 XML 内容写入文件
|
||
return writeToFile(filePath, xmlContent);
|
||
}
|
||
|
||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////生成台账更新文件
|
||
|
||
// Appends the indentation for nesting depth `level` to `stream`:
// two spaces per level. A level of zero or less appends nothing.
void add_indent(std::stringstream& stream, int level) {
    if (level > 0) {
        stream << std::string(static_cast<std::string::size_type>(level) * 2, ' ');
    }
}
|
||
|
||
std::string prepare_update(const std::string& code_str, const terminal_dev& json_data,const std::string& guid)
|
||
{
|
||
std::cout << "prepare update" << std::endl;
|
||
|
||
std::stringstream xmlStream;
|
||
int indentLevel = 0; // 缩进级别
|
||
|
||
// 根节点
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<ledger_update>" << std::endl;
|
||
indentLevel++;
|
||
|
||
// 添加 guid 节点
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<guid>" << guid << "</guid>" << std::endl;
|
||
|
||
if (code_str == "ledger_modify" || code_str == "add_terminal") {
|
||
|
||
// 如果是 modify 类型
|
||
if (code_str == "ledger_modify") {
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<modify>" << std::endl;
|
||
indentLevel++;
|
||
}
|
||
else {
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<add>" << std::endl;
|
||
indentLevel++;
|
||
}
|
||
|
||
// 添加数据部分
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<terminalData>" << std::endl;
|
||
indentLevel++;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<ip>" << json_data.addr_str << "</ip>" << std::endl; // Assuming `addr_str` for IP
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<devType>" << json_data.dev_type << "</devType>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<maintName>" << json_data.maint_name << "</maintName>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<orgName>" << json_data.org_name << "</orgName>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<port>" << json_data.port << "</port>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<stationName>" << json_data.station_name << "</stationName>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<terminalCode>" << json_data.terminal_name << "</terminalCode>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<updateTime>" << json_data.timestamp << "</updateTime>" << std::endl; // Assuming `timestamp`
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<manufacturer>" << json_data.tmnl_factory << "</manufacturer>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<status>" << json_data.tmnl_status << "</status>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<series>" << json_data.dev_series << "</series>" << std::endl;
|
||
|
||
//lnk20250210
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<processNo>" << json_data.processNo << "</processNo>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<devKey>" << json_data.dev_key << "</devKey>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<mac>" << json_data.mac << "</mac>" << std::endl;
|
||
|
||
// monitorData 部分
|
||
for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) {
|
||
const ledger_monitor& monitor = json_data.line[i];
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<monitorData" << (i + 1) << ">" << std::endl;
|
||
indentLevel++;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<id>" << monitor.monitor_id << "</id>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<name>" << monitor.monitor_name << "</name>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<lineNo>" << monitor.logical_device_seq << "</lineNo>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<voltageLevel>" << monitor.voltage_level << "</voltageLevel>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<ptType>" << monitor.terminal_connect << "</ptType>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<timestamp>" << monitor.timestamp << "</timestamp>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<terminal_id>" << monitor.terminal_id << "</terminal_id>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<CT1>" << monitor.CT1 << "</CT1>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<CT2>" << monitor.CT2 << "</CT2>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<PT1>" << monitor.PT1 << "</PT1>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<PT2>" << monitor.PT2 << "</PT2>" << std::endl;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<status>" << monitor.status << "</status>" << std::endl;
|
||
|
||
indentLevel--;
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "</monitorData>" << std::endl;
|
||
}
|
||
|
||
indentLevel--;
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "</terminalData>" << std::endl;
|
||
|
||
// 结束 modify 或 add 标签
|
||
if (code_str == "ledger_modify") {
|
||
indentLevel--;
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "</modify>" << std::endl;
|
||
|
||
}
|
||
else {
|
||
indentLevel--;
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "</add>" << std::endl;
|
||
|
||
}
|
||
|
||
} else if (code_str == "delete_terminal") {
|
||
// 如果是 delete 类型
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<delete>" << std::endl;
|
||
indentLevel++;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<terminalData>" << std::endl;
|
||
indentLevel++;
|
||
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
|
||
|
||
indentLevel--;
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "</terminalData>" << std::endl;
|
||
|
||
indentLevel--;
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "</delete>" << std::endl;
|
||
}
|
||
else {
|
||
std::cerr << "code_str error" << std::endl;
|
||
return "";
|
||
}
|
||
|
||
// 结束根节点
|
||
indentLevel--;
|
||
add_indent(xmlStream, indentLevel);
|
||
xmlStream << "</ledger_update>" << std::endl;
|
||
|
||
return xmlStream.str(); // 返回构造的 XML 字符串
|
||
}
|
||
|
||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////终端连接消息
|
||
|
||
void connect_status_to_queue(const std::string& id, const std::string& datetime, int status)
|
||
{
|
||
try {
|
||
// 构造 JSON
|
||
nlohmann::json jsonObject;
|
||
jsonObject["id"] = id;
|
||
jsonObject["date"] = datetime;
|
||
jsonObject["status"] = status;
|
||
|
||
// 构造发送结构
|
||
queue_data_t data;
|
||
data.strTopic = G_CONNECT_TOPIC;
|
||
data.strText = jsonObject.dump(); // 转换为字符串
|
||
|
||
if (g_node_id == STAT_DATA_BASE_NODE_ID) {
|
||
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
|
||
queue_data_list.push_back(data);
|
||
}
|
||
}
|
||
catch (const std::exception& e) {
|
||
std::cerr << "connect_status_to_queue exception: " << e.what() << std::endl;
|
||
}
|
||
}
|
||
|
||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////响应消息
|
||
|
||
void send_reply_to_queue(const std::string& guid, const std::string& step, const std::string& result) {
|
||
try {
|
||
// 构造 JSON 对象
|
||
nlohmann::json obj;
|
||
obj["guid"] = guid;
|
||
obj["step"] = step;
|
||
obj["result"] = result;
|
||
obj["processNo"] = g_front_seg_index;
|
||
obj["frontType"] = get_front_type_from_subdir();
|
||
obj["nodeId"] = FRONT_INST;
|
||
|
||
// 构造 queue 消息
|
||
queue_data_t connect_info;
|
||
connect_info.strTopic = Topic_Reply_Topic;
|
||
connect_info.strText = obj.dump(); // 序列化为 JSON 字符串
|
||
|
||
// 加入发送队列(线程安全)
|
||
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
|
||
queue_data_list.push_back(connect_info);
|
||
}
|
||
catch (const std::exception& e) {
|
||
std::cerr << "send_reply_to_queue exception: " << e.what() << std::endl;
|
||
}
|
||
}
|
||
|
||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////心跳消息
|
||
|
||
void send_heartbeat_to_queue(const std::string& status) {
|
||
try{
|
||
nlohmann::json obj;
|
||
obj["nodeId"] = FRONT_INST;
|
||
obj["frontType"] = get_front_type_from_subdir();
|
||
obj["processNo"] = g_front_seg_index;
|
||
obj["status"] = status;
|
||
|
||
queue_data_t connect_info;
|
||
connect_info.strTopic = Heart_Beat_Topic;
|
||
connect_info.strText = obj.dump(); // 紧凑格式 JSON
|
||
|
||
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
|
||
queue_data_list.push_back(connect_info);
|
||
}
|
||
catch (const std::exception& e) {
|
||
std::cerr << "send_heartbeat_to_queue exception: " << e.what() << std::endl;
|
||
}
|
||
}
|
||
|
||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////稳态测试函数
|
||
|
||
// Reports whether terminal_id is one of the entries of TESTARRAY.
// Returns true on the first matching entry, false when none matches
// (including when TESTARRAY is empty).
bool shouldSkipTerminal(const std::string& terminal_id) {
    bool listed = false;
    // The scan stops as soon as a match has been found.
    for (size_t idx = 0; !listed && idx < TESTARRAY.size(); ++idx) {
        listed = (TESTARRAY[idx] == terminal_id);
    }
    return listed;
}
|
||
|
||
void rocketmq_test_300(int mpnum, int front_index, int type,Front* front) {
|
||
|
||
if(!INITFLAG){
|
||
std::cout << "前置未初始化完成\n";
|
||
return;
|
||
}
|
||
|
||
if (!front || !front->m_producer) {
|
||
std::cerr << "front 或 producer 无效\n";
|
||
return;
|
||
}
|
||
|
||
rocketmq::RocketMQProducer* producer = front->m_producer;
|
||
|
||
queue_data_t data;
|
||
data.strTopic = G_ROCKETMQ_TOPIC_TEST;
|
||
data.mp_id = "0";
|
||
|
||
std::vector<std::string> filenames = {
|
||
"long_string.txt",
|
||
"PLT_string.txt",
|
||
"fluc_string.txt",
|
||
"qvvr_string.txt"
|
||
};
|
||
|
||
for (const auto& filename : filenames) {
|
||
std::ifstream file(filename);
|
||
if (!file.is_open()) {
|
||
std::cerr << "跳过无法打开的文件: " << filename << std::endl;
|
||
continue;
|
||
}
|
||
|
||
std::stringstream buffer;
|
||
buffer << file.rdbuf();
|
||
std::string base_strText = buffer.str();
|
||
|
||
std::time_t t = std::time(nullptr);
|
||
std::tm* time_info = std::localtime(&t);
|
||
time_info->tm_sec = 0;
|
||
std::time_t base_time_t = std::mktime(time_info);
|
||
long long current_time_ms = static_cast<long long>(base_time_t) * 1000;
|
||
|
||
int total_messages = mpnum;
|
||
|
||
if (type == 0) {
|
||
std::cout << "use ledger send msg" << std::endl;
|
||
|
||
for (size_t i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < terminal_devlist.size(); ++i) {
|
||
const auto& dev = terminal_devlist[i];
|
||
|
||
if (shouldSkipTerminal(dev.terminal_id)) {
|
||
std::cout << dev.terminal_id << " use true message" << std::endl;
|
||
continue;
|
||
}
|
||
|
||
for (size_t j = 0; j < dev.line.size(); ++j) {
|
||
if (dev.line[j].monitor_id.empty()) break;
|
||
|
||
data.mp_id = dev.line[j].monitor_id;
|
||
data.monitor_no = static_cast<int>(i + j);
|
||
std::string modified_time = std::to_string(current_time_ms);
|
||
|
||
std::string modified_strText = base_strText;
|
||
|
||
// 替换 Monitor
|
||
size_t monitor_pos = modified_strText.find("\"Monitor\"");
|
||
if (monitor_pos != std::string::npos) {
|
||
size_t colon_pos = modified_strText.find(":", monitor_pos);
|
||
size_t quote_pos = modified_strText.find("\"", colon_pos);
|
||
size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1);
|
||
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
|
||
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id);
|
||
}
|
||
}
|
||
|
||
// 替换 TIME
|
||
size_t time_pos = modified_strText.find("\"TIME\"");
|
||
if (time_pos != std::string::npos) {
|
||
size_t colon_pos = modified_strText.find(":", time_pos);
|
||
size_t quote_pos = colon_pos;
|
||
size_t end_quote_pos = modified_strText.find(",", quote_pos + 1);
|
||
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
|
||
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time);
|
||
}
|
||
}
|
||
|
||
data.strText = modified_strText;
|
||
//my_rocketmq_send(data,front->m_producer);
|
||
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
|
||
queue_data_list.push_back(data);
|
||
|
||
std::cout << "Sent message " << (i + 1)
|
||
<< " with Monitor " << data.monitor_no
|
||
<< " and TIME " << modified_time << std::endl;
|
||
|
||
}
|
||
}
|
||
} else {
|
||
std::cout << "use monitor + number send msg" << std::endl;
|
||
|
||
for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 && g_node_id == 100) && i < total_messages; ++i) {
|
||
std::string monitor_id = "testmonitor" + std::to_string(i);
|
||
|
||
data.mp_id = monitor_id;
|
||
data.monitor_no = i;
|
||
std::string modified_time = std::to_string(current_time_ms);
|
||
std::string modified_strText = base_strText;
|
||
|
||
// 替换 Monitor
|
||
size_t monitor_pos = modified_strText.find("\"Monitor\"");
|
||
if (monitor_pos != std::string::npos) {
|
||
size_t colon_pos = modified_strText.find(":", monitor_pos);
|
||
size_t quote_pos = modified_strText.find("\"", colon_pos);
|
||
size_t end_quote_pos = modified_strText.find("\"", quote_pos + 1);
|
||
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
|
||
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, data.mp_id);
|
||
}
|
||
}
|
||
|
||
// 替换 TIME
|
||
size_t time_pos = modified_strText.find("\"TIME\"");
|
||
if (time_pos != std::string::npos) {
|
||
size_t colon_pos = modified_strText.find(":", time_pos);
|
||
size_t quote_pos = colon_pos;
|
||
size_t end_quote_pos = modified_strText.find(",", quote_pos + 1);
|
||
if (colon_pos != std::string::npos && quote_pos != std::string::npos && end_quote_pos != std::string::npos) {
|
||
modified_strText.replace(quote_pos + 1, end_quote_pos - quote_pos - 1, modified_time);
|
||
}
|
||
}
|
||
|
||
data.strText = modified_strText;
|
||
//my_rocketmq_send(data,front->m_producer);
|
||
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
|
||
queue_data_list.push_back(data);
|
||
|
||
std::cout << "Sent message " << (i + 1)
|
||
<< " with Monitor " << data.monitor_no
|
||
<< " and TIME " << modified_time << std::endl;
|
||
|
||
}
|
||
}
|
||
|
||
std::cout << "Finished sending " << total_messages << " messages." << std::endl;
|
||
}
|
||
}
|
||
|
||
////////////////////////////////////////////////////////////////////////////////////////////////////////////其他测试函数
|
||
|
||
// Test helper for real-time data: queues the content of "rt.txt" (read from
// the working directory) on topic FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RT.
//
// Does nothing when `front` is null or has no producer. Also does nothing,
// apart from a message on stderr, when rt.txt cannot be opened or is empty,
// so that no empty test message is ever queued.
void rocketmq_test_rt(Front* front)
{
    if (!front || !front->m_producer) {
        std::cerr << "front 或 producer 无效\n";
        return;
    }

    std::ifstream file("rt.txt");   // the file holds the whole message text
    if (!file.is_open()) {
        std::cerr << "rocketmq_test_rt: cannot open rt.txt, nothing queued" << std::endl;
        return;
    }

    std::stringstream buffer;
    buffer << file.rdbuf();         // read the entire file

    queue_data_t data;
    data.monitor_no = 123;
    data.strTopic = std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT;
    data.strText = buffer.str();
    data.mp_id = "123123";

    if (data.strText.empty()) {
        std::cerr << "rocketmq_test_rt: rt.txt is empty, nothing queued" << std::endl;
        return;
    }

    std::lock_guard<std::mutex> lock(queue_data_list_mutex);
    queue_data_list.push_back(data);
}
|
||
|
||
// Test helper for ledger updates: queues the content of "ud.txt" (read from
// the working directory) on topic FRONT_INST + "_" + G_MQCONSUMER_TOPIC_UD.
//
// Does nothing when `front` is null or has no producer. Also does nothing,
// apart from a message on stderr, when ud.txt cannot be opened or is empty,
// so that no empty test message is ever queued.
void rocketmq_test_ud(Front* front)
{
    if (!front || !front->m_producer) {
        std::cerr << "front 或 producer 无效\n";
        return;
    }

    std::ifstream file("ud.txt");   // the file holds the whole message text
    if (!file.is_open()) {
        std::cerr << "rocketmq_test_ud: cannot open ud.txt, nothing queued" << std::endl;
        return;
    }

    std::stringstream buffer;
    buffer << file.rdbuf();         // read the entire file

    queue_data_t data;
    data.monitor_no = 123;
    data.strTopic = std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD;
    data.strText = buffer.str();
    data.mp_id = "123123";

    if (data.strText.empty()) {
        std::cerr << "rocketmq_test_ud: ud.txt is empty, nothing queued" << std::endl;
        return;
    }

    std::lock_guard<std::mutex> lock(queue_data_list_mutex);
    queue_data_list.push_back(data);
}
|
||
|
||
// Test helper for process control: queues the content of "set.txt" (read from
// the working directory) on topic FRONT_INST + "_" + G_MQCONSUMER_TOPIC_SET.
//
// Does nothing when `front` is null or has no producer. Also does nothing,
// apart from a message on stderr, when set.txt cannot be opened or is empty:
// myMessageCallbackset answers an empty body with RECONSUME_LATER, so an empty
// test message must not be queued.
void rocketmq_test_set(Front* front)
{
    if (!front || !front->m_producer) {
        std::cerr << "front 或 producer 无效\n";
        return;
    }

    std::ifstream file("set.txt");  // the file holds the whole message text
    if (!file.is_open()) {
        std::cerr << "rocketmq_test_set: cannot open set.txt, nothing queued" << std::endl;
        return;
    }

    std::stringstream buffer;
    buffer << file.rdbuf();         // read the entire file

    queue_data_t data;
    data.monitor_no = 123;
    data.strTopic = std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET;
    data.strText = buffer.str();
    data.mp_id = "123123";

    if (data.strText.empty()) {
        std::cerr << "rocketmq_test_set: set.txt is empty, nothing queued" << std::endl;
        return;
    }

    std::lock_guard<std::mutex> lock(queue_data_list_mutex);
    queue_data_list.push_back(data);
}
|
||
|
||
// Test helper for recall (data re-collection): queues the content of "rc.txt"
// (read from the working directory) on topic
// FRONT_INST + "_" + G_MQCONSUMER_TOPIC_RC.
//
// Does nothing when `front` is null or has no producer. Also does nothing,
// apart from a message on stderr, when rc.txt cannot be opened or is empty:
// myMessageCallbackrecall answers an empty body with RECONSUME_LATER, so an
// empty test message must not be queued.
void rocketmq_test_rc(Front* front)
{
    if (!front || !front->m_producer) {
        std::cerr << "front 或 producer 无效\n";
        return;
    }

    std::ifstream file("rc.txt");   // the file holds the whole message text
    if (!file.is_open()) {
        std::cerr << "rocketmq_test_rc: cannot open rc.txt, nothing queued" << std::endl;
        return;
    }

    std::stringstream buffer;
    buffer << file.rdbuf();         // read the entire file

    queue_data_t data;
    data.monitor_no = 123;
    data.strTopic = std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC;
    data.strText = buffer.str();
    data.mp_id = "123123";

    if (data.strText.empty()) {
        std::cerr << "rocketmq_test_rc: rc.txt is empty, nothing queued" << std::endl;
        return;
    }

    std::lock_guard<std::mutex> lock(queue_data_list_mutex);
    queue_data_list.push_back(data);
}
|
||
|