Files
front_linux/LFtid1056/cloudfront/code/rocketmq.cpp
2025-10-15 16:29:54 +08:00

2344 lines
87 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

///////////////////////////////////////////////////////////////////////////////////////////////////////
#include <iostream>
#include <fstream>
#include <string>
#include <list>
#include <sstream>
#include <vector>
#include <deque>
#include <mutex>
#include <thread>
#include <chrono>
#include <ctime>
#include <cstdio>
#include <iomanip>
#include <sys/stat.h>
#include <fnmatch.h>
#include <atomic>
#include <map>
#include <vector>
#include <array>
//////////////////////////////////////////////////////////////////////////////////////////////////////
#include "rocketmq/DefaultMQProducer.h"
#include "rocketmq/DefaultMQPushConsumer.h"
#include "rocketmq/MQMessageListener.h"
#include "rocketmq/MQMessageExt.h"
#include "rocketmq/MQMessageQueue.h"
#include "rocketmq/MQSelector.h"
#include "rocketmq/SendResult.h"
#include "rocketmq/SessionCredentials.h"
#include "rocketmq.h"
#include "nlohmann/json.hpp"
#include "log4.h"
#include "interface.h"
#include "front.h"
#include "../../client2.h"
//////////////////////////////////////////////////////////////////////////////////////////////////////////
using namespace std;
using nlohmann::json;
//////////////////////////////////////////////////////////////////////////////////////////////////////////
#ifndef nullptr
#define nullptr NULL
#endif
///////////////////////////////////////////////////////////////////////////////////////////////////////////
std::mutex queue_data_list_mutex;
std::list<queue_data_t> queue_data_list;
static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者
std::mutex devidx_lock;
std::unordered_map<std::string, int> devIdxMap;//实时数据用的idx
///////////////////////////////////////////////////////////////////////////////////////////////////////////
//前置进程
extern unsigned int g_node_id;
extern std::string subdir;
extern std::string FRONT_INST;
//生产者线程控制
extern uint32_t g_mqproducer_blocked_times;
//初始化标志
extern int INITFLAG;
//测试用的终端数组
extern std::vector<std::string> TESTARRAY;
////////////////////////////////////////////////////////////////////////////////////////////////////////外部文件函数声明
extern void execute_bash(std::string fun,int process_num,std::string type);
extern int recall_json_handle_from_mq(const std::string& body);
//////////////////////////////////////////////////////////////////////////////////////////////////////本文件函数向前声明
bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type);
std::string prepare_update(const std::string& code_str, const terminal_dev& json_data,const std::string& guid);
bool writeToFile(const std::string& filePath, const std::string& xmlContent);
//////////////////////////////////////////////////////////////////////////////////////////////////////消费起始控制
static const int64_t G_APP_START_MS = []() -> int64_t {
using namespace std::chrono;
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}();
static const int64_t G_START_SKEW_MS = 1000; // 容错 1s可按需调整
static inline bool should_process_after_start(const rocketmq::MQMessageExt& msg) {
const int64_t born_ts = static_cast<int64_t>(msg.getBornTimestamp());
return born_ts >= (G_APP_START_MS - G_START_SKEW_MS);
}
//////////////////////////////////////////////////////////////////////////////////////////////////////
namespace rocketmq {
//-----------------------------------------------------------------------------
// RocketMQConsumer 方法实现
//-----------------------------------------------------------------------------
RocketMQConsumer::RocketMQConsumer(const std::string& consumerGroup,
const std::string& nameServer)
: consumer_(consumerGroup)
, listener_(nullptr) {
// 设置 NameServer 地址
consumer_.setNamesrvAddr(nameServer);
// 设置默认的会话凭证
consumer_.setSessionCredentials(G_MQCONSUMER_ACCESSKEY, G_MQCONSUMER_SECRETKEY, G_MQCONSUMER_CHANNEL);
// 初始化内部监听器
listener_ = new InternalListener(this);
}
void RocketMQConsumer::subscribe(const std::string& topic,
const std::string& tag,
MessageCallback callback) {
// 调用 C++ 接口的订阅方法subExpression 参数支持 Tag 过滤,如 "TagA || TagB"
consumer_.subscribe(topic, tag);
std::cout << "[RocketMQConsumer] 已订阅 Topic: " << topic << ", Tag 表达式: " << tag << std::endl;
// 将 topic:tag 作为键,保存回调函数
std::string key = topic + ":" + tag;
std::lock_guard<std::mutex> lock(callbackMutex_);
callbackMap_[key] = callback;
}
void RocketMQConsumer::start() {
// 注册消息监听器
consumer_.registerMessageListener(listener_);
// 启动消费者
try {
consumer_.start();
std::cout << "[RocketMQConsumer] Consumer 已启动,等待消息..." << std::endl;
} catch (const MQClientException& e) {
std::cerr << "[RocketMQConsumer] 启动失败: " << e.what() << std::endl;
throw;
}
}
void RocketMQConsumer::shutdown() {
try {
consumer_.shutdown();
std::cout << "[RocketMQConsumer] Consumer 已关闭。" << std::endl;
} catch (const MQClientException& e) {
std::cerr << "[RocketMQConsumer] 关闭失败: " << e.what() << std::endl;
}
}
RocketMQConsumer::~RocketMQConsumer() {
// 先关闭消费者
try {
consumer_.shutdown();
} catch (...) {
// 忽略异常
}
// 清理监听器
delete listener_;
listener_ = nullptr;
std::cout << "[RocketMQConsumer] Consumer 销毁完毕。" << std::endl;
}
//-----------------------------------------------------------------------------
// RocketMQProducer 方法实现
//-----------------------------------------------------------------------------
RocketMQProducer::RocketMQProducer(const std::string& groupName,
const std::string& nameServer)
: producer_(groupName) {
// 设置 NameServer 地址
producer_.setNamesrvAddr(nameServer);
// 设置默认的会话凭证
producer_.setSessionCredentials(G_MQPRODUCER_ACCESSKEY, G_MQPRODUCER_SECRETKEY, "");
// 启动生产者
try {
producer_.start();
std::cout << "[RocketMQProducer] Producer 已启动。" << std::endl;
} catch (const MQClientException& e) {
std::cerr << "[RocketMQProducer] 启动失败: " << e.what() << std::endl;
throw;
}
}
void RocketMQProducer::sendMessage(const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys) {
try {
// 创建消息对象
MQMessage msg(topic, tags, keys, body);
// 同步发送
SendResult result = producer_.send(msg);
std::cout << "[RocketMQProducer] 消息发送成功. "
<< "Topic=" << topic
<< ", MsgID=" << result.getMsgId()
<< ", Status=" << result.getSendStatus()
<< std::endl;
} catch (const MQClientException& e) {
std::cerr << "[RocketMQProducer] 发送失败: " << e.what() << std::endl;
// 根据需要进行重试或日志记录
} catch (const std::exception& e) {
std::cerr << "[RocketMQProducer] 异常: " << e.what() << std::endl;
} catch (...) {
std::cerr << "[RocketMQProducer] 未知错误,消息发送失败。" << std::endl;
}
}
void RocketMQProducer::sendMessageOrderly(const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys) {
try {
// 创建消息对象
MQMessage msg(topic, tags, keys, body);
// 使用轮询队列选择器进行顺序发送
SendResult result = producer_.send(msg, &selector_, nullptr);
std::cout << "[RocketMQProducer] 顺序消息发送成功. "
<< "Topic=" << topic
<< ", MsgID=" << result.getMsgId()
<< ", Status=" << result.getSendStatus()
<< std::endl;
} catch (const MQClientException& e) {
std::cerr << "[RocketMQProducer] 顺序发送失败: " << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << "[RocketMQProducer] 异常: " << e.what() << std::endl;
} catch (...) {
std::cerr << "[RocketMQProducer] 未知错误,顺序消息发送失败。" << std::endl;
}
}
RocketMQProducer::~RocketMQProducer() {
try {
producer_.shutdown();
std::cout << "[RocketMQProducer] Producer 已关闭。" << std::endl;
} catch (const MQClientException& e) {
std::cerr << "[RocketMQProducer] 关闭失败: " << e.what() << std::endl;
}
}
} // namespace rocketmq
///////////////////////////////////////////////////////////////////////////////////////////////////生产者接口
// 初始化生产者
void InitializeProducer(rocketmq::RocketMQProducer*& producer) {
if (producer == nullptr) {
try {
producer = new rocketmq::RocketMQProducer(G_ROCKETMQ_PRODUCER, G_MQPRODUCER_IPPORT);
} catch (const std::exception& e) {
std::cerr << "[InitializeProducer] Failed to initialize producer: " << e.what() << std::endl;
throw;
}
}
}
// 关闭并销毁生产者(在程序结束时调用一次)
void ShutdownAndDestroyProducer()
{
if (g_producer != NULL) {
delete g_producer;
g_producer = NULL;
}
}
// 使用 C++ 接口封装的 RocketMQProducer 类
void rocketmq_producer_send(rocketmq::RocketMQProducer* producer,
const std::string& body,
const std::string& topic,
const std::string& tags,
const std::string& keys) {
if (!producer) {
std::cerr << "[rocketmq_producer_send] producer 不可用,未初始化\n";
return;
}
//const std::string& tags = G_ROCKETMQ_TAG;
//const std::string& keys = G_ROCKETMQ_KEY;
try {
producer->sendMessage(body, topic, tags, keys);
} catch (const std::exception& e) {
std::cerr << "[rocketmq_producer_send] 发送失败: " << e.what() << std::endl;
DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 MQ发送失败", g_front_seg_index);
}
}
//mq发送接口
void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer)
{
static std::string topic;
static std::string cfg_His_tp;
static std::string cfg_PLT_tp;
static std::string cfg_PST_tp;
static std::string cfg_Evt_tp;
static std::string cfg_Alm_tp;
static std::string cfg_Rt_tp;
static bool init = false;
if (!init) {
cfg_His_tp = TOPIC_STAT;
cfg_PLT_tp = TOPIC_PLT;
cfg_PST_tp = TOPIC_PST;
cfg_Evt_tp = TOPIC_EVENT;
cfg_Alm_tp = TOPIC_ALARM;
cfg_Rt_tp = TOPIC_RTDATA;
init = true;
}
std::string key = data.key;
std::string tag = data.tag;
std::string senddata = data.strText;
if (data.strTopic == "HISDATA")
{
topic = cfg_His_tp;
}
else if (data.strTopic == "PLT")
{
topic = cfg_PLT_tp;
}
else if (data.strTopic == "PST")
{
topic = cfg_PST_tp;
}
else if (data.strTopic == "Alm")
{
topic = cfg_Alm_tp;
}
else if (data.strTopic == "RTDATA")
{
topic = cfg_Rt_tp;
}
else
{
topic = data.strTopic;
}
rocketmq_producer_send(producer,senddata,topic,tag,key);
}
/////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理
bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit,int& idx){
json root;
try {
root = json::parse(body);
} catch (const std::exception& e) {
std::cerr << "Failed to parse JSON message: " << e.what() << std::endl;
return false;
}
// 提取 "messageBody" 字符串
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
std::cerr << "'messageBody' is missing or not a string." << std::endl;
return false;
}
std::string messageBodyStr = root["messageBody"];
if (messageBodyStr.empty()) {
std::cerr << "'messageBody' is empty." << std::endl;
return false;
}
// 解析 "messageBody" 中的 JSON 字符串
json messageBody;
try {
messageBody = json::parse(messageBodyStr);
} catch (const std::exception& e) {
std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl;
return false;
}
// 检查并提取字段
if (!messageBody.contains("devSeries") ||
!messageBody.contains("line") ||
!messageBody.contains("realData") ||
!messageBody.contains("soeData") ||
!messageBody.contains("limit")||
!messageBody.contains("idx"))
{
std::cerr << "Missing expected fields in 'messageBody'." << std::endl;
return false;
}
try {
devSeries = messageBody["devSeries"].get<std::string>();
line = messageBody["line"].get<ushort>();
realData = messageBody["realData"].get<bool>();
soeData = messageBody["soeData"].get<bool>();
limit = messageBody["limit"].get<int>();
idx = messageBody["idx"].get<int>();
} catch (const std::exception& e) {
std::cerr << "Type error while extracting fields: " << e.what() << std::endl;
return false;
}
// 提取 guid
std::string guid;
if (messageBody.contains("guid") && messageBody["guid"].is_string()) {
guid = messageBody["guid"].get<std::string>();
}
// 回复执行结果直接看实时数据不需要再回复1是收到消息
if (!guid.empty()) {
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到三秒数据指令");
}
return true;
}
bool parseJsonMessageSET(const std::string& json_str) {
json root;
try {
root = json::parse(json_str);
} catch (const std::exception& e) {
std::cout << "Error parsing JSON: " << e.what() << std::endl;
return false;
}
// [MOD] 允许 messageBody 既可为字符串也可为对象;保持原有错误打印
// ----- MOD BEGIN: messageBody 兼容 string/object -----
json messageBody;
if (!root.contains("messageBody")) {
std::cerr << "missing 'messageBody'" << std::endl;
return false;
}
if (root["messageBody"].is_string()) {
std::string messageBodyStr = root["messageBody"].get<std::string>();
if (messageBodyStr.empty()) {
std::cerr << "'messageBody' is empty" << std::endl;
return false;
}
try {
messageBody = json::parse(messageBodyStr);
} catch (const std::exception& e) {
std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl;
return false;
}
} else if (root["messageBody"].is_object()) {
messageBody = root["messageBody"];
} else {
std::cerr << "'messageBody' is neither string nor object" << std::endl;
return false;
}
// ----- MOD END: messageBody 兼容 string/object -----
// [MOD] 基础必填字段仅校验 guid/code/processNo/funfrontType、processNum 改为按功能分支再校验
// ----- MOD BEGIN: 基础字段按需校验 -----
if (!messageBody.contains("guid") ||
!messageBody.contains("code") ||
!messageBody.contains("processNo") ||
!messageBody.contains("fun")) {
std::cout << "Missing one or more required fields in messageBody." << std::endl;
return false;
}
// ----- MOD END: 基础字段按需校验 -----
std::string guid, code_str, fun;
// [MOD] frontType 改为可选,给默认值 "all"
// ----- MOD BEGIN: frontType 可选 -----
std::string frontType = "all";
// ----- MOD END: frontType 可选 -----
int index_value = 0;
try {
guid = messageBody["guid"].get<std::string>();
code_str = messageBody["code"].get<std::string>();
index_value = messageBody["processNo"].get<int>();
fun = messageBody["fun"].get<std::string>();
// [MOD] 仅当存在 frontType 且为 string 时再读取,保持兼容
// ----- MOD BEGIN: frontType 存在才解析 -----
if (messageBody.contains("frontType") && messageBody["frontType"].is_string()) {
frontType = messageBody["frontType"].get<std::string>();
}
// ----- MOD END: frontType 存在才解析 -----
} catch (const std::exception& e) {
std::cerr << "Field parsing error: " << e.what() << std::endl;
return false;
}
// 判断进程号是否匹配(保留原逻辑)
if (index_value != g_front_seg_index && g_front_seg_index != 0) {
std::cout << "msg index: " << index_value << " doesn't match self index: " << g_front_seg_index << std::endl;
return true;
}
std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl;
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的进程控制消息",
g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str());
if (code_str == "set_process") {
// [MOD] 按功能分支分别校验参数:
// reset/add 需要 processNum且可选 frontType默认 all
// delete 不需要 frontType/processNum
// ----- MOD BEGIN: 分功能按需校验与执行 -----
if (fun == "reset" || fun == "add") {
if (!messageBody.contains("processNum")) {
std::cout << "Missing 'processNum' in JSON." << std::endl;
return false;
}
int processNum = 0;
try {
processNum = messageBody["processNum"].get<int>();
} catch (...) {
std::cout << "'processNum' parsing failed." << std::endl;
return false;
}
// 校验参数并执行保留你原校验条件frontType 允许默认 all
if ((processNum >= 1 && processNum < 10) &&
(frontType == "cloudfront" || frontType == "all")) {
// if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) {
if (g_front_seg_index == 1) {
execute_bash(fun, processNum, frontType);
DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程",
g_front_seg_index, fun.c_str());
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到重置进程指令,重启所有进程!");
std::cout << "this msg should only execute once" << std::endl;
} else {
std::cout << "only cfg_stat_data index 1 can control process, this process not handle this msg" << std::endl;
}
} else {
std::cout << "param is not executable" << std::endl;
}
} else if (fun == "delete") {
// delete 分支:不要求 frontType/processNum
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到删除进程指令,这个进程将会重启 ");
DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启",
g_front_seg_index, fun.c_str());
std::this_thread::sleep_for(std::chrono::seconds(3));
::_exit(-1039); // 进程退出
} else {
std::cout << "param is not executable" << std::endl;
}
// ----- MOD END: 分功能按需校验与执行 -----
} else {
std::cout << "set process code str error" << std::endl;
}
return true;
}
bool parseJsonMessageLOG(const std::string& json_str) {
json root;
try {
root = json::parse(json_str);
} catch (const std::exception& e) {
std::cout << "Error parsing JSON: " << e.what() << std::endl;
return false;
}
// 提取 messageBodyJSON 字符串)
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
std::cerr << "'messageBody' is missing or not a string" << std::endl;
return false;
}
std::string messageBodyStr = root["messageBody"];
if (messageBodyStr.empty()) {
std::cerr << "'messageBody' is empty." << std::endl;
return false;
}
json messageBody;
try {
messageBody = json::parse(messageBodyStr);
} catch (const std::exception& e) {
std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl;
return false;
}
// 校验字段是否存在
static const std::array<std::string, 8> required_fields = {
"guid", "code", "processNo", "id", "level", "grade", "logtype", "frontType"
};
for (const auto& field : required_fields) {
if (!messageBody.contains(field)) {
std::cout << "Missing '" << field << "' in messageBody." << std::endl;
return false;
}
}
// 提取字段
std::string guid, code_str, id, level, grade, logtype, frontType;
int processNo = 0;
try {
guid = messageBody["guid"].get<std::string>();
code_str = messageBody["code"].get<std::string>();
processNo = messageBody["processNo"].get<int>();
id = messageBody["id"].get<std::string>();
level = messageBody["level"].get<std::string>();
grade = messageBody["grade"].get<std::string>();
logtype = messageBody["logtype"].get<std::string>();
frontType = messageBody["frontType"].get<std::string>();
} catch (const std::exception& e) {
std::cerr << "Error extracting fields: " << e.what() << std::endl;
return false;
}
// 判断进程号是否匹配
if (processNo != g_front_seg_index) {
std::cout << "msg index: " << processNo << " doesn't match self index: " << g_front_seg_index << std::endl;
return true;
}
// 判断 frontType 是否匹配
/*if (frontType != subdir) {
std::cout << "msg frontType: " << frontType << " doesn't match self frontType: " << subdir << std::endl;
return true;
}*/
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理日志上送消息", g_front_seg_index);
std::cout << "msg index: " << processNo << " self index: " << g_front_seg_index << std::endl;
/*std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;*/
// 回复消息
send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到实时日志指令");
if (code_str == "set_log") {
// 校验数据合法性
bool valid =
(level == "terminal" || level == "measurepoint") &&
(grade == "NORMAL" || grade == "DEBUG") &&
(logtype == "com" || logtype == "data") &&
(!id.empty() && !is_blank(id));
if (valid) {
process_log_command(id, level, grade, logtype);
} else {
std::cout << "type doesn't match" << std::endl;
DIY_WARNLOG("process", "【WARN】前置的%d号进程处理日志上送消息,格式不正确", g_front_seg_index);
}
std::cout << "this msg should only execute once" << std::endl;
}
return true;
}
bool parseJsonMessageUD(const std::string& json_str, const std::string& output_dir) {
json root;
try {
root = json::parse(json_str);
} catch (...) {
std::cout << "Error parsing JSON." << std::endl;
return false;
}
if (!root.contains("messageBody") || !root["messageBody"].is_string()) {
std::cerr << "'messageBody' is missing or is not a string" << std::endl;
return false;
}
std::string messageBodyStr = root["messageBody"];
if (messageBodyStr.empty()) {
std::cerr << "'messageBody' is empty." << std::endl;
return false;
}
json messageBody;
try {
messageBody = json::parse(messageBodyStr);
} catch (...) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
return false;
}
// 提取字段
std::string code_str = messageBody.value("code", "");
int process_No = messageBody.value("processNo", -1);
std::string guid = messageBody.value("guid", "");
if (guid.empty() || code_str.empty() || process_No == -1) {
std::cout << "Missing required fields: guid/code/processNo" << std::endl;
return false;
}
if (process_No != g_front_seg_index && g_front_seg_index != 0) {
std::cout << "msg index: " << process_No << " doesn't match self index: " << g_front_seg_index << std::endl;
return true;
}
std::cout << "msg index: " << process_No << " self index: " << g_front_seg_index << std::endl;
DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的台账更新消息",
g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
//send_reply_to_queue(guid, static_cast<int>(ResponseCode::ACCEPTED), "收到台账更新指令");
std::vector<DeviceReply> reply_list;
if (code_str == "add_terminal" || code_str == "ledger_modify") {
std::cout << "add or modify ledger" << std::endl;
if (messageBody.contains("data") && messageBody["data"].is_array()) {
for (const auto& item : messageBody["data"]) {
terminal_dev json_data{};
json_data.terminal_id = item.value("id", "");
json_data.terminal_name = item.value("name", "");
//json_data.org_name = item.value("org_name", "");
//json_data.maint_name = item.value("maint_name", "");
//json_data.station_name = item.value("stationName", "");
//json_data.tmnl_factory = item.value("manufacturer", "");
//json_data.tmnl_status = item.value("status", "");
json_data.dev_type = item.value("devType", "");
//json_data.dev_key = item.value("devKey", "");
//json_data.dev_series = item.value("series", "");
int procNo = -1; // 兼容 item.processNo 类型
if (item.contains("processNo")) {
if (item["processNo"].is_number_integer()) procNo = item["processNo"].get<int>();
else if (item["processNo"].is_string()) { try { procNo = std::stoi(item["processNo"].get<std::string>()); } catch(...) { procNo = -1; } }
}
json_data.processNo = std::to_string(procNo);
//int procNum = item.value("maxProcessNum", -1);
//json_data.maxProcessNum = std::to_string(procNum);
//json_data.addr_str = item.value("ip", "");
//json_data.port = item.value("port", "");
//json_data.timestamp = item.value("updateTime", "");
json_data.Righttime = item.value("Righttime", "");
if (item.contains("monitorData") && item["monitorData"].is_array()) {
size_t j = 0;
constexpr size_t kMaxLines = std::extent<decltype(terminal_dev::line)>::value; // 如果是 C 数组
for (const auto& monitor_item : item["monitorData"]) {
if (j >= kMaxLines) break;
auto& m = json_data.line[j++];
m.monitor_id = monitor_item.value("id", "");
m.monitor_name = monitor_item.value("name", "");
m.voltage_level = monitor_item.value("voltageLevel", "");
m.status = monitor_item.value("status", "");
m.logical_device_seq = monitor_item.value("lineNo", "");
m.terminal_connect = monitor_item.value("ptType", "");
//m.timestamp = json_data.timestamp;
m.terminal_id = monitor_item.value("deviceId", json_data.terminal_id);
m.CT1 = monitor_item.value("CT1", 0.0);
m.CT2 = monitor_item.value("CT2", 0.0);
m.PT1 = monitor_item.value("PT1", 0.0);
m.PT2 = monitor_item.value("PT2", 0.0);
}
}
print_terminal(json_data);
/*std::string xmlContent = prepare_update(code_str, json_data, guid);
if (!xmlContent.empty()) {
char nodeid[20];
std::sprintf(nodeid, "%u", g_node_id);
std::string file_name = output_dir + "/" + nodeid + "_" + std::to_string(g_front_seg_index) + "_" + json_data.terminal_id + "_" + code_str + ".xml";
writeToFile(file_name, xmlContent);
}*/
if(code_str == "add_terminal"){
DeviceReply one;
one.deviceId = json_data.terminal_id;
std::lock_guard<std::mutex> lock(ledgermtx);
// ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中
const std::string& tid = json_data.terminal_id;
auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(),
[&](const terminal_dev& d){ return d.terminal_id == tid; });
if (it == terminal_devlist.end()) {
init_loggers_bydevid(json_data.terminal_id);
terminal_devlist.push_back(json_data);
//调用接口添加到通讯列表
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账添加成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账添加成功";
}
else{
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 已存在该装置,修改这个装置的台账");*/
if(erase_one_terminals_by_id(json_data.terminal_id) == 1){
//删除旧的
ClientManager::instance().remove_device(json_data.terminal_id);
init_loggers_bydevid(json_data.terminal_id);
terminal_devlist.push_back(json_data);
//调用接口添加到通讯列表
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账修改成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "装置已存在,台账修改成功";
}
else{
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账修改失败");*/
one.code = static_cast<int>(ResponseCode::BAD_REQUEST);
one.result = "装置已存在,但是无法擦除旧数据,台账修改失败";
}
}
reply_list.push_back(std::move(one));
}
else if(code_str == "ledger_modify"){
DeviceReply one;
one.deviceId = json_data.terminal_id;
std::lock_guard<std::mutex> lock(ledgermtx);
// ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中
const std::string& tid = json_data.terminal_id;
auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(),
[&](const terminal_dev& d){ return d.terminal_id == tid; });
if (it == terminal_devlist.end()) {
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::NOT_FOUND),
"终端 id: " + tid + " 无法修改台账,未找到指定装置,改为添加这个装置");*/
DIY_WARNLOG("process", "【WARN】无法修改台账未找到指定装置: %s ,改为添加这个装置", tid.c_str());
init_loggers_bydevid(json_data.terminal_id);
terminal_devlist.push_back(json_data);
//调用接口添加到通讯列表
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账添加成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "装置不存在,台账添加成功";
}
else{
if(erase_one_terminals_by_id(json_data.terminal_id) == 1){
//删除旧的
ClientManager::instance().remove_device(json_data.terminal_id);
init_loggers_bydevid(json_data.terminal_id);
terminal_devlist.push_back(json_data);
//调用接口添加到通讯列表
DeviceInfo device = make_device_from_terminal(json_data);
ClientManager::instance().add_device(device);
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账修改成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账修改成功";
}
else{
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账修改失败");*/
one.code = static_cast<int>(ResponseCode::BAD_REQUEST);
one.result = "无法擦除旧数据,台账修改失败";
}
}
reply_list.push_back(std::move(one));
}
}
}
} else if (code_str == "delete_terminal") {
std::cout << "delete ledger" << std::endl;
if (messageBody.contains("data") && messageBody["data"].is_array()) {
for (const auto& item : messageBody["data"]) {
terminal_dev json_data{};
auto id = item.value("id", "");
json_data.terminal_id = id;
/*std::string xmlContent = prepare_update(code_str, json_data, guid);
if (!xmlContent.empty()) {
char nodeid[20];
std::sprintf(nodeid, "%u", g_node_id);
std::string file_name = output_dir + "/" + nodeid + "_" + std::to_string(g_front_seg_index) + "_" + json_data.terminal_id + "_delete_terminal.xml";
writeToFile(file_name, xmlContent);
}*/
DeviceReply one;
one.deviceId = json_data.terminal_id;
//直接加锁删除
std::lock_guard<std::mutex> lock(ledgermtx);
// ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中
const std::string& tid = json_data.terminal_id;
auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(),
[&](const terminal_dev& d){ return d.terminal_id == tid; });
if (it == terminal_devlist.end()) {
// 终端 id 不存在于当前进程维护的终端列表中
one.code = static_cast<int>(ResponseCode::OK);
one.result = "装置不存在,无法删除台账";
}
else {
// 终端 id 存在于当前进程维护的终端列表中
if(erase_one_terminals_by_id(json_data.terminal_id) == 1){
ClientManager::instance().remove_device(json_data.terminal_id);
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::OK),
"终端 id: " + json_data.terminal_id + " 台账删除成功");*/
one.code = static_cast<int>(ResponseCode::OK);
one.result = "台账删除成功";
}
else{
/*send_reply_to_queue(json_data.guid, static_cast<int>(ResponseCode::BAD_REQUEST),
"终端 id: " + json_data.terminal_id + " 台账删除失败");*/
one.code = static_cast<int>(ResponseCode::BAD_REQUEST);
one.result = "无法擦除旧数据,台账删除失败";
}
}
reply_list.push_back(std::move(one));
}
}
} else {
std::cout << "code_str error" << std::endl;
}
send_batch_reply_to_queue(guid, reply_list);
return true;
}
/////////////////////////////////////////////////////////////////////////////////////////////////回调函数
rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& msg) {
//未初始化不处理消费
if (INITFLAG != 1) {
return rocketmq::RECONSUME_LATER;
}
// [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK即使并发也安全
// ----- MOD BEGIN: 启动后消息过滤 -----
if (!should_process_after_start(msg)) {
std::cout << "[SET] skip old message: "
<< "topic=" << msg.getTopic()
<< ", queueId=" << msg.getQueueId()
<< ", offset=" << msg.getQueueOffset()
<< ", bornTs=" << msg.getBornTimestamp()
<< ", appStart=" << G_APP_START_MS
<< std::endl;
return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投
}
// ----- MOD END: 启动后消息过滤 -----
std::string body = msg.getBody();
std::string key = msg.getKeys();
if (body.empty()) {
std::cerr << "Message body is NULL or empty." << std::endl;
return rocketmq::RECONSUME_LATER;
}
// 日志记录
DIY_INFOLOG("process", "【NORMAL】前置消费topic:%s_%s的实时触发消息",FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
std::cout << "rtdata Callback received message: " << body << std::endl;
if (!key.empty()) {
std::cout << "Message Key: " << key << std::endl;
} else {
std::cout << "Message Key: N/A" << std::endl;
}
// 消息解析
std::string devid;
ushort line;
bool realData = false, soeData = false;
int limit = 0;
int idx = 0;
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit,idx)) {
std::cerr << "Failed to parse the JSON message." << std::endl;
DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的实时触发消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
return rocketmq::RECONSUME_LATER;
}
// 加锁访问台账
if( !devid.empty() && line > 0){
//不再使用文件触发方式,直接调用接口向终端发起请求
//不注册guid直接将请求指令下发装置排队处理
//添加在线判断
if (ClientManager::instance().get_dev_status(devid) != 1) {
std::cout << "devid对应装置不在线: " << devid << std::endl;
// 记录日志不响应 web 端
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的实时数据触发消息失败,装置%s不在线", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str(),devid.c_str());
return rocketmq::CONSUME_SUCCESS;
}
//记录idx
devidx_set(devid, idx);//每次下发都会更新,不加入运行用的结构体
ClientManager::instance().set_real_state_count(devid, 60, line);//一秒询问一次询问60次,下一次同一个测点调用的话就会刷新
}
else{
std::cerr << "rtdata is NULL." << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
return rocketmq::RECONSUME_LATER;
}
return rocketmq::CONSUME_SUCCESS;
}
rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& msg) {
//未初始化不处理消费
if (INITFLAG != 1) {
return rocketmq::RECONSUME_LATER;
}
// [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK即使并发也安全
// ----- MOD BEGIN: 启动后消息过滤 -----
if (!should_process_after_start(msg)) {
std::cout << "[SET] skip old message: "
<< "topic=" << msg.getTopic()
<< ", queueId=" << msg.getQueueId()
<< ", offset=" << msg.getQueueOffset()
<< ", bornTs=" << msg.getBornTimestamp()
<< ", appStart=" << G_APP_START_MS
<< std::endl;
return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投
}
// ----- MOD END: 启动后消息过滤 -----
std::string body = msg.getBody();
std::string key = msg.getKeys();
if (body.empty()) {
std::cerr << "Message body is NULL or empty." << std::endl;
return rocketmq::RECONSUME_LATER;
}
// 打印日志
std::cout << "ledger update Callback received message: " << body << std::endl;
if (!key.empty()) {
std::cout << "Message Key: " << key << std::endl;
} else {
std::cout << "Message Key: N/A" << std::endl;
}
// 调用业务逻辑处理函数
std::string updatefilepath = FRONT_PATH + "/etc/ledgerupdate";
if (!parseJsonMessageUD(body, updatefilepath)) {
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str());
}
return rocketmq::CONSUME_SUCCESS;
}
rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg) {
//未初始化不处理消费
if (INITFLAG != 1) {
return rocketmq::RECONSUME_LATER;
}
// [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK即使并发也安全
// ----- MOD BEGIN: 启动后消息过滤 -----
if (!should_process_after_start(msg)) {
std::cout << "[SET] skip old message: "
<< "topic=" << msg.getTopic()
<< ", queueId=" << msg.getQueueId()
<< ", offset=" << msg.getQueueOffset()
<< ", bornTs=" << msg.getBornTimestamp()
<< ", appStart=" << G_APP_START_MS
<< std::endl;
return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投
}
// ----- MOD END: 启动后消息过滤 -----
std::string body = msg.getBody();
std::string key = msg.getKeys();
if (body.empty()) {
std::cerr << "Message body is NULL or empty." << std::endl;
return rocketmq::RECONSUME_LATER;
}
// 打印消息内容和 key
std::cout << "process set Callback received message: " << body << std::endl;
if (!key.empty()) {
std::cout << "Message Key: " << key << std::endl;
} else {
std::cout << "Message Key: N/A" << std::endl;
}
// 调用业务处理逻辑
if (!parseJsonMessageSET(body)) {
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s - tag:%s的进程控制消息失败,消息的json结构不正确", g_front_seg_index, G_MQCONSUMER_TOPIC_SET.c_str(), FRONT_INST.c_str());
}
return rocketmq::CONSUME_SUCCESS;
}
rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg) {
//未初始化不处理消费
if (INITFLAG != 1) {
return rocketmq::RECONSUME_LATER;
}
// [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK即使并发也安全
// ----- MOD BEGIN: 启动后消息过滤 -----
if (!should_process_after_start(msg)) {
std::cout << "[SET] skip old message: "
<< "topic=" << msg.getTopic()
<< ", queueId=" << msg.getQueueId()
<< ", offset=" << msg.getQueueOffset()
<< ", bornTs=" << msg.getBornTimestamp()
<< ", appStart=" << G_APP_START_MS
<< std::endl;
return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投
}
// ----- MOD END: 启动后消息过滤 -----
std::string body = msg.getBody();
std::string key = msg.getKeys();
if (body.empty()) {
std::cerr << "Message body is NULL or empty." << std::endl;
return rocketmq::RECONSUME_LATER;
}
// 打印日志信息
std::cout << "log Callback received message: " << body << std::endl;
if (!key.empty()) {
std::cout << "Message Key: " << key << std::endl;
} else {
std::cout << "Message Key: N/A" << std::endl;
}
// 执行日志上送处理
if (!parseJsonMessageLOG(body)) {
DIY_ERRORLOG("process", "【ERROR】前置的%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str());
}
return rocketmq::CONSUME_SUCCESS;
}
rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& msg) {
//未初始化不处理消费
if (INITFLAG != 1) {
return rocketmq::RECONSUME_LATER;
}
// [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK即使并发也安全
// ----- MOD BEGIN: 启动后消息过滤 -----
if (!should_process_after_start(msg)) {
std::cout << "[SET] skip old message: "
<< "topic=" << msg.getTopic()
<< ", queueId=" << msg.getQueueId()
<< ", offset=" << msg.getQueueOffset()
<< ", bornTs=" << msg.getBornTimestamp()
<< ", appStart=" << G_APP_START_MS
<< std::endl;
return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投
}
// ----- MOD END: 启动后消息过滤 -----
// 调试输出
std::cout << "myMessageCallbackrecall" << std::endl;
std::string body = msg.getBody();
std::string key = msg.getKeys();
if (body.empty()) {
std::cerr << "Message body is NULL or empty." << std::endl;
return rocketmq::RECONSUME_LATER;
}
// 打印消息内容
std::cout << "recall Callback received message: " << body << std::endl;
if (!key.empty()) {
std::cout << "Message Key: " << key << std::endl;
} else {
std::cout << "Message Key: N/A" << std::endl;
}
// 解析 JSON 字符串
recall_json_handle_from_mq(body);//不再使用文件补招方式
return rocketmq::CONSUME_SUCCESS;
}
//////////////////////////////////////////////////////////////////////////////////////////////////生成实时触发和停止文件
//根据监测点序号和终端序号来生成触发文件,后续可修改
std::string createnewXmlContent(int devindex, int mpindex, bool realData, bool soeData, int limit)
{
std::ostringstream xmlContent;
xmlContent << "<?xml version=\"1.0\" encoding=\"gb2312\"?>\n"
<< "<Trigger3S>\n"
<< " <New>\n"
<< " <Trigger Line=\"" << mpindex << "\" "
<< "RealData=\"" << (realData ? "true" : "false") << "\" "
<< "DevSeries=\"" << devindex << "\" "
<< "Limit=\"" << limit << "\" "
<< "Count=\"0\" "
<< "SOEData=\"" << (soeData ? "true" : "false") << "\"/>\n"
<< " </New>\n"
<< "</Trigger3S>\n";
return xmlContent.str();
}
//根据监测点序号和终端序号来生成触发文件,后续可修改
std::string createdeleteXmlContent(int devindex, int mpindex)
{
std::ostringstream xmlContent;
xmlContent << "<?xml version=\"1.0\" encoding=\"gb2312\"?>\n"
<< "<Trigger3S>\n"
<< " <Delete>\n"
<< " <Trigger Line=\"" << mpindex << "\" "
<< "RealData=\"false\" "
<< "DevSeries=\"" << devindex << "\" "
<< "Limit=\"0\" "
<< "Count=\"0\" "
<< "SOEData=\"false\"/>\n"
<< " </Delete>\n"
<< "</Trigger3S>\n";
return xmlContent.str();
}
// 写入 XML 内容到文件的函数
bool writeToFile(const std::string& filePath, const std::string& xmlContent)
{
// 打开文件流以写入 XML 内容
std::ofstream outFile(filePath.c_str()); // 使用 c_str() 转换为 const char*
if (outFile.is_open()) {
outFile << xmlContent; // 写入内容
outFile.close();
std::cout << "XML file created at: " << filePath << std::endl;
return true;
} else {
std::cerr << "Failed to open file for writing: " << filePath << std::endl;
return false;
}
}
// 创建并写入新的 XML 文件的主函数
bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type)
{
std::string xmlContent = "";
std::string directory = "";
std::string filePath = "";
if(type == "new"){
// 构造 XML 内容
xmlContent = createnewXmlContent(devindex, mpindex, realData, soeData, limit);
// 设置文件路径
directory = FRONT_PATH + "/etc/trigger3s/";
filePath = directory + "newtrigger.xml";
}
else if(type == "delete"){
// 构造 XML 内容
xmlContent = createdeleteXmlContent(devindex, mpindex);
// 设置文件路径
directory = FRONT_PATH + "/etc/trigger3s/";
filePath = directory + "deletetrigger.xml";
}
else{
std::cerr << "Failed to create xmlfile,type error: " << std::endl;
return false;
}
// 创建目录(如果不存在)
if (system(("mkdir -p " + directory).c_str()) != 0) {
std::cerr << "Failed to create directory: " << directory << std::endl;
return false;
}
// 将 XML 内容写入文件
return writeToFile(filePath, xmlContent);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////生成台账更新文件
void add_indent(std::stringstream& stream, int level) {
for (int i = 0; i < level; ++i) {
stream << " "; // 每一级缩进 2 个空格
}
}
std::string prepare_update(const std::string& code_str, const terminal_dev& json_data,const std::string& guid)
{
std::cout << "prepare update" << std::endl;
std::stringstream xmlStream;
int indentLevel = 0; // 缩进级别
// 根节点
add_indent(xmlStream, indentLevel);
xmlStream << "<ledger_update>" << std::endl;
indentLevel++;
// 添加 guid 节点
add_indent(xmlStream, indentLevel);
xmlStream << "<guid>" << guid << "</guid>" << std::endl;
if (code_str == "ledger_modify" || code_str == "add_terminal") {
// 如果是 modify 类型
if (code_str == "ledger_modify") {
add_indent(xmlStream, indentLevel);
xmlStream << "<modify>" << std::endl;
indentLevel++;
}
else {
add_indent(xmlStream, indentLevel);
xmlStream << "<add>" << std::endl;
indentLevel++;
}
// 添加数据部分
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalData>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<ip>" << json_data.addr_str << "</ip>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<devType>" << json_data.dev_type << "</devType>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<maintName>" << json_data.maint_name << "</maintName>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<orgName>" << json_data.org_name << "</orgName>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<port>" << json_data.port << "</port>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<stationName>" << json_data.station_name << "</stationName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalCode>" << json_data.terminal_name << "</terminalCode>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<updateTime>" << json_data.timestamp << "</updateTime>" << std::endl; // Assuming `timestamp`
//add_indent(xmlStream, indentLevel);
//xmlStream << "<manufacturer>" << json_data.tmnl_factory << "</manufacturer>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<status>" << json_data.tmnl_status << "</status>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<series>" << json_data.dev_series << "</series>" << std::endl;
//lnk20250210
add_indent(xmlStream, indentLevel);
xmlStream << "<processNo>" << json_data.processNo << "</processNo>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<Righttime>" << json_data.Righttime << "</Righttime>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<devKey>" << json_data.dev_key << "</devKey>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<mac>" << json_data.mac << "</mac>" << std::endl;
// monitorData 部分
for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) {
const ledger_monitor& monitor = json_data.line[i];
add_indent(xmlStream, indentLevel);
xmlStream << "<monitorData" << (i + 1) << ">" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << monitor.monitor_id << "</id>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<name>" << monitor.monitor_name << "</name>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<lineNo>" << monitor.logical_device_seq << "</lineNo>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<voltageLevel>" << monitor.voltage_level << "</voltageLevel>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<ptType>" << monitor.terminal_connect << "</ptType>" << std::endl;
//add_indent(xmlStream, indentLevel);
//xmlStream << "<timestamp>" << monitor.timestamp << "</timestamp>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminal_id>" << monitor.terminal_id << "</terminal_id>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<CT1>" << monitor.CT1 << "</CT1>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<CT2>" << monitor.CT2 << "</CT2>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<PT1>" << monitor.PT1 << "</PT1>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<PT2>" << monitor.PT2 << "</PT2>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<status>" << monitor.status << "</status>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</monitorData>" << std::endl;
}
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</terminalData>" << std::endl;
// 结束 modify 或 add 标签
if (code_str == "ledger_modify") {
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</modify>" << std::endl;
}
else {
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</add>" << std::endl;
}
} else if (code_str == "delete_terminal") {
// 如果是 delete 类型
add_indent(xmlStream, indentLevel);
xmlStream << "<delete>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalData>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</terminalData>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</delete>" << std::endl;
}
else {
std::cerr << "code_str error" << std::endl;
return "";
}
// 结束根节点
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</ledger_update>" << std::endl;
return xmlStream.str(); // 返回构造的 XML 字符串
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////终端连接消息
void connect_status_to_queue(const std::string& id, const std::string& datetime, int status)//这个不使用,使用新的带有时间封装的
{
try {
// 构造 JSON
nlohmann::json jsonObject;
jsonObject["id"] = id;
jsonObject["date"] = datetime;
jsonObject["status"] = status;
// 构造发送结构
queue_data_t data;
data.strTopic = G_CONNECT_TOPIC;
data.strText = jsonObject.dump(); // 转换为字符串
data.tag = G_CONNECT_TAG;
data.key = G_CONNECT_KEY;
//if (g_node_id == STAT_DATA_BASE_NODE_ID) {
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
//}
}
catch (const std::exception& e) {
std::cerr << "connect_status_to_queue exception: " << e.what() << std::endl;
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////响应消息
void send_reply_to_queue(const std::string& guid, const int code, const std::string& result) {
try {
// 构造 JSON 对象
nlohmann::json obj;
obj["guid"] = guid;
obj["code"] = code;
obj["result"] = result;
obj["processNo"] = g_front_seg_index;
obj["nodeId"] = FRONT_INST;
// 构造 queue 消息
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = obj.dump(); // 序列化为 JSON 字符串
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
// 加入发送队列(线程安全)
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(connect_info);
}
catch (const std::exception& e) {
std::cerr << "send_reply_to_queue exception: " << e.what() << std::endl;
}
}
//台账更新批量回复
void send_batch_reply_to_queue(const std::string& guid,
const std::vector<DeviceReply>& replies) {
try {
nlohmann::json root;
root["guid"] = guid;
nlohmann::json arr = nlohmann::json::array();
for (const auto& r : replies) {
nlohmann::json item;
item["deviceId"] = r.deviceId;
item["code"] = r.code;
item["result"] = r.result;
arr.push_back(std::move(item));
}
root["data"] = std::move(arr);
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = root.dump();
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
} catch (const std::exception& e) {
std::cerr << "send_batch_reply_to_queue exception: " << e.what() << std::endl;
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////心跳消息
void send_heartbeat_to_queue(const std::string& status) {
try{
nlohmann::json obj;
obj["nodeId"] = FRONT_INST;
obj["frontType"] = "cloudfront";
obj["processNo"] = g_front_seg_index;
obj["status"] = status;
queue_data_t connect_info;
connect_info.strTopic = Heart_Beat_Topic;
connect_info.strText = obj.dump(); // 紧凑格式 JSON
connect_info.tag = Heart_Beat_Tag;
connect_info.key = Heart_Beat_Key;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(connect_info);
}
catch (const std::exception& e) {
std::cerr << "send_heartbeat_to_queue exception: " << e.what() << std::endl;
}
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////稳态测试函数
bool shouldSkipTerminal(const std::string& terminal_id) {
for (size_t i = 0; i < TESTARRAY.size(); ++i) {
if (TESTARRAY[i] == terminal_id) {
return true;
}
}
return false;
}
void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) {
if (!INITFLAG) {
std::cout << "前置未初始化完成\n";
return;
}
if (!front || !front->m_producer) {
std::cerr << "front 或 producer 无效\n";
return;
}
rocketmq::RocketMQProducer* producer = front->m_producer;
queue_data_t data;
data.strTopic = G_ROCKETMQ_TOPIC_TEST;
data.mp_id = "0";
std::vector<std::string> filenames = {
"long_string.txt",
"PLT_string.txt",
"fluc_string.txt",
"qvvr_string.txt"
};
for (const auto& filename : filenames) {
std::ifstream file(filename);
if (!file.is_open()) {
std::cerr << "跳过无法打开的文件: " << filename << std::endl;
continue;
}
std::stringstream buffer;
buffer << file.rdbuf();
std::string base_strText = buffer.str();
std::time_t t = std::time(nullptr);
std::tm* time_info = std::localtime(&t);
time_info->tm_sec = 0;
std::time_t base_time_t = std::mktime(time_info);
long long current_time_ms = static_cast<long long>(base_time_t) * 1000;
int total_messages = mpnum;
if (type == 0) {
std::cout << "use ledger send msg" << std::endl;
//根据台账模式下每个进程都会发送
for (size_t i = 0; total_messages > 0 && i < terminal_devlist.size(); ++i) {
const auto& dev = terminal_devlist[i];
if (shouldSkipTerminal(dev.terminal_id)) {
std::cout << dev.terminal_id << " use true message" << std::endl;
continue;
}
for (size_t j = 0; j < dev.line.size(); ++j) {
if (dev.line[j].monitor_id.empty()) break;
data.mp_id = dev.line[j].monitor_id;
data.monitor_no = static_cast<int>(i + j);
std::string modified_time = std::to_string(current_time_ms / 1000);
std::string modified_strText = base_strText;
try {
auto j = nlohmann::json::parse(modified_strText);
j["Did"] = i;
if (j.contains("Msg") && j["Msg"].is_object()) {
j["Msg"]["Cldid"] = j;
if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) {
for (auto& item : j["Msg"]["DataArray"]) {
if (item.is_object()) {
item["DataTimeSec"] = std::stoll(modified_time);
}
}
}
}
modified_strText = j.dump();
} catch (...) {
// 保持原始文本
}
data.strText = modified_strText;
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
std::cout << "Sent message " << (i + 1)
<< " with Monitor " << data.monitor_no
<< " and TIME " << modified_time << std::endl;
}
}
} else {
std::cout << "use monitor + number send msg" << std::endl;
//根据虚构监测点模式下只有进程1发送
for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 ) && i < total_messages; ++i) {
std::string monitor_id = "testmonitor" + std::to_string(i);
data.mp_id = monitor_id;
data.monitor_no = i;
std::string modified_time = std::to_string(current_time_ms / 1000);
std::string modified_strText = base_strText;
try {
auto j = nlohmann::json::parse(modified_strText);
j["Did"] = 0;
if (j.contains("Msg") && j["Msg"].is_object()) {
j["Msg"]["Cldid"] = data.mp_id;
if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) {
for (auto& item : j["Msg"]["DataArray"]) {
if (item.is_object()) {
item["DataTimeSec"] = std::stoll(modified_time);
}
}
}
}
modified_strText = j.dump();
} catch (...) {
// 保持原始文本
}
data.strText = modified_strText;
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
std::cout << "Sent message " << (i + 1)
<< " with Monitor " << data.monitor_no
<< " and TIME " << modified_time << std::endl;
}
}
std::cout << "Finished sending " << total_messages << " messages." << std::endl;
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////其他测试函数
void rocketmq_test_rt(Front* front)//用来测试实时数据
{
if (!front || !front->m_producer) {
std::cerr << "front 或 producer 无效\n";
return;
}
rocketmq::RocketMQProducer* producer = front->m_producer;
queue_data_t data;
data.monitor_no = 123;
data.strTopic = G_MQCONSUMER_TOPIC_RT;
std::ifstream file("rt.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = std::string(buffer.str());
data.mp_id = "123123";
data.tag = FRONT_INST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
void rocketmq_test_ud(Front* front)//用来测试台账更新
{
if (!front || !front->m_producer) {
std::cerr << "front 或 producer 无效\n";
return;
}
rocketmq::RocketMQProducer* producer = front->m_producer;
queue_data_t data;
data.monitor_no = 123;
data.strTopic = G_MQCONSUMER_TOPIC_UD;
std::ifstream file("ud.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = std::string(buffer.str());
data.mp_id = "123123";
data.tag = FRONT_INST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
void rocketmq_test_set(Front* front)//用来测试进程控制脚本
{
if (!front || !front->m_producer) {
std::cerr << "front 或 producer 无效\n";
return;
}
rocketmq::RocketMQProducer* producer = front->m_producer;
queue_data_t data;
data.monitor_no = 123;
data.strTopic = G_MQCONSUMER_TOPIC_SET;
std::ifstream file("set.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = std::string(buffer.str());
data.mp_id = "123123";
data.tag = FRONT_INST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
void rocketmq_test_rc(Front* front)//用来测试补招
{
if (!front || !front->m_producer) {
std::cerr << "front 或 producer 无效\n";
return;
}
rocketmq::RocketMQProducer* producer = front->m_producer;
queue_data_t data;
data.monitor_no = 123;
data.strTopic = G_MQCONSUMER_TOPIC_RC;
std::ifstream file("rc.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = std::string(buffer.str());
data.mp_id = "123123";
data.tag = FRONT_INST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////云前置新增功能
bool parseJsonMessageCLOUD(const std::string &body,
std::string &devid,
std::string &guid,
nlohmann::json &detailObj, // 这里返回整个 Detail
std::string &front_ip, // 新增:返回 FrontIP
int &node) // 新增:返回 Node
{
try {
auto j = nlohmann::json::parse(body);
// guid
if (j.contains("guid") && j["guid"].is_string()) {
guid = j["guid"].get<std::string>();
} else {
guid.clear();
}
// FrontIP
if (j.contains("FrontIP") && j["FrontIP"].is_string()) {
front_ip = j["FrontIP"].get<std::string>();
} else {
front_ip.clear();
}
// Node
if (j.contains("Node") && j["Node"].is_number_integer()) {
node = j["Node"].get<int>();
} else {
node = 0;
}
// Dev_id兼容字符串或数字
if (j.contains("Dev_id")) {
if (j["Dev_id"].is_string()) {
devid = j["Dev_id"].get<std::string>();
} else if (j["Dev_id"].is_number_integer()) {
devid = std::to_string(j["Dev_id"].get<long long>());
} else if (j["Dev_id"].is_number_unsigned()) {
devid = std::to_string(j["Dev_id"].get<unsigned long long>());
} else if (j["Dev_id"].is_number_float()) {
devid = std::to_string(j["Dev_id"].get<double>());
} else {
devid.clear();
}
} else {
devid.clear();
}
// Detail完整放入 detailObj
if (j.contains("Detail") && j["Detail"].is_object()) {
detailObj = j["Detail"]; // 直接保存整个 Detail
} else {
detailObj = nlohmann::json::object();
}
return true;
}
catch (const std::exception &e) {
std::cerr << "[parseJsonMessageCLOUD] JSON parse error: " << e.what() << "\n";
guid.clear();
devid.clear();
front_ip.clear();
node = 0;
detailObj = nlohmann::json::object();
return false;
}
}
int recordguid(const std::string &devid,
const std::string &guid,
int busytype,int busycount)
{
std::lock_guard<std::mutex> lock(ledgermtx);
for (auto &dev : terminal_devlist) {
if (dev.terminal_id == devid) {
if (dev.isbusy == 1) {
std::cout << "Dev is busybusytype is" << dev.busytype << std::endl;
//响应guid:正忙
return dev.busytype; // 正在忙,不能记录
}
dev.guid = guid;
dev.busytype = busytype;
dev.isbusy = busycount;
dev.busytimecount = 0;
return 0;
}
}
std::cout << "Dev not found" << std::endl;
//响应guid:失败
return -1; // 未找到对应的装置
}
// 按 type 解析 Msg
bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann::json& detailObj) {
MsgParsed parsed;
nlohmann::json msgObj;
// 直接解析 detailObj 的 Type
if (detailObj.contains("Type")) {
if (detailObj["Type"].is_string()) {
try {
parsed.type = std::stoi(detailObj["Type"].get<std::string>(), nullptr, 0); // 支持 "0x2106" 格式
} catch (...) {
return false;
}
} else if (detailObj["Type"].is_number_integer()) {
parsed.type = detailObj["Type"].get<int>();
} else if (detailObj["Type"].is_number_unsigned()) {
parsed.type = static_cast<int>(detailObj["Type"].get<unsigned int>());
} else {
return false;
}
} else {
return false;
}
// 直接解析 detailObj 的 Msg
if (detailObj.contains("Msg") && detailObj["Msg"].is_object()) {
msgObj = detailObj["Msg"];
} else {
msgObj = nlohmann::json::object();
}
try {
switch (parsed.type) {
case 0x2131: { // 读取目录
if(!recordguid(devid,guid,static_cast<int>(DeviceState::READING_FILEMENU),1)){
return true;
}
if (!msgObj.contains("Name") || !msgObj["Name"].is_string()) return false;
parsed.name = msgObj["Name"].get<std::string>();
parsed.ok = true;
std::cout << "[dir parsemsg] Name: " << parsed.name << std::endl;
// 添加指令到队列当中
ClientManager::instance().add_file_menu_action_to_device(devid, parsed.name);
return true;
}
case 0x2132: { // 下载文件
if(!recordguid(devid,guid,static_cast<int>(DeviceState::READING_FILEDATA),1)){
return true;
}
if (!msgObj.contains("Name") || !msgObj["Name"].is_string()) return false;
parsed.name = msgObj["Name"].get<std::string>();
parsed.ok = true;
std::cout << "[file parsemsg] Name: " << parsed.name << std::endl;
// 下发指令
ClientManager::instance().add_file_download_action_to_device(devid, parsed.name);
return true;
}
case 0x2106: { // 定值/内部定值
if (!msgObj.contains("Cldid") || !msgObj["Cldid"].is_number_integer()) return false;
if (!msgObj.contains("DataType") || !msgObj["DataType"].is_number_integer()) return false;
if (!msgObj.contains("Operate") || !msgObj["Operate"].is_number_integer()) return false;
if (!msgObj.contains("DataArray")|| !msgObj["DataArray"].is_array()) return false;
parsed.cldid = msgObj["Cldid"].get<int>();
parsed.datatype = msgObj["DataType"].get<int>();
parsed.operate = msgObj["Operate"].get<int>();
// 调试打印
std::cout << "[parsemsg] Cldid=" << parsed.cldid
<< ", DataType=0x" << std::hex << parsed.datatype << std::dec
<< ", Operate=" << parsed.operate
<< std::endl;
// 先清空数组,避免复用对象时残留
parsed.dataArray_f.clear();
parsed.dataArray_us.clear();
switch (parsed.datatype) {
case 0x0C: { // 定值float 阵列)
for (const auto& v : msgObj["DataArray"]) {
if (!v.is_number()) return false;
// 统一按 double 取,再强转成 float 更稳妥
parsed.dataArray_f.push_back(static_cast<float>(v.get<double>()));
}
// 打印 DataArray
std::cout << "[0x0C] DataArray=[";
for (size_t i = 0; i < parsed.dataArray_f.size(); ++i) {
std::cout << parsed.dataArray_f[i] << (i + 1 < parsed.dataArray_f.size() ? ", " : "");
}
std::cout << "]" << std::endl;
parsed.ok = true;
// 根据 Operate 分流1=读2=写)
switch (parsed.operate) {
case 1: { // 读
if(!recordguid(devid,guid,static_cast<int>(DeviceState::READING_FIXEDVALUE),2)){
return true;
}
ClientManager::instance().get_fixedvalue_action_to_device(
devid, static_cast<uint16_t>(parsed.cldid)); // 获取装置测点定值数据
ClientManager::instance().get_fixedvaluedes_action_to_device(devid); // 获取装置定值描述 只有这一步可以响应成功和关闭
break;
}
case 2: { // 写
if(!recordguid(devid,guid,static_cast<int>(DeviceState::SET_FIXEDVALUE),1)){
return true;
}
ClientManager::instance().set_fixedvalue_action_to_device(
devid, static_cast<uint16_t>(parsed.cldid), parsed.dataArray_f); // 装置修改定值
break;
}
default:
return false;
}
break;
}
case 0x0D: { // 内部定值uint16_t 阵列)
for (const auto& v : msgObj["DataArray"]) {
if (!v.is_number_integer() && !v.is_number_unsigned()) return false;
// 范围校验 [0, 65535]
long long val = v.get<long long>();
if (val < 0 || val > 65535) return false;
parsed.dataArray_us.push_back(static_cast<uint16_t>(val));
}
// 打印 DataArray
std::cout << "[0x0D] DataArray=[";
for (size_t i = 0; i < parsed.dataArray_us.size(); ++i) {
std::cout << parsed.dataArray_us[i] << (i + 1 < parsed.dataArray_us.size() ? ", " : "");
}
std::cout << "]" << std::endl;
parsed.ok = true;
// 根据 Operate 分流1=读2=写)
switch (parsed.operate) {
case 1: { // 读
if(!recordguid(devid,guid,static_cast<int>(DeviceState::READING_INTERFIXEDVALUE),3)){
return true;
}
ClientManager::instance().get_interfixedvalue_action_to_device(devid); // 获取内部定值
ClientManager::instance().get_fixedvalucontrolword_action_to_device(devid, 1); // 1-内部定值描述
ClientManager::instance().get_fixedvalucontrolword_action_to_device(devid, 2); // 2-控制字描述 只有这一步可以响应成功和关闭
break;
}
case 2: { // 写
if(!recordguid(devid,guid,static_cast<int>(DeviceState::SET_INTERFIXEDVALUE),1)){
return true;
}
ClientManager::instance().set_interfixedvalue_action_to_device(devid, parsed.dataArray_us);
break;
}
default:
return false;
}
break;
}
default:
return false;
}
return true;
}
default:
return false;
}
} catch (const std::exception& e) {
std::cerr << "[parsemsg] exception: " << e.what() << std::endl;
return false;
} catch (...) {
std::cerr << "[parsemsg] unknown exception" << std::endl;
return false;
}
}
//心跳和其他响应
void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type, const std::string& guid, const std::string& mac) {
try {
/*std::string guid = find_guid_index_from_dev_id(dev_id);*/
if(guid == "")
{
std::cerr << "dev: " << dev_id << " guid not found" << std::endl;
return;
}
// ---- 构造根 JSON ----
nlohmann::json obj;
obj["guid"] = guid;
obj["FrontId"] = FRONT_INST;
obj["Node"] = g_front_seg_index;
// Dev_mac从台账取 addr_str 并规范化
//std::string mac = get_mac_by_devid(dev_id);
obj["Dev_mac"] = normalize_mac(mac);
// ---- 构造 Detail ----
nlohmann::json detail;
detail["Type"] = type;
// Msg
nlohmann::json msg;
msg["Time"] = static_cast<long long>(std::time(nullptr));
detail["Msg"] = std::move(msg);
// Code
detail["Code"] = reply_code;
obj["Detail"] = std::move(detail);
// ---- 入队发送 ----
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = obj.dump(); // 序列化为字符串
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
}
// 调试打印
std::cout << "[send_reply_to_cloud] queued: " << obj.dump() << std::endl;
}
catch (const std::exception& e) {
std::cerr << "send_reply_to_cloud exception: " << e.what() << std::endl;
}
}
//云前置功能
rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg) {
//未初始化不处理消费
if (INITFLAG != 1) {
return rocketmq::RECONSUME_LATER;
}
std::string body = msg.getBody();
std::string key = msg.getKeys();
if (body.empty()) {
std::cerr << "Message body is NULL or empty." << std::endl;
return rocketmq::RECONSUME_LATER;
}
// 日志记录
DIY_INFOLOG("process", "【NORMAL】前置消费topic:%s_%s的云前置控制消息",FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_CLOUD.c_str());
std::cout << "cloud Callback received message: " << body << std::endl;
if (!key.empty()) {
std::cout << "Message Key: " << key << std::endl;
} else {
std::cout << "Message Key: N/A" << std::endl;
}
// 消息解析
std::string guid;
std::string devid;
std::string FrontId;
int Node;
nlohmann::json DetailObj;
if (!parseJsonMessageCLOUD(body, devid, guid, DetailObj,FrontId,Node)) {
std::cerr << "Failed to parse the JSON message." << std::endl;
DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的云前置控制消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
return rocketmq::RECONSUME_LATER;
}
// ====== 调试打印 ======
std::cout << "[CLOUD Msg Parsed] "
<< "guid=" << guid
<< ", devid=" << devid
<< ", FrontId=" << FrontId
<< ", Node=" << Node
<< std::endl;
if(FrontId != FRONT_INST || Node != g_front_seg_index){
std::cout << "当前进程不消费这个消息" << std::endl;
return rocketmq::CONSUME_SUCCESS;
}
if(!parsemsg(devid,guid,DetailObj)){
std::cerr << "clouddata is error." << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的云前置控制消息失败,消息无法解析", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
}
return rocketmq::CONSUME_SUCCESS;
}
void rocketmq_test_getdir(Front* front)//用来测试目录获取
{
if (!front || !front->m_producer) {
std::cerr << "front 或 producer 无效\n";
return;
}
rocketmq::RocketMQProducer* producer = front->m_producer;
queue_data_t data;
data.monitor_no = 123;
data.strTopic = G_MQCONSUMER_TOPIC_CLOUD;
std::ifstream file("getdir.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = std::string(buffer.str());
data.mp_id = "123123";
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
//状态翻转
void connect_status_update(const std::string& id, int status)
{
// 获取当前系统时间(格式: YYYY-MM-DD HH:MM:SS
auto now = std::chrono::system_clock::now();
std::time_t now_c = std::chrono::system_clock::to_time_t(now);
std::tm tm_buf;
localtime_r(&now_c, &tm_buf);
std::ostringstream datetime_ss;
datetime_ss << std::put_time(&tm_buf, "%Y-%m-%d %H:%M:%S");
std::string datetime_str = datetime_ss.str();
// 构造 JSON 对象
nlohmann::json j;
j["id"] = id;
j["date"] = datetime_str;
j["status"] = std::to_string(status);
// 构造队列消息
queue_data_t connect_info;
connect_info.strTopic = G_CONNECT_TOPIC;
connect_info.strText = j.dump(); // 转成字符串
connect_info.tag = G_CONNECT_TAG;
connect_info.key = G_CONNECT_KEY;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
}
// 调试打印
std::cout << "[connect_status_update] queued JSON:\n" << j.dump(4) << std::endl;
}